Cerely框架如何实现异步执行和定时任务的疑问?
摘要:celery-处理大量的分布式系统,专注于实时处理的异步队列,也支持任务调度 celery的架构由三部分组成,消息中间件,任务执行单元和任务结果存储。 用户发起请求 >django(处理业务) >
celery-处理大量的分布式系统,专注于实时处理的异步队列,也支持任务调度
celery的架构由三部分组成,消息中间件,任务执行单元和任务结果存储。
用户发起请求--->django(处理业务)--->消息中间件(rabbitmq,redis)--->celery--->消息存储
celery异步执行
首先保证消息中间件的启动,消费者定义的任务,然后监听,只需要一条命令
消费者怎么去发消息?生产者怎么去消费?
消费者:celery消费者这边的操作是,创建一个celery的对象,然后绑定消息中间件与异步的结果存储位置,然后给想要执行的方法上加上@cel.task
import celery
import time
backend='redis://127.0.0.1:6379/1' # 异步的结果存储位置
broker='redis://127.0.0.1:6379/2' # 消息中间件
cel = celery.Celery('test', backend=backend, broker=broker)
@cel.task
def send_email(name):
print("向%是发送邮件...."%name)
time.sleep(5)
print('向%s发送邮件完成'%name)
return 'ok'
# 终端启动命令:celery -A test worker -l info,
# 在 Celery 5.0 及更高版本中,-A 选项(用于指定 Celery 应用实例)不再是 worker 子命令的专属选项,而是成为了 全局选项。
# 正确的用法应该是将 -A 放在 celery 命令之后,worker 子命令之前。
生成者:消费者方法里面用了@cel.task,里面就会有delay方法,这个就会自动取连接消息中间件中创建好的队列,从消费这里把异步任务导入,使用delay,传入参数
# 首先从消费这里把异步任务导入
from test import send_email
# 消费者方法里面用了@cel.task,里面就会有delay方法,这个就会自动取连接消息中间件中创建好的队列
result = send_email.delay("yuan")
# 这里插入的一定是消费者哪里的函数(send_email),以及对应的参数(yuan),不能凭空捏造
异步取结果:
# 首先先从消费者哪里引入对象,还有引入AsyncResult
from celery.result import AsyncResult
from test import cel # 假设 test.py 中定义了 Celery 应用实例,通常命名为 app 或 cel
# 1. 构造 AsyncResult 对象
# id="shajbx1-ajsh-jhkj-knjknhub": 这是之前发送任务时返回的唯一任务ID
# app=cel: 这是你的 Celery 应用实例。AsyncResult 需要这个实例来知道如何连接到结果后端(backend)
# 以及消息代理(broker),从而正确地查询任务状态和结果。
async_result = AsyncResult(id="shajbx1-ajsh-jhkj-knjknhub", app=cel)
# 2. 检查任务状态
# async_result.successful():
# 这是一个便捷方法,用于检查任务是否已经成功完成。
# 如果任务的状态是 SUCCESS,则返回 True。
if async_result.successful():
# 3. 获取任务结果
# result = async_result.get():
# 如果任务成功,这个方法会从结果后端获取任务的返回值。
# 注意:如果任务尚未完成,`get()` 方法会阻塞当前线程,直到任务完成并返回结果。
# 在生产环境中,你可能需要考虑超时参数 `timeout`,或者在循环中不断检查状态而非直接 `get()`。
result = async_result.get()
print(result) # ok
# 4. 处理任务失败的情况
# async_result.failed():
# 这是一个便捷方法,用于检查任务是否已经失败。
# 如果任务的状态是 FAILURE,则返回 True。
