Cerely框架如何实现异步执行和定时任务的疑问?

摘要:celery-处理大量的分布式系统,专注于实时处理的异步队列,也支持任务调度 celery的架构由三部分组成,消息中间件,任务执行单元和任务结果存储。 用户发起请求 >django(处理业务) >
celery-处理大量的分布式系统,专注于实时处理的异步队列,也支持任务调度 celery的架构由三部分组成,消息中间件,任务执行单元和任务结果存储。 用户发起请求--->django(处理业务)--->消息中间件(rabbitmq,redis)--->celery--->消息存储 celery异步执行 首先保证消息中间件的启动,消费者定义的任务,然后监听,只需要一条命令 消费者怎么去发消息?生产者怎么去消费? 消费者:celery消费者这边的操作是,创建一个celery的对象,然后绑定消息中间件与异步的结果存储位置,然后给想要执行的方法上加上@cel.task import celery import time backend='redis://127.0.0.1:6379/1' # 异步的结果存储位置 broker='redis://127.0.0.1:6379/2' # 消息中间件 cel = celery.Celery('test', backend=backend, broker=broker) @cel.task def send_email(name): print("向%是发送邮件...."%name) time.sleep(5) print('向%s发送邮件完成'%name) return 'ok' # 终端启动命令:celery -A test worker -l info, # 在 Celery 5.0 及更高版本中,-A 选项(用于指定 Celery 应用实例)不再是 worker 子命令的专属选项,而是成为了 全局选项。 # 正确的用法应该是将 -A 放在 celery 命令之后,worker 子命令之前。 生成者:消费者方法里面用了@cel.task,里面就会有delay方法,这个就会自动取连接消息中间件中创建好的队列,从消费这里把异步任务导入,使用delay,传入参数 # 首先从消费这里把异步任务导入 from test import send_email # 消费者方法里面用了@cel.task,里面就会有delay方法,这个就会自动取连接消息中间件中创建好的队列 result = send_email.delay("yuan") # 这里插入的一定是消费者哪里的函数(send_email),以及对应的参数(yuan),不能凭空捏造 异步取结果: # 首先先从消费者哪里引入对象,还有引入AsyncResult from celery.result import AsyncResult from test import cel # 假设 test.py 中定义了 Celery 应用实例,通常命名为 app 或 cel # 1. 构造 AsyncResult 对象 # id="shajbx1-ajsh-jhkj-knjknhub": 这是之前发送任务时返回的唯一任务ID # app=cel: 这是你的 Celery 应用实例。AsyncResult 需要这个实例来知道如何连接到结果后端(backend) # 以及消息代理(broker),从而正确地查询任务状态和结果。 async_result = AsyncResult(id="shajbx1-ajsh-jhkj-knjknhub", app=cel) # 2. 检查任务状态 # async_result.successful(): # 这是一个便捷方法,用于检查任务是否已经成功完成。 # 如果任务的状态是 SUCCESS,则返回 True。 if async_result.successful(): # 3. 获取任务结果 # result = async_result.get(): # 如果任务成功,这个方法会从结果后端获取任务的返回值。 # 注意:如果任务尚未完成,`get()` 方法会阻塞当前线程,直到任务完成并返回结果。 # 在生产环境中,你可能需要考虑超时参数 `timeout`,或者在循环中不断检查状态而非直接 `get()`。 result = async_result.get() print(result) # ok # 4. 处理任务失败的情况 # async_result.failed(): # 这是一个便捷方法,用于检查任务是否已经失败。 # 如果任务的状态是 FAILURE,则返回 True。 elif async_result.failed(): # 这里可以获取失败的原因 (async_result.traceback) # 或者打印错误信息 (async_result.info) # ... 进行错误处理或日志记录 pass celery定时任务 引入时间模块然后转换成时间戳,使用的是apply_async(args=[,], eta=时间戳) 多目录下完成定时任务 对celery配置文件加一个beat_schedule任务调度器,要在celery.py文件中写 celery.py 文件中配置了周期性任务(即使用 Celery Beat 调度器),那么除了启动 worker 之外,您还需要单独启动一个 Celery Beat 进程。 Celery worker 负责执行任务,而 Celery Beat 进程则负责 调度和发送周期性任务。 您需要在终端敲的命令是: celery -A your_celery_app_module beat -l info 命令详解: celery: Celery 命令的入口。 -A your_celery_app_module: 全局选项,指定您的 Celery 应用实例所在的模块。如果您的 Celery 应用实例在 celery.py 文件中,那么 your_celery_app_module 就是 celery (不带 .py 后缀)。 beat: 这是启动 Celery Beat 调度器的子命令。 -l info: 设置日志级别为 info,这样您可以在终端看到调度器的运行信息。 面试话术: celery的异步执行就是让那些耗时任务不阻碍主任务的执行,去一边执行绑定中间件将消息存储,用于短信与邮件的发送。