如何将NodeJs分布式任务队列与容器优雅停机结合成?

摘要:当在后端执行复杂的任务时,通常不能够在短时间内即时响应,例如文档导入、导出任务等。再加上当前的LLMs发展,我们可以实现文档的写作、质检、翻译等复杂任务,这些任务通常都比较耗时,这样就需要任务队列来管理这些异步任务的执行顺序和资源分配,而优
当在后端执行复杂的任务时,通常不能够在短时间内即时响应,例如文档导入、导出任务等。再加上当前的LLMs发展,我们可以实现文档的写作、质检、翻译等复杂任务,这些任务通常都比较耗时,这样就需要任务队列来管理这些异步任务的执行顺序和资源分配,而优雅停机则用以保证任务的完整处理。 AI Infra 系列相关文章 基于 fetch 的 SSE 方案 基于向量检索实现基础 RAG 服务 流式 Markdown 增量富文本解析算法 仿照豆包实现 Prompt 变量模板输入框 基于 NodeJs 的分布式任务队列与容器优雅停机 概述 任务队列用于管理和调度异步任务,在实现时我们可能会使用一些现成的库,例如Bull/BullMQ、Agenda等。而如果需要实现更复杂的任务/消息调度,例如不同系统、应用之间的可靠消息传递等服务,我们还需要使用Kafka、RabbitMQ等消息队列系统。整体来说,异步任务可以实现如下功能: 异步任务,将耗时的操作放到后台去处理,让主程序能够快速返回响应。 流量削峰,将大量请求转化为任务,平稳地存入队列,然后系统按照能承受的处理能力,从队列中取出任务进行消费。 错误重试,当任务执行失败时,队列可以尝试进行重试,如果重试多次后仍然失败,则可以将任务标记为失败。 优雅停机则是指在应用程序关闭时,能够正确地处理正在进行的任务,确保数据的一致性和完整性。在这里的优雅停机主要分为针对请求的处理和针对任务队列的处理,请求的部分通常会由网关加上框架本身处理,而任务队列的重置或者结束,则需要靠我们主动处理。整体来说,优雅停机通常需要做如下处理: 停止接收新的请求,服务需要避免新的请求进入,这部分通常需要网关等前置节点来处理。 处理当前请求,服务需要继续处理当前已经在处理中的请求,确保这些请求能够正常完成。 释放已分配资源,在请求处理完成后,需要释放所有申请的资源,例如关闭数据库连接等。 关闭服务,当所有请求都处理完毕且资源都已释放后,需要正常关闭服务,或者强制停机。 在本文中我们在Nest框架的基础上,实现简化版的分布式任务调度队列。并且基于pm2管理NodeJs进程,实现了优雅停机的能力,而且探讨了Linux系统下进程与信号的传递表现。文中涉及的实现都在 https://github.com/WindRunnerMax/webpack-simple-environment 中。 任务队列 任务队列的实现比较简单,在我们的场景中主要目标就是流量削峰,特别是在调度LLMs时,我们需要将请求排队处理,当前的云服务商都会有并发请求的限制。当前云服务商主要是会限制RPM和TPM,即每分钟请求数和每分钟Token数量,这样就需要我们在应用层进行请求的排队处理。 在这里我们就不借助已有的框架,而是直接实现任务队列,主要目标是能够控制并发任务,并且叙述单进程以及单机集群模式、分布式任务的锁控制方案。而除了任务队列之外,还有很多细节需要处理,例如从数据库读取的控制并发、任务超时控制等,这些都可以在实际应用中进行扩展。 消息队列系统通常会有推送模式以及拉取模式两种消息消费的方式,而实际上在拉取模式下,消费者也需要实现类似的任务队列来控制并发请求,例如Kafka系统。当然如果是推送模式,就可以直接在消息队列中进行并发控制,例如RabbitMQ系统。 单实例任务 我们先来实现单实例单进程的任务队列,首先需要实现一个任务队列类,通过实例化这个类我们可以在多个服务Service分别调度队列实例。在这里需要注意的是,如若不想在全局分别创建实例,在Next中就在Provider的类中独立维护实例,但是需要注意其Scope必须为全局单例而非请求级实例。 @Injectable({ scope: Scope.DEFAULT }) export class TasksService {} 接下来假设实际任务是在数据库中存储的,毕竟任务需要持久化存储以防止数据丢失。
阅读全文