C#通过rabbitmq实现定时任务(延时队列)
作者:初夏的阳光丶 发布时间:2021-11-02 23:11:17
标签:C#,rabbitmq,定时任务
本文主要讲解如何通过RabbitMQ实现定时任务(延时队列)
环境准备
需要在MQ中进行安装插件 地址链接
插件介绍地址:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
使用场景
作为一个新的预支付订单被初始化放置,如果该订单在指定时间内未进行支付,则将被认为超时订单进行关闭处理;电商系统中应用较多,用户购买商品产生订单,但未进行支付,订单产生30分钟内未支付将关闭订单(且满足该场景数量庞大),不可能采用人工干预。
代码介绍
生产者
var factory = new ConnectionFactory()
{
Uri = new Uri("MQ地址")
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var exchangeName = "delay-exchange";
var routingkey = "delay.delay";
var queueName = "delay_queueName";
//设置Exchange队列类型
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
//设置当前消息为延时队列
channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
channel.QueueDeclare(queueName, true, false, false, argMaps);
channel.QueueBind(queueName, exchangeName, routingkey);
for (int i = 0; i < 3; i++)
{
var time = 1000 * 5;
var message = $@"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";
var body = Encoding.UTF8.GetBytes(message);
var props = channel.CreateBasicProperties();
//设置消息的过期时间
props.Headers = new Dictionary<string, object>()
{
{ "x-delay", 5000 }
};
channel.BasicPublish(exchange: exchangeName,
routingKey: routingkey,
basicProperties: props,
body: body);
Console.WriteLine(message);
}
Console.ReadLine();
消费者(自动绑定队列写法)
var factory = new ConnectionFactory()
{
Uri = new Uri(MQ地址)
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var queueName = "delay_queueName";
channel.QueueDeclare(queueName, true, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($@"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.ReadLine();
消费者(手动绑定队列写法)
var factory = new ConnectionFactory()
{
Uri = new Uri(MQ地址)
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var exchangeName = "delay-exchange";
var routingkey = "delay.delay";
var queueName = "delay_queueName";
var autoDelete = true;
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
channel.QueueDeclare(queueName, true, false, false, argMaps);
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
//channel.QueueDeclare(queueName, true, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($@"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.ReadLine();
最终实现效果(两个消费者)
在上述实现中,其实主要靠以下参数来帮我们实现当前功能
声明Exchange中的 type: "x-delayed-message" 这个表明当前队列为延时消息队列
声明Exchange中arguments中的 {"x-delayed-type", "topic"} 当前表明当前队列为Topic模式
最后 我们在CreateBasicProperties的Header中设置 { "x-delay", 5000 }来达到消息延时的功能(单位为ms)
建议
如果使用当前模式来做定时任务,在要求消息不丢失的前提下,需要运维同学提供稳定的MQ环境
来源:https://www.cnblogs.com/ancold/p/14705601.html


猜你喜欢
- 本文使用SpringBoot结合Redis进行简单的token鉴权。1.简介刚刚换了公司,所以最近有些忙碌,所以一直没有什么产出,最近朋友问
- 1. 什么是λ表达式λ表达式本质上是一个匿名方法。让我们来看下面这个例子: public int add(int x, int
- 一. 概述在开发后端接口, 通常都会涉及检验参数必填校验, 一般我们的处理都是很粗暴的写个if()判断, 然后抛异常. 本文将介绍通过代理的
- 前言CyclicBarrier和CountDownLatch这两个工具都是在java.util.concurrent包下,并且平时很多场景都
- gateway、webflux、reactor-netty请求日志输出场景在使用spring cloud gateway时想要输出请求日志,
- AutoGenerator 是 MyBatis-Plus 的代码生成器,通过 AutoGenerator 可以快速生成 Entity、Map
- 先引用using System.Runtime.InteropServices; 的命名空间, 然后在合适的位置加上如下代码就OK。。注意:
- 前言SQL注入漏洞作为WEB安全的最常见的漏洞之一,在java中随着预编译与各种ORM框架的使用,注入问题也越来越少。新手代码审计者往往对J
- 在前面的文章中可以发现当我们通过RestTemplate调用其它服务的API时,所需要的参数须在请求的URL中进行拼接,如果参数少的话或许我
- 1、基本介绍随着分词在信息检索领域应用的越来越广泛,分词这门技术对大家并不陌生。对于英文分词处理相对简单,经过拆分单词、排斥停止词、提取词干
- SpringBoot自定义 * 和跨域配置冲突技术栈vue-cli3,springboot 2.3.2.RELEASE问题引出在做毕业设计过
- 本文实例为大家分享了Android实现连连看游戏的具体代码,供大家参考,具体内容如下本人用 android studio 实现的源码主活动
- 问题提出:自己在做一个小网站充当练手,但是前端图片经过base64加密后传往后端在解码。但是一直都有问题,请大神赐教 publi
- 这篇博客将梳理一下.NET中4个Timer类,及其用法。1. System.Threading.Timerpublic Timer(Time
- spring-data-redis项目  spring-data-redis提供了在Spring应用中通
- 背景我在准备使用 JVM 的命令时候观察程序的动态,但是发现 Main 函数启动就退出了,所以也没办法直接观察,于是想到了如何让 Main
- springBoot框架的特点就是舍去了繁琐的配置。使开发者可以把更多的精力放在业务逻辑的开发上。springBoot搭建分三步。1.创建一
- 异常的英文单词是exception,字面翻译就是“意外、例外”的意思,也就是非正常情况。事实上,异常本质上是程序上的错误,包括程序逻辑错误和
- 指纹识别作为最新兴起的用户身份验证机制,已经被越来越多的应用程序所采用,相比传统的密码九宫格等验证方法,指纹识别更加安全,如今越来越多的安卓
- 占位符Placeholder的使用xml中的配置:<?xml version="1.0" encoding=&qu