【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神RabbitMQ 基于 AMQP 协议 AdvancedMess 高级消息队列协议 是一个网络协议 是应用层协议的一个开放标准 为面向消息中间件设计的

大家好,欢迎来到IT知识分享网。

目录

分布式系统通信方式

MQ选型与应用场景

应用场景(优势)

RabbitMQ工作模型

RabbitMQ简介

RabbitMQ 工作模型(流程)​编辑

Docker安装配置RabbitMQ

RabbitMQ管理控制台

RabbitMQ 简单模式构建生产者

RabbitMQ 简单模式构建消费者

 RabbitMQ工作模式 – WorkQueues

RabbitMQ工作模式 – 发布订阅

RabbitMQ工作模式 – 路由模式

RabbitMQ工作模式 – 通配符模式

RabbitMQ集成SpringBoot(上) – 异步解耦发送短信

RabbitMQ集成SpringBoot(下) – 监听消费短信发送

消息的可靠性投递Confirm机制

 消息的可靠性投递Return机制

消费端可靠性ACK机制

RabbitMQ 消费者消息限流

RabbitMQ ttl特性控制短信队列超时

1. 对整个队列设置ttl:

2. 对单独一个消息进行ttl设置

RabbitMQ 死信队列的实现

代码实现

本章小结

作业


MQ:Message Queue(消息队列),是在消息传输的过程中,把消息保存到的一个容器。消息可以是我们产生的一些信息数据,一般都都是传字符串或者json字符串。消息队列主要用于一些分布式系统或者微服务系统之间的通信

分布式系统通信方式

  • 远程调用:同步
  • 借助中间件进行通信:异步,中间件有mq,zookeeper,redis

MQ选型与应用场景

  • RabbitMQ:erlang高并发能力不错,springboot推荐集成,社区活跃资料多。并发能力和性能都不错。
  • ActiveMQ:apache出品,目前使用量不多,不如以前,性能也不如其他。
  • RocketMQ:阿里出品,吞吐量很高,是Rabbit的10倍,十万左右。功能很全,扩展性也很好。金融类项目优先推荐使用。
  • Kafka:apache出品,侧重大数据领域用的很多,吞吐量10万级。只支持主要的一些MQ功能。
  • ZeroMQ,MetaMQ,RedisMQ
    简单来说,RabbitMQ的综合能力比较好。中小型公司用的也比较多,因为足够满足自身业务,所以后期一旦公司发展可以考虑转型使用RocketMQ,因为rocket是java开发的,可以自己去重构,而rabbit是erlang语言,极不容易修改。未来有机会可以再开Rocket来聊一聊

应用场景(优势)

  • 异步任务
    • 一些耗时很长的操作本身是同步执行的,可以借助mq通知消费者去处理,这个过程是异步的。从而减少了前端用户的等等的响应时间。
    • 比如云端操作重装系统,可能需要1分钟左右的时间,用户不可能在页面上等着吧。那么生产者可以向消费者发出一个重装的通知,让其进行重装操作。前端用户可以继续做别的操作,等重装完毕以后,再通知用户即可。因为这种操作不需要及时性,延后通知即可。这也是暂时性的不一致,MQ是最终一致性。
  • 提速
    • 本来前端用户都要等着处理完毕的结果响应,现在异步可以直接返回接口,减少了等待的时间,如此一来大大提高了前端用户的等待时间,用户体验更高了。也就是说当前的请求接口的吞吐量就大大提高了。
  • 接口解耦
    • MQ就相当于是工厂创建的微信群,把批发商拉进群,让他们进行监听,起到了接口之间解耦的作用。同时也是一种广播机制。在我们的系统中,如果一个接口里要调用多个远程服务,比如下单的同时需要调用库存、支付、物流、日志、积分等多个服务,那么如果是同步进行,一旦一个环节出了问题,那么整个接口就崩了,如此整个系统的容错性太低了; 如果使用mq解耦,那么哪怕要坏也是只坏一个环节,大大降低了发生问题的风险。
  • 削峰填谷
    • 这个是属于高并发的场景,举个例子,工厂有10万件衣服,需要快速清仓,现在所有的批发商的清货能力只能在5万件左右,而且也没有那么多钱进货。所以通过MQ这个中介,工厂把衣服都放入中介,批发商慢慢的把衣服卖出去以后再把后续的5万件衣服进货不就行了?这就是瞬时高并发所遇到的情况,就比如秒杀,服务器里redis啊数据库等处理能力不高,流量太大,那我们把请求放入到mq,如此一来,后续的数据慢慢处理就行了。这也就和餐厅吃饭在外面排队等位是一个道理。处理不来,就慢慢排队等着。【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神
  • 使用MQ之前,高并发的时候,瞬时请求量很大,会达到5000以上,这个时候的当前时间就是高峰,服务器压力不行势必会打死。
  • 使用MQ之后,限制速度为1000个并发,哪怕5000甚至上万的请求进来,也会被限速,那么这就是削峰,消息的消费平均速度会维持在1000左右,哪怕高峰期过来,速度也是1000左右,那么这就是填谷
  • 举个例子,饭店吃饭高峰期,人流量很多,这个时候不可能直接进去吃饭把,餐桌就那么点,客户太多了,没办法,只能取号排队把,深有体会,排队的过程就是慢慢的消费,削峰填谷

