如何利用.NET6 EasyNetQ实现RabbitMQ延迟消息的最佳实践?
摘要:背景 最近遇到一个比较特殊需求,需要修改一个的RabbitMQ消费者,以实现在消费某种特定的类型消息时,延迟1小时再处理,几个需要注意的点: 延迟是以小时为单位 不是所有消息都延迟消费,只延迟特定类型的消息 只在第一次消费时延迟1小时,容错
背景
最近遇到一个比较特殊需求,需要修改一个的RabbitMQ消费者,以实现在消费某种特定的类型消息时,延迟1小时再处理,几个需要注意的点:
延迟是以小时为单位
不是所有消息都延迟消费,只延迟特定类型的消息
只在第一次消费时延迟1小时,容错机制产生的重新消费(也即消息消费失败,多次进入延迟队列重试),则不再延迟1小时
消费者消费过程中可能会重启
考虑到这几点,我们需要一个标识以及持久化,不能简单使用Thread.Sleep或者Task.Delay;下面开始演示在不引入其它框架资源的前提下,利用现有的RabbitMQ来实现这个需求。
准备
如果没有可用的RabbitMQ测试环境,推荐使用docker本地搭建
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management
项目搭建
创建解决方案RabbitMQDemo, 并添加一个.Net6控制台程序Producer作为生产者,
mkdir RabbitMQDemo
cd RabbitMQDemo
dotnet new sln -n RabbitMQDemo
mkdir src
cd src
dotnet new console -n Producer
cd Producer
dotnet add package EasyNetQ -s https://api.nuget.org/v3/index.json
dotnet add package Newtonsoft.Json -s https://api.nuget.org/v3/index.json
cd ../..
dotnet sln add ./src/Producer/Producer.csproj
我们给Producer项目添加了两个包 ——EasyNetQ是用来简便RabbitMQ操作,添加Newtonsoft.Json则是因为EasyNetQ从v7版本开始移除了对前者的依赖,需要使用者自行添加。
接下来定义消息的数据结构,添加一个类库Core到解决方案,
cd src
dotnet new classlib --name Core
cd ..
dotnet sln add ./src/Core/Core.csproj
添加如下OrderNotification类,后面我们根据消息的 Type的值来确定是正常消费还是延迟消费。
