mq延时队列使用
- mq延时队列使用 推荐度:
- 相关推荐
mq延时队列使用
一、基本配置
- 导入依赖
<!-- Spring Boot starter for AMQP (Advanced Message Queuing Protocol) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- application.yml配置
# RabbitMQ connection settings.
# Spring Boot binds these under the `spring.rabbitmq` prefix, so the
# `rabbitmq` block must be nested beneath `spring`.
spring:
  rabbitmq:
    host: 192.168.56.10
    virtual-host: /
    port: 5672
- 启动类添加注解
// Add this annotation to the application's startup (main) class.
@EnableRabbit
- 配置mq的json序列化
@Configuration
public class RabbitmqConfig {@AutowiredRabbitTemplate rabbitTemplate;@Beanpublic MessageConverter RabbitmqConvertJSON(){return new Jackson2JsonMessageConverter();}}
- 配置交换机、队列、绑定规则
@Beanpublic Exchange StockEventExchange(){//TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new TopicExchange("stock-event-exchange",true,false);}@Beanpublic Queue StockReleaseStockQueue(){//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)return new Queue("stock.release.stock.queue",true,false,false);}@Beanpublic Queue StockDelayQueue(){Map<String,Object> map = new HashMap<>();//死信路由exchangemap.put("x-dead-letter-exchange","stock-event-exchange");//死信routing-keymap.put("x-dead-letter-routing-key","stock.release");//time to livemap.put("x-message-ttl",60000);return new Queue("stock.delay.queue",true,false,false,map);}@Beanpublic Binding StockLocked(){//Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}@Beanpublic Binding StockRelease(){return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}
- 至少监听一个队列以建立与mq的连接后,才会在mq中生成交换机、队列、绑定规则
/**
 * Subscribes to {@code stock.release.stock.queue}.
 *
 * <p>The body is intentionally empty: in this walkthrough the subscription only serves
 * to open a connection to the broker so the declared exchange, queues and bindings
 * get created.
 *
 * @param message the raw AMQP message
 * @param channel the channel the message was delivered on
 */
@RabbitListener(queues = "stock.release.stock.queue")
public void ListenQueue(Message message, Channel channel) {
    // no-op
}
二、订单服务使用mq
1.生成队列、交换机、绑定规则
/*** 如果设置错误需要删掉错误的Queue重启服务即可,重启服务不会覆盖原有的Queue*/
@Configuration
public class MyMQConfig {@Beanpublic Queue OrderDelayQueue(){Map<String,Object> map = new HashMap<>();//死信路由map.put("x-dead-letter-exchange","order-event-exchange");//死信map.put("x-dead-letter-routing-key","order.release.order");//time to livemap.put("x-message-ttl",30000);//持久化,排它//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)return new Queue("order.delay.order",true,false,false,map);}@Beanpublic Queue OrderReleaseOrderQueue(){return new Queue("order.release.order.queue",true,false,false,null);}//选用topic类型交换机是因为需要binding多个队列@Beanpublic Exchange OrderEventExchange(){//TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new TopicExchange("order-event-exchange",true,false,null);}@Beanpublic Binding OrderCreateOrder(){//Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)return new Binding("order.delay.order", Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding OrderReleaseOrder(){return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}}
2.生产者发送消息
@ResponseBody@GetMapping("/test/queue")public String testQueue(){OrderEntity orderEntity = new OrderEntity();orderEntity.setOrderSn(UUID.randomUUID().toString());//给队列发消息,指定routing key//convertAndSend(String exchange, String routingKey, Object object)rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);return "给mq发消息完成";}
3.订阅队列
@RabbitListener(queues = {"order.release.order.queue"})public void ListenQueue(Channel channel, Message message, OrderEntity orderEntity) throws IOException {//因为配置了手动ack,所有这里需要签收消息//basicAck(long deliveryTag, boolean multiple)channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println("手动过期的订单信息,准备关闭订单:"+orderEntity.getOrderSn());}
三、库存服务使用mq
@Configuration
public class RabbitmqConfig {@AutowiredRabbitTemplate rabbitTemplate;@RabbitListener(queues = {"stock.release.stock.queue"})public void ListenQueue(Message message, Channel channel){}/*** json序列化*/@Beanpublic MessageConverter RabbitmqConvertJSON(){return new Jackson2JsonMessageConverter();}@Beanpublic Exchange StockEventExchange(){//TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new TopicExchange("stock-event-exchange",true,false);}@Beanpublic Queue StockReleaseStockQueue(){//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)return new Queue("stock.release.stock.queue",true,false,false);}@Beanpublic Queue StockDelayQueue(){Map<String,Object> map = new HashMap<>();//死信路由exchangemap.put("x-dead-letter-exchange","stock-event-exchange");//死信routing-keymap.put("x-dead-letter-routing-key","stock.release");//time to livemap.put("x-message-ttl",60000);return new Queue("stock.delay.queue",true,false,false,map);}@Beanpublic Binding StockLocked(){//Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}@Beanpublic Binding StockRelease(){return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}}
最新文章
- 谈技术文章翻译的信雅达-上
- linux 下dump文件放在那里,怎么查看dump文件目录
- 职场论
- 扎心了!37岁被裁,好几个月都没有找到工作,面试大公司被婉拒,无奈只能降薪去小公司,没想到还被人嫌弃技术太落后...
- IIS的使用
- principal java
- 芯片的本质是什么?(4)物质与数字世界接口
- win server服务器 关闭危险端口 135,137,138,139,445的方法
- EXT3系统文件.
- void指针(void *)是什么?如何使用它
- SAR成像系列:【7】合成孔径雷达(SAR)成像算法
- 51nod:1079 中国剩余定理(数学)
- Idea 打包JAVA项目
- 常用的Linux终端命令盘点
- const和extern用法
- gcc开启C99或C11标准支持
- Shiro详解