RabbitMQ工作模型

RabbitMQ简介

RabbitMQ基于AMQP协议,Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息中间件设计的。基于这个协议的客户端和消息中间件可以传递消息,并且不会因为客户端或中间件产品的不同所首先,也不会受到不同开发语言的限制。比如用java、php、.net都可以使用消息队列,并且只要支持AMQP这个协议,不论什么客户端,都可以相互传输消息。而且甚至与你只要遵循amqp协议,自己也能开发一套消息队列产品也没问题。

RabbitMQ就是基于amqp所开发的一套产品,只是他是基于erlang语言开发的。它是一种应用于应用之间的通信方法,MQ在分布式系统中应用非常广泛。

官网地址:RabbitMQ: One broker to queue them all | RabbitMQ

RabbitMQ 工作模型(流程)【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

  • RabbitMQ:消息队列服务中间件
  • 生产者produce:创建消息,发送到mq
  • 消费者Consumer:监听消息,处理业务通知
  • Exchange:MQ的交换机,他会按照一定的规则来转发到某个队列,类似controller路由机制。比如 /passport/sms。也就是分发网络请求的功能,和我们现实生活中的交换机是一个意思。
  • Queue:队列,存储消息。相当于controller。被消费者监听,一旦有消息,会被消费者接收到。
  • Binding-Routes:交换机和队列的绑定关系,通过路由结合在一起,消息如何通过交换机发送到队列里,是通过路由机制,类似于@RequestMapping,路由规则一旦匹配,那么就可以存储对应的消息。
  • Channel:生产者和消费者与MQ建立的通道,相当于httpSession会话,建立请求就会有channel。可以理解为一个桥梁作用,消息经过桥梁到达mq的queue。Channel的目的是为了管理每次请求到RabbitMQ-server的连接connection,如此才能更好的节约资源的开支,提高通信效率。

Docker安装配置RabbitMQ

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

运行mq

docker run --name rabbitmq \ -p 5671:5671 \ -p 5672:5672 \ -p 4369:4369 \ -p 15671:15671 \ -p 15672:15672 \ -p 25672:25672 \ --restart always \ -d rabbitmq:management 

如果忘记加上自动重启,可以运行如下脚本:

docker update rabbitmq –restart=always

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

运行成功

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

查看具体版本:

docker image inspect rabbitmq:management|grep -i version

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

访问管理界面:

​​​​​​http://192.168.1.112:15672/【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神​​​​​​http://192.168.1.112:15672/

RabbitMQ管理控制台

RabbitMQ的管理控制台界面相当友好,可视化程度很不错。

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

概览信息:

  • connections:客户端的连接
  • channels:没有connections就没有channels
  • exchanges:交换机,内部有默认的定义好的名字
  • queues:队列,可以通过交换机点击绑定的进入
  • admin:管理设置,可以创建账号以及分区
  • virtul host 可以理解为分区,不同项目使用

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

RabbitMQ 简单模式构建生产者

本节课开始我们将会使用java代码来构建rabbitmq的各种通信,首先我们来学习的是构建消息的生产者。

RabbitMQ Tutorials | RabbitMQ

RabbitMQ tutorial – “Hello World!” | RabbitMQ

简单模式:通过生产者发消息给队列,消费者监听收到消息

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

为了更加清晰的看到生产者与消费者,我们可以创建两个子模块工程:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

