Cerely框架如何实现异步执行和定时任务的疑问?

摘要:celery-处理大量的分布式系统,专注于实时处理的异步队列,也支持任务调度 celery的架构由三部分组成,消息中间件,任务执行单元和任务结果存储。 用户发起请求 >django(处理业务) >
celery-处理大量的分布式系统,专注于实时处理的异步队列,也支持任务调度 celery的架构由三部分组成,消息中间件,任务执行单元和任务结果存储。 用户发起请求--->django(处理业务)--->消息中间件(rabbitmq,redis)--->celery--->消息存储 celery异步执行 首先保证消息中间件的启动,消费者定义的任务,然后监听,只需要一条命令 消费者怎么去发消息?生产者怎么去消费? 消费者:celery消费者这边的操作是,创建一个celery的对象,然后绑定消息中间件与异步的结果存储位置,然后给想要执行的方法上加上@cel.task import celery import time backend='redis://127.0.0.1:6379/1' # 异步的结果存储位置 broker='redis://127.0.0.1:6379/2' # 消息中间件 cel = celery.Celery('test', backend=backend, broker=broker) @cel.task def send_email(name): print("向%是发送邮件...."%name) time.sleep(5) print('向%s发送邮件完成'%name) return 'ok' # 终端启动命令:celery -A test worker -l info, # 在 Celery 5.0 及更高版本中,-A 选项(用于指定 Celery 应用实例)不再是 worker 子命令的专属选项,而是成为了 全局选项。 # 正确的用法应该是将 -A 放在 celery 命令之后,worker 子命令之前。 生成者:消费者方法里面用了@cel.task,里面就会有delay方法,这个就会自动取连接消息中间件中创建好的队列,从消费这里把异步任务导入,使用delay,传入参数 # 首先从消费这里把异步任务导入 from test import send_email # 消费者方法里面用了@cel.task,里面就会有delay方法,这个就会自动取连接消息中间件中创建好的队列 result = send_email.delay("yuan") # 这里插入的一定是消费者哪里的函数(send_email),以及对应的参数(yuan),不能凭空捏造 异步取结果: # 首先先从消费者哪里引入对象,还有引入AsyncResult from celery.result import AsyncResult from test import cel # 假设 test.py 中定义了 Celery 应用实例,通常命名为 app 或 cel # 1. 构造 AsyncResult 对象 # id="shajbx1-ajsh-jhkj-knjknhub": 这是之前发送任务时返回的唯一任务ID # app=cel: 这是你的 Celery 应用实例。AsyncResult 需要这个实例来知道如何连接到结果后端(backend) # 以及消息代理(broker),从而正确地查询任务状态和结果。 async_result = AsyncResult(id="shajbx1-ajsh-jhkj-knjknhub", app=cel) # 2. 检查任务状态 # async_result.successful(): # 这是一个便捷方法,用于检查任务是否已经成功完成。 # 如果任务的状态是 SUCCESS,则返回 True。 if async_result.successful(): # 3. 获取任务结果 # result = async_result.get(): # 如果任务成功,这个方法会从结果后端获取任务的返回值。 # 注意:如果任务尚未完成,`get()` 方法会阻塞当前线程,直到任务完成并返回结果。 # 在生产环境中,你可能需要考虑超时参数 `timeout`,或者在循环中不断检查状态而非直接 `get()`。 result = async_result.get() print(result) # ok # 4. 处理任务失败的情况 # async_result.failed(): # 这是一个便捷方法,用于检查任务是否已经失败。 # 如果任务的状态是 FAILURE,则返回 True。
阅读全文