[奇思异想]使用RabbitMQ实现定时任务
[奇思异想]使用RabbitMQ实现定时任务
背景
工作中经常会有定时任务的需求,常见的做法可以使用Timer、Quartz、Hangfire等组件,这次想尝试下新的思路,使用RabbitMQ死信队列的机制来实现定时任务,同时帮助再次了解RabbitMQ的死信队列。
交互流程
1. 用户创建定时任务
2. 往死信队列插入一条消息,并设置过期时间为首个任务执行时间
3. 死信队列中的消息过期后,消息流向工作队列
4. 任务执行消费者监听工作队列,工作队列向消费者推送消息
5. 消费者查询数据库,读取任务信息
6. 消费者确认任务有效(未被撤销),执行任务
7. 消费者确认有下个任务,再往死信队列插入一条消息,并设置过期时间为任务执行时间
8. 重复2-7的步骤,直到所有任务执行完成或任务撤销
环境准备
请自行完成MongoDB和RabbitMQ的安装,Windows、Linux、Docker皆可,以下提供Windows的安装方法:
MongoDB:/
RabbitMQ:.html
核心代码
1. (WebApi)创建任务,并根据设置创建子任务,把任务数据写入数据库
var task = new Task{Name = form.Name,StartTime = form.StartTime,EndTime = form.EndTime,Interval = form.Interval,SubTasks = new List<SubTask>()};var startTime = task.StartTime;var endTime = task.EndTime;while ((endTime - startTime).TotalMinutes >= 0){var sendTime = startTime;if (sendTime <= endTime && sendTime > DateTime.UtcNow){task.SubTasks.Add(new SubTask { Id = ObjectId.GenerateNewId(), SendTime = sendTime });}startTime = startTime.AddMinutes(task.Interval);}await _mongoDbContext.Collection<Task>().InsertOneAsync(task);
2. (WebApi)往死信队列中写入消息
var timeFlag = task.SubTasks[0].SendTime.ToString("yyyy-MM-dd HH:mm:ssZ");var exchange = "Task";var queue = "Task";var index = 0;var pendingExchange = "PendingTask";var pendingQueue = $"PendingTask|Task:{task.Id}_{index}_{timeFlag}";using (var channel = _rabbitConnection.CreateModel()){channel.ExchangeDeclare(exchange, "direct", true);channel.QueueDeclare(queue, true, false, false);channel.QueueBind(queue, exchange, queue);var retryDic = new Dictionary<string, object>{{"x-dead-letter-exchange", exchange},{"x-dead-letter-routing-key", queue}};channel.ExchangeDeclare(pendingExchange, "direct", true);channel.QueueDeclare(pendingQueue, true, false, false, retryDic);channel.QueueBind(pendingQueue, pendingExchange, pendingQueue);var properties = channel.CreateBasicProperties();properties.Headers = new Dictionary<string, object>{["index"] = index,["id"] = task.Id.ToString(),["sendtime"] = timeFlag};properties.Expiration = ((int)(task.SubTasks[0].SendTime - DateTime.UtcNow).TotalMilliseconds).ToString(CultureInfo.InvariantCulture);channel.BasicPublish(pendingExchange, pendingQueue, properties, Encoding.UTF8.GetBytes(string.Empty));}
其中:
PendingTask为死信队列Exchange,死信队列的队列名(Queue Name)会包含Task、index、timeFlag的信息,帮助跟踪队列和子任务,同时也起到唯一标识的作用。
task.id为任务Id
index为子任务下标
timeFlag为子任务执行时间
3. (消费者)处理消息
var exchange = "Task";var queue = "Task";_channel.ExchangeDeclare(exchange, "direct", true);_channel.QueueDeclare(queue, true, false, false);_channel.QueueBind(queue, exchange, queue);var consumer = new EventingBasicConsumer(_channel);
//监听处理consumer.Received += (model, ea) =>{
//获取消息头信息var index = (int)ea.BasicProperties.Headers["index"];var id = (ea.BasicProperties.Headers["id"] as byte[]).BytesToString();var timeFlag = (ea.BasicProperties.Headers["sendtime"] as byte[]).BytesToString();
//删除临时死信队列_channel.QueueDelete($"PendingTask|Task:{id}_{index}_{timeFlag}", false, true);var taskId = new ObjectId(id);var task = _mongoDbContext.Collection<Task>().Find(n => n.Id == taskId).SingleOrDefault();
//撤销或已完成的任务不执行if (task == null || task.Status != TaskStatus.Normal){_channel.BasicAck(ea.DeliveryTag, false);return;}
//执行任务_logger.LogInformation($"[{DateTime.UtcNow}]执行任务...");
//设置子任务已完成task.SubTasks[index].IsSent = true;if (task.SubTasks.Count > index + 1) //还有未完成的子任务,把下个子任务的信息写入死信队列{PublishPendingMsg(_channel, task, index + 1);}else{task.Status = TaskStatus.Finished; //所有子任务执行完毕,设置任务状态为完成}_mongoDbContext.Collection<Task>().ReplaceOne(n => n.Id == taskId, task); //更新任务状态_channel.BasicAck(ea.DeliveryTag, false);};_channel.BasicConsume(queue, false, consumer);
4. (WebApi)撤销任务,更新任务状态即可
var taskId = new ObjectId(id);var task = await _mongoDbContext.Collection<Task>().Find(n => n.Id == taskId).SingleOrDefaultAsync();if (task == null){return NotFound(new { message = "任务不存在!" });}task.Status = TaskStatus.Canceled;await _mongoDbContext.Collection<Task>().FindOneAndReplaceAsync(n => n.Id == taskId, task);
效果展示
1. 先使用控制台把消费者启动起来。
2. 创建任务
启动WebApi,创建一个任务,开始时间为2019-07-16T07:55:00.000Z,结束时间为2019-07-16T07:59:00.000Z,执行时间间隔1分钟:
任务与相应的子任务也写入了MongoDB,这里假设子任务可能是邮件发送任务:
创建了一个临时死信队列,队列名称包含任务Id,子任务下标、以及子任务执行时间,并往其写入一条消息:
3. 执行(子)任务
从日志内容可以看出,(子)任务正常执行:
子任务状态也标注为已发送
同时也往消息队列写入了下一个子任务的消息:
4. 撤销任务
任务状态被置为已撤销:
任务没再继续往下执行:
消息队列中的临时队列被删除,消息也被消费完
源码地址
转载于:.html
- componentDidMount,react
- java tooltip
- VMware安装windows server2008R2x64
- 软件设计中的易用性
- Cygwin 与 MinGWMSYSMSYS2,如何选择?
- Win7 64位中MinGW和MSYS的安装
- svnupdate 出现skipped '.' 或skipped '目录名称'
- 回溯法之活动安排问题
- HTK 安装、编译以及测试——Ubuntu 14.04
- 数字芯片设计流程
- 高通Linux Android 平台中的蓝牙功能学习 (4)
- screen工具使用
- 餐厅预订系统有哪些?餐厅预订系统怎么选择?
- 积累的VC编程小技巧之打印相关
- 四川省13家企业荣获第十三届创新中国企业家论坛“创新型企业”奖
- atoi和itoa(头文件stdilb.h)的C实现
- pip升级报错:def read(rel
- shiro反序列化漏洞的原理和复现
- 机器学习中的数据简介
- C语言简单实现通讯录