分别添加依赖坐标:

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency> 

创建生产者:​​​​​​​【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

/ * 构建生产者,发送消息 */ public class FooProducer { public static void main(String[] args) throws Exception { // 1. 创建连接工厂以及参数配置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.122"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("imooc"); factory.setPassword("imooc"); // 2. 创建连接Connection Connection connection = factory.newConnection(); // 3. 创建管道Channel Channel channel = connection.createChannel(); // 4. 创建队列Queue(简单模式不需要交换机Exchange) / * queue: 队列名 * durable: 是否持久化,true:重启后,队列依然存在,否则不存在 * exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false * autoDelete: 是否自动删除,true:当没有消费者的时候,自动删除队列 * arguments: map类型其他参数 */ channel.queueDeclare("hello", true, false, false, null); // 5. 向队列发送消息 / * exchange: 交换机名称,简单模式没有,直接设置为 "" * routingKey: 路由key,映射路径,如果交换机是默认"",则路由key和队列名一致 * props: 配置信息 * body: 消息数据 */ String msg = "Hello ~"; channel.basicPublish("", "hello", null, msg.getBytes()); // 7. 释放 channel.close(); connection.close(); } } 

  运行后查看控制台:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

RabbitMQ 简单模式构建消费者

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

/ * 构建消费者,监听消息 */ public class FooConsumer { public static void main(String[] args) throws Exception { // 1. 创建连接工厂以及参数配置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.122"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("imooc"); factory.setPassword("imooc"); // 2. 创建连接Connection Connection connection = factory.newConnection(); // 3. 创建管道Channel Channel channel = connection.createChannel(); // 4. 创建队列Queue(简单模式不需要交换机Exchange) / * queue: 队列名 * durable: 是否持久化,true:重启后,队列依然存在,否则不存在 * exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false * autoDelete: 是否自动删除,true:当没有消费者的时候,自动删除队列 * arguments: map类型其他参数 */ channel.queueDeclare("hello", true, false, false, null); // 5. 监听并消费消息 / * queue: 监听的队列名 * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知 * callback: 回调函数,处理监听的消息 */ Consumer consumer = new DefaultConsumer(channel) { / * 重写消息配送方法 * @param consumerTag: 消息标签(标识) * @param envelope: 信封(一些信息,比如交换机路由等信息) * @param properties: 配置信息,和生产者的一致 * @param body: 消息数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag = " + consumerTag); System.out.println("envelope = " + envelope.toString()); System.out.println("properties = " + properties.toString()); System.out.println("body = " + new String(body)); super.handleDelivery(consumerTag, envelope, properties, body); } }; channel.basicConsume("hello", true, consumer); // 不需要关闭连接,则持续监听 } } 

监听结果:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

 RabbitMQ工作模式 – WorkQueues

   【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

工作队列没有交换机,生产者发送消息给队列,队列有两个或者多个消费者监听进行消费,但是所有的消息是会被消费者以竞争的关系进行消费,所以队列里的消息也称之为工作的任务,任务由一个人完成了就不需要被第二个人完成。所以这个队列里的所有消息只会被某一个消费者进行消费读取。

  • 使用场景:如果任务量很大很多,而一个消费者处理不过来,则此时可以使用工作队列,比如短信发送在一个系统里会有很多场景进行发送给用户,所以量很大的时候,也可以分配给多个消费者去进行消费发短信即可。这就像上班工作量很大,就需要招人共同完成一些任务是一个道理。
...... channel.queueDeclare("work_queue", true, false, false, null); Integer tasks[] = {}; for (int i = 0 ; i < 10 ; i ++) { String msg = "开始上班,任务[" + i + "]"; channel.basicPublish("", "work_queue", null, msg.getBytes()); } ...... 
... channel.queueDeclare("work_queue", true, false, false, null); ... channel.basicConsume("work_queue", true, consumer); ... 

从运行结果可以开得出来,这也是类似于负载均衡的轮询效果。

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

RabbitMQ工作模式 – 发布订阅

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

  • Fanout:广播模式,把消息发送给所有绑定的队列
  • Direct:定向投递,把消息发送给指定的routing key的队列
  • Topic:通配符模式,把消息交给符合routing pattern的队列
    需要注意,交换机只负责转发消息,不会存储消息,存储消息的职责是队列的,不要搞混噢。

生产者代码:

...... // 创建交换机 / * exchange: 交换机名称 * type: 交换机类型 * FANOUT("fanout"): 广播模式,把消息发送给所有绑定的队列 * DIRECT("direct"): 定向投递,把消息发送给指定的`routing key`的队列 * TOPIC("topic"): 通配符模式,把消息交给符合`routing pattern`的队列 * HEADERS("headers"): 使用率不多,参数匹配 * durable: 是否持久化 * autoDelete: 是否自动删除 * internal: 内部意思,true:表示当前Exchange是RabbitMQ内部使用,用户创建的队列不会消费该类型交换机下的消息,所以我们自己使用设置为false即可 * arguments: 参数 */ String exchange = "fanout_exchange"; channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null); // 创建两个队列 String fanout_queue_a = "fanout_queue_a"; String fanout_queue_b = "fanout_queue_b"; channel.queueDeclare("fanout_queue_a", true, false, false, null); channel.queueDeclare("fanout_queue_b", true, false, false, null); // 绑定交换机和队列 / * queue * exchange * routingKey: 路由key,绑定规则,这里暂不使用(fanout本身广播给所有订阅者,所以没有路由规则,使用空字符串即可) */ channel.queueBind(fanout_queue_a, exchange, ""); channel.queueBind(fanout_queue_b, exchange, ""); for (int i = 0 ; i < 10 ; i ++) { String msg = "开始上班,任务[" + i + "]"; channel.basicPublish(exchange, "", null, msg.getBytes()); } channel.close(); connection.close(); 

运行查看:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

...... String fanout_queue_a = "fanout_queue_a"; channel.queueDeclare(fanout_queue_a, true, false, false, null); ...... channel.basicConsume(fanout_queue_a, true, consumer); ...... 

运行结果:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

RabbitMQ工作模式 – 路由模式

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神路由模式routing可以针对不同的类别进行路由,比如图中,可以控制不同的日志级别进行路由,这就相当于控制器的@RequestMapping,请求的url地址根据不同的映射路径进行controller接口的调用,原理一致。

... String exchange = "routing_exchange"; channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT, true, false, false, null); // 创建两个队列 String routing_queue_order = "routing_queue_order"; String routing_queue_pay = "routing_queue_pay"; channel.queueDeclare(routing_queue_order, true, false, false, null); channel.queueDeclare(routing_queue_pay, true, false, false, null); ... // 订单的创建/更新/删除都走订单队列进行消费;订单的支付走独立的队列进行消费 channel.queueBind(routing_queue_order, exchange, "order_create"); channel.queueBind(routing_queue_order, exchange, "order_delete"); channel.queueBind(routing_queue_order, exchange, "order_update"); channel.queueBind(routing_queue_pay, exchange, "order_pay"); // 根据不同的路由key进行消息的发送 String msg1 = "创建订单A"; String msg2 = "创建订单B"; String msg3 = "删除订单C"; String msg4 = "修改订单D"; String msg5 = "支付订单E"; String msg6 = "支付订单F"; channel.basicPublish(exchange, "order_create", null, msg1.getBytes()); channel.basicPublish(exchange, "order_create", null, msg2.getBytes()); channel.basicPublish(exchange, "order_delete", null, msg3.getBytes()); channel.basicPublish(exchange, "order_update", null, msg4.getBytes()); channel.basicPublish(exchange, "order_pay", null, msg5.getBytes()); channel.basicPublish(exchange, "order_pay", null, msg6.getBytes()); ... 

运行生产者,查看:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

​​​​​​​【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

... String routing_queue_order = "routing_queue_order"; channel.queueDeclare(routing_queue_order, true, false, false, null); ... channel.basicConsume(routing_queue_order, true, consumer); ... 

  运行结果:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

RabbitMQ工作模式 – 通配符模式

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

通配符可以有两种形式:

  • *:匹配一个字段
    • 比如:order.*.* 可以匹配 order.do.delete,order.create.finish
  • #:匹配0个或者多个字段
    • 比如 #.order.# 可以匹配 order.do.delete,check.order.if.finish

生产者代码:

... String exchange = "topics_exchange"; channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true, false, false, null); // 创建两个队列 String topics_queue_order = "topics_queue_order"; String topics_queue_pay = "topics_queue_pay"; channel.queueDeclare(topics_queue_order, true, false, false, null); channel.queueDeclare(topics_queue_pay, true, false, false, null); ... channel.queueBind(topics_queue_order, exchange, "order.*"); channel.queueBind(topics_queue_pay, exchange, "#.pay.#"); // 根据不同的路由key进行消息的发送 String msg1 = "创建订单A"; String msg2 = "创建订单B"; String msg3 = "删除订单C"; String msg4 = "修改订单D"; String msg5 = "支付订单E"; String msg6 = "支付订单F"; channel.basicPublish(exchange, "order.create", null, msg1.getBytes()); channel.basicPublish(exchange, "order.create", null, msg2.getBytes()); channel.basicPublish(exchange, "order.delete", null, msg3.getBytes()); channel.basicPublish(exchange, "order.update", null, msg4.getBytes()); channel.basicPublish(exchange, "order.pay", null, msg5.getBytes()); channel.basicPublish(exchange, "imooc.order.pay.display", null, "慕课网订单支付".getBytes()); channel.basicPublish(exchange, "supermarket.pay", null, "超市支付".getBytes()); ... 

运行后结果

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

... String topics_queue_pay = "topics_queue_pay"; channel.queueDeclare(topics_queue_pay, true, false, false, null); ... channel.basicConsume(topics_queue_pay, true, consumer); ... 

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

RabbitMQ集成SpringBoot(上) – 异步解耦发送短信

<!-- SpringBoot 整合RabbitMQ 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 

在api子工程中创建短信发送的短信配置类:

/ * RabbitMQ 的配置类 */ @Configuration public class RabbitMQSMSConfig { // 定义交换机的名字 public static final String SMS_EXCHANGE = "sms_exchange"; // 定义队列的名字 public static final String SMS_QUEUE = "sms_queue"; // 创建交换机 @Bean(SMS_EXCHANGE) public Exchange exchange() { return ExchangeBuilder.topicExchange(SMS_EXCHANGE).durable(true).build(); } // 创建队列 @Bean(SMS_QUEUE) public Queue queue() { // return new Queue(SMS_QUEUE); return QueueBuilder.durable(SMS_QUEUE).build(); } // 定义队列绑定到交换机的关系 @Bean public Binding smsBinding( @Qualifier(SMS_EXCHANGE) Exchange exchange, @Qualifier(SMS_QUEUE) Queue queue ) { return BindingBuilder.bind(queue).to(exchange).with("imooc.sms.#").noargs(); // 执行绑定 } } 

异步解耦短信发送:

spring: rabbitmq: host: 192.168.1.122 port: 5672 username: rabbitmq password: rabbitmq virtual-host: imooc-space 
@Autowired private RabbitTemplate rabbitTemplate; ... // 把短信内容和手机号构建为一个bean并且转换为json作为消息发送给mq rabbitTemplate.convertAndSend( RabbitMQSMSConfig.SMS_EXCHANGE, "imooc.sms.login.send", bodyJson); ... 

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

RabbitMQ集成SpringBoot(下) – 监听消费短信发送

创建一个资源服务service-resource-4001,目前用于消费者监听

@Component @Slf4j public class RabbitMQSMSConsumer { / * 监听队列,并且处理消息 */ @RabbitListener(queues = {RabbitMQSMSConfig.SMS_QUEUE}) public void watchQueue(String payload, Message message) { log.info("payload = " + payload); String routingKey = message.getMessageProperties().getReceivedRoutingKey(); log.info("routingKey = " + routingKey); String msg = payload; SMSContentQO smsContent = GsonUtils.stringToBean(msg, SMSContentQO.class); // 发送短信 TODO } } 

最终运行测试即可,实现短信发送的异步解耦。

消息的可靠性投递Confirm机制

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

RabbitMQ有两种方式来控制消息可靠性。

  • confirm 确认模式
    • 消息一旦从生产者发给MQ server,也就是被交换机接收到,这个时候会有一个回调 confirmCallback,代表mq服务接收到了。但是这个消息不管是否成功,这个回调函数一定会执行
  • return 回退模式
    • 交换机把消息路由到队列的时候如果出现问题,则触发回调 returnCallback,只有失败才会执行这个函数。

上代码:

配置confirm类型:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

  • NONE: 禁用发布确认模式,是默认的
  • CORRELATED: 发布消息成功到交换器后会触发回调方法
  • SIMPLE: 使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法

定义confirmCallback回调函数:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

运行测试,观察结果。

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

 消息的可靠性投递Return机制

配置return开启:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

定义returnCallback回调函数:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

  • message: 消息数据
  • replyCode: 错误code
  • replyText: 错误信息
  • exchange: 交换机
  • routingKey: 发送的路由地址

这就相当于发邮件的时候,如果你发的地址不存在有问题,对方接收不到,则你会收到一个退回邮件,机制是差不多的。

如此一来,我们就可以通过confirm和return两种机制来记录这个过程的日志,如果错误日志出现很频繁说明咱们的mq可能就不太稳定或者有什么其他因素需要排查了。

消费端可靠性ACK机制

前面两节所用的confirm和return都是生产端机制,此外还有ack模式,是基于消费端的可靠性。

ack机制有三种确认方式:

  • 自动确认: Acknowledge=none(默认)
    • 消费者收到消息,则会自动给broker一个回执,表示消息收到了,但是消费者里的监听方法内部是我们自己的业务,业务是否成功,他不管的,如果业务出现问题出现异常,那么也就相当于这条消息是失败的。
    • 这就相当于我寄了一个快递,对方收到后快递公司就自动确认了快递的签收确认,这是很常见的手段吧。一旦快递内部是否破损我们就不知道了,对吧。
  • 手动确认:Acknowledge=manual
    • 消费端收到消息后,不会通知broker回执,需要等待我们自己的业务处理完毕OK了没问题,然后手动写一条代码去确认,当然如果出现错误异常,我们可以有兜底的处理方法,比如记录日志或者重新发新的消息都行。
  • 根据异常类型确定:Acknowledge=auto
    • 消费端处理业务出现不同的异常,根据异常的类型来做出不同的响应处理,这种方式比较麻烦,需要提前预判很多异常类型。这里了解一下有这个类型即可。

代码中设置手动签收

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

创建新的方法,加入参数channel

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

运行后发送消息查看结果:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

发现有一条消息是未被确认的,因为我们没有手动确认,如果是默认的确认方式,这个时候是不会有unacked的。

添加除零错误

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

手动确认和消息重发

@RabbitListener(queues = {RabbitMQSMSConfig.SMS_QUEUE}) public void watchQueue(Message message, Channel channel) throws Exception { try { String routingKey = message.getMessageProperties().getReceivedRoutingKey(); log.info("routingKey = " + routingKey); int a = 100 / 0; String msg = new String(message.getBody()); log.info("msg = " + msg); / * deliveryTag: 消息投递标签 * multiple: 批量确认所有消费者获得的消息 */ channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } catch (Exception e) { e.printStackTrace(); / * requeue: true:重新回到队列, false:丢弃消息 */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true); // 不支持requeue // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } 

RabbitMQ 消费者消息限流

我们在一开始介绍MQ的时候,就提到了削峰填谷,本质上就是限流,所以我们需要对限流做一个落地的实现。那么现在提出一个需求,假设用户发短信一下子太多了,那么消费者在进行消费处理业务的时候,也是需要进行限流的。

限流主要在消费者这一块,基于消费者做代码实现。并且基于手动ack的开启。

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

prefetch的设置,其实也是类似于负载均衡,为了更有效的利用服务器资源。也可以提高消费者的吞吐量。

生产者加一个for循环模拟多条消息的发送。

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

消费者手动ack之前打一个断点,

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

每次都是未确认2个,消息会以此从mq中拉取。

把prefetch注释掉再测试:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

断点再一次进来,可以看到,消费者是一次性拉取了10条消息,并没有做到限流。

所以,通过prefetch可以达到消费端控制消费速度的效果。

RabbitMQ ttl特性控制短信队列超时

假设现在短信发送量很多,消息太多太多了,可能处理不过来,那么假设短信验证码本身就是5分钟内失效的,但是5分钟过后还没有发出去,mq消息还是在队列中没有被消费者消费,那么这条消息其实是作废的,没有用的。而且本身用户已经等了那么久了都没收到,所以我们能不能索性设定一个时间为60s,60秒还没有消费消息,那么就不消费了呗,让用户再次发送一个请求完事。

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

此时我们可以使用ttl这个特性。

  • TTL:time to live 存活时间,和redis的ttl是一个道理
  • 如果消息到到ttl的时候,还没有被消费,则自动清除
  • RabbitMQ可以对消息或者整个队列设置ttl

代码实现,生产端配置超时ttl时间(单位:毫秒)

1. 对整个队列设置ttl:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

把消费者的监听去除,不要进行消费:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'sms_queue' in vhost '/': received the value '10000' of type 'signedint' but current is none, 

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

但是10秒过后,没有消费超时了,则自动丢弃

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

2. 对单独一个消息进行ttl设置

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

10秒过后,消息过期了抛弃,如下:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

注意:如果同时设置了两种消息过期方式和的话,时间短的会优先触发。

RabbitMQ 死信队列的实现

前面讲了ttl,那么在这里就必须来提一嘴死信队列

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

  • 死信队列:DLX,dead letter exchange。
  • 当一个消息了以后,就会被发送到死信交换机DLX

前面我们使用了ttl超时机制,如果我们队列绑定了DLX死信交换机,那么超时后,消息不会被抛弃,而是会进入到死信交换机,死信交换机绑定了其他队列(称之为死信队列),那么这个时候我们就可以处理那些被抛弃的消息了。

此外除了ttl超时进入死信队列,还有两种情况也会进入到死信队列:

  • 队列消息长度已经到达限制,我一个队列设置最多1000条消息的量,后续进入的消息就抛弃了,就像我只能是10碗饭,你再盛饭给我吃就吃不下了,只能扔了。
  • 消费者手动驳回消息,nack或者reject,并且requeue为false,就像我吃饭的时候,一不小心里面进了一只苍蝇,我不会把这碗饭重新放入饭盆里吧,只能扔了。

代码实现

配置死信交换机和队列:(步骤和之前创建队列交换机以及绑定关系是一致的)

/ * RabbitMQ 的配置类 死信队列 */ @Configuration public class RabbitMQSMSConfig_Dead { // 定义交换机的名称 public static final String SMS_EXCHANGE_DEAD = "sms_exchange_dead"; // 定义队列的名称 public static final String SMS_QUEUE_DEAD = "sms_queue_dead"; // 统一定义路由key public static final String ROUTING_KEY_SMS_DEAD = "dead.sms.display"; // 创建交换机 @Bean(SMS_EXCHANGE_DEAD) public Exchange exchange() { return ExchangeBuilder .topicExchange(SMS_EXCHANGE_DEAD) .durable(true) .build(); } // 创建队列 @Bean(SMS_QUEUE_DEAD) public Queue queue() { // return new Queue(SMS_QUEUE); return QueueBuilder .durable(SMS_QUEUE_DEAD) .build(); } // 创建绑定关系 @Bean public Binding smsDeadBinding(@Qualifier(SMS_EXCHANGE_DEAD) Exchange exchange, @Qualifier(SMS_QUEUE_DEAD) Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with("dead.sms.*") .noargs(); } } 

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

死信队列监听的消费者:

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

10秒种后剩余6条消息没有被消费则进入死信队列

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

ttl+死信队列可以实现延迟队列,但是后期我们会使用延迟插件安装来实现延迟队列,更加方便。

手动nack以及reject课后去尝试一下。

本章小结

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

【MQ】大白话告诉你什么是MQ,没有比这还详细还易懂的文章了吧,以RabbitMQ为例,从小白到大神

我们为什么会选择使用mq呢,因为在一个大型项目里,涉及发短信的地方太多了,如果每个地方都这么用异步任务来一下,会显得有些乱,而且每个项目里都有,使用mq以后,可以在一个单独的项目中,专门负责用于进行短信发送的消费,这样就更加解耦了,更加体现单一职责了。如此一来,mq可以更好的管理消费者,或者说他有更细的细粒度。甚至说消费者服务可以有很多,我们还能创建一个子嵌套聚合工程进行消费者管理。

异步任务 放入到api里,两点不好,一个是大量业务在api共用工程,不太好,第二个,本质上项目没有做到解耦,打包还是在一起的,虽然业务异步,但是代码并未解耦。而mq可以用一个专有的短信监听服务,专门异步发送所有短信场景,这才是比较 ok的。

作业

手动ack重回队列发生死循环的时候,如何设定参数进行手动上报预警。

  • 需要使用到Nacos分布式配置,redis计数,短信或邮箱发送

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/111312.html

(0)
上一篇 2026-01-25 17:45
下一篇 2026-01-25 18:10

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信