RabbitMQ死信队列详解

RabbitMQ死信队列详解分别从 TTL 过期 消息积压 消息被拒三个角度来介绍死信队列 死信队列

大家好,欢迎来到IT知识分享网。

什么是死信队列

由于特定的原因导致 Queue 中的某些消息无法被消费,这类消费异常的数据将会保存在死信队列中防止消息丢失,例如用户在商城下单成功并点击支付后,在指定时间未支付时的订单自动失效
死信队列只不过是绑定在死信交换机上的队列。死信交换机只不过是用来接受死信的交换机,可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。


来源

  1. 消息的存活时间到了,ttl过期
  2. 队列积压的消息达到最大长度(在队列中等待时间最久的消息会成为死信)
  3. 消息被拒(消费方返回nack进行否定应答)且不重新加入队列(requeue=false)

演示

架构图

死信队列案例示意图
死信队列标记


TTL过期

  1. 在生产者方进行指定为当前发送消息的过期时间,缺点是消息即使过期也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的如果当前队列有严重的消息积压情况,已过期的消息依旧会被积压在队列中,如果队列配置了消息积压上限,将导致后续应当正常消费的消息全部进入死信队列 )
  2. 在队列指定为所有到达该队列的消息的过期时间,时间从消息入队列开始计算,只要超过了队列的超时时间配置,消息会自动清除
package com.example.rabbitmoslimiting.demos; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitUtils { 
     private static final ConnectionFactory connectionFactory; //放到静态代码块中,在类加载时执行,只执行一次。达到工厂只创建一次,每次获取是新连接的效果 static { 
     //创建连接工厂 connectionFactory = new ConnectionFactory(); connectionFactory.setHost("101.133.141.74"); //设置MQ的主机地址 connectionFactory.setPort(5672); //设置MQ服务端口 connectionFactory.setVirtualHost("study"); //设置Virtual Hosts(虚拟主机) connectionFactory.setUsername("guest"); //设置MQ管理人的用户名(要在Web版先配置,保证该用户可以管理设置的虚拟主机) connectionFactory.setPassword("guest"); //设置MQ管理人的密码 } //定义提供连接对象的方法,封装 public static Connection getConnection(){ 
     try { 
     //创建连接对象并返回 return connectionFactory.newConnection(); } catch (Exception e) { 
     e.printStackTrace(); } return null; } //关闭通道和关闭连接工具类的方法 public static void closeConnectionAndChannel(Channel channel, Connection connection){ 
     try { 
     if (channel!=null) { 
     channel.close(); } if (connection!=null){ 
     connection.close(); } } catch (Exception e) { 
     e.printStackTrace(); } } } 

生产者:

package com.dmbjz.one; import com.dmbjz.utils.RabbitUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.nio.charset.StandardCharsets; /* 死信队列TTL案例 生产者 */ public class Provider { 
     private static final String EXCHANGE_NAME = "normal_exchange"; //正常交换机名称 private static final String KEY = "zhangsan"; //普通队列 RoutingKey public static void main(String[] args) throws Exception { 
     Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //声明交换机 /*死信队列 设置TTL消息过期时间 单位毫秒*/ AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder() .expiration("20000") .build(); /*模拟消息循环发送*/ for(int i = 1; i < 11; i++) { 
     String message = "INFO " + i; channel.basicPublish(EXCHANGE_NAME,KEY,properties,message.getBytes(StandardCharsets.UTF_8)); } } } 

消费者C1:

package com.dmbjz.one; import com.dmbjz.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; /* 死信队列TTL案例 消费者C1 */ public class ConsumerC1 { 
     private static final String EXCHANGE_NAME = "normal_exchange"; //正常交换机名称 private static final String DEAD_EXCHANGE_NAME = "dead_exchange"; //死信队列交换机名称 private static final String KEY = "zhangsan"; //普通队列 RoutingKey private static final String DEAD_KEY = "lisi"; //死信队列 RoutingKey private static final String QUEUE_NAME = "normal-queue"; //普通队列名称 private static final String DEAD_QUEUE_NAME = "dead-queue"; //死信队列名称 public static void main(String[] args) throws IOException { 
     Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); /*声明死信和普通交换机,正常交换机已被生产者声明,实际可以省略第一行代码*/ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); /*创建队列 * 通过额外参数实现什么情况下转发到死信队列 ?,key都是固定的 * 1、TTL过期时间设置(一般由生产者指定) * 2、死信交换机的名称 * 3、死信交换机的RoutingKey * */ Map<String,Object> arguments = new HashMap<>(8); arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME); //死信交换机的名称 arguments.put("x-dead-letter-routing-key",DEAD_KEY); //死信交换机的RoutingKey // arguments.put("x-dead-letter-ttl",10000); 指定消息的有效时间为20秒(一般为生产者指定) channel.queueDeclare(QUEUE_NAME,false,false,false,arguments); channel.queueDeclare(DEAD_QUEUE_NAME,false,false,false,null); /*绑定队列*/ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,KEY); channel.queueBind(DEAD_QUEUE_NAME,DEAD_EXCHANGE_NAME,DEAD_KEY); DeliverCallback successBack = (consumerTag, message) -> { 
     System.out.println("C1用户接收到的信息为:"+new String(message.getBody())); }; CancelCallback cnaelBack = a->{ 
     System.out.println("C1用户进行取消消费操作!"); }; channel.basicConsume(QUEUE_NAME,true,successBack,cnaelBack); } } 

消费者C2:

package com.dmbjz.one; import com.dmbjz.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /* 死信队列TTL案例 消费者C2 */ public class ConsumerC2 { 
     private static final String DEAD_EXCHANGE_NAME = "dead_exchange"; //死信队列交换机名称 private static final String DEAD_KEY = "lisi"; //死信队列 RoutingKey private static final String DEAD_QUEUE_NAME = "dead-queue"; //死信队列名称 public static void main(String[] args) throws IOException { 
     Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); /*声明队列和普通交换机并进行绑定,由于消费者C1已经声明过了,这里实际可以省略这三行代码*/ channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE_NAME,false,false,false,null); channel.queueBind(DEAD_QUEUE_NAME,DEAD_EXCHANGE_NAME,DEAD_KEY); DeliverCallback successBack = (consumerTag, message) -> { 
     System.out.println("C2用户接收到的信息为:"+new String(message.getBody())); }; CancelCallback cnaelBack = a->{ 
     System.out.println("C2用户进行取消消费操作!"); }; channel.basicConsume(DEAD_QUEUE_NAME,true,successBack,cnaelBack); } } 

效果演示:
  1. 执行消费者C1,创建出所有交换机和队列绑定后停止运行。
  2. 执行生产者,等待10秒钟后查看控制台,消息全部通过交换机进入死信队列
  3. 运行消费者C2,死信队列消息被成功消费

20秒后消息全部进入死信队列
运行消费者C2,死信队列内的消息被全部取出


队列消息积压达到最大长度

在绑定死信队列的消费者端添加队列的消息积压长度限制即可,核心代码为TTL案例的消费者C1基础上再添加Map参数

//指定队列能够积压消息的大小,超出该范围的消息将进入死信队列 arguments.put("x-max-length",6); 

生产者:

package com.dmbjz.maxlength; import com.dmbjz.utils.RabbitUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.nio.charset.StandardCharsets; /* 死信队列 队列达到最大长度案例 生产者 */ public class Provider { 
      private static final String EXCHANGE_NAME = "normal_exchange"; //正常交换机名称 private static final String KEY = "zhangsan"; //普通队列 RoutingKey public static void main(String[] args) throws Exception { 
      Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //声明交换机 /*循环消息发送*/ for(int i = 1; i < 11; i++) { 
      String message = "INFO " + i; channel.basicPublish(EXCHANGE_NAME,KEY,null,message.getBytes(StandardCharsets.UTF_8)); //发送超级VIP消息 } } } 

消费者C1:
消费者C2的代码与TTL案例中保持一致

package com.dmbjz.maxlength; import com.dmbjz.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; /* 死信队列 队列达到最大长度案例 消费者C1 */ public class ConsumerC1 { 
      private static final String EXCHANGE_NAME = "normal_exchange"; //正常交换机名称 private static final String DEAD_EXCHANGE_NAME = "dead_exchange"; //死信队列交换机名称 private static final String KEY = "zhangsan"; //普通队列 RoutingKey private static final String DEAD_KEY = "lisi"; //死信队列 RoutingKey private static final String QUEUE_NAME = "normal-queue"; //普通队列名称 private static final String DEAD_QUEUE_NAME = "dead-queue"; //死信队列名称 public static void main(String[] args) throws IOException { 
      Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); /*声明死信和普通交换机,正常交换机已被生产者声明,实际可以省略第一行代码*/ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); /*创建队列 * 通过额外参数实现什么情况下转发到死信队列 ?,key都是固定的 * 1、TTL过期时间设置(一般由生产者指定) * 2、死信交换机的名称 * 3、死信交换机的RoutingKey * */ Map<String,Object> arguments = new HashMap<>(8); arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME); //死信交换机的名称 arguments.put("x-dead-letter-routing-key",DEAD_KEY); //死信交换机的RoutingKey arguments.put("x-max-length",6); //指定正常队列的长度,超出该范围的消息将进入死信队列 channel.queueDeclare(QUEUE_NAME,false,false,false,arguments); channel.queueDeclare(DEAD_QUEUE_NAME,false,false,false,null); /*绑定队列*/ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,KEY); channel.queueBind(DEAD_QUEUE_NAME,DEAD_EXCHANGE_NAME,DEAD_KEY); DeliverCallback successBack = (consumerTag, message) -> { 
      System.out.println("C1用户接收到的信息为:"+new String(message.getBody())); }; CancelCallback cnaelBack = a->{ 
      System.out.println("C1用户进行取消消费操作!"); }; channel.basicConsume(QUEUE_NAME,true,successBack,cnaelBack); } } 

效果演示:
  1. 执行消费者C1,创建出所有交换机和队列绑定后停止运行。
  2. 执行生产者,由于通道积压的六条消息从未被消费。剩余消息进入死信队列
  3. 运行消费者C2,死信队列消息被成功消费

消息积压六条,剩下四条消息进入死信队列,LIM为限制长度标记
启动消费者C2,死信队列消息被消费


消息拒绝应答

在绑定死信队列的消费者端添加需要拒绝应答的消息判断即可,核心代码为消息拒绝应答

/* requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中,未配置则进行丢弃操作*/ channel.basicReject(message.getEnvelope().getDeliveryTag(),false); 

生产者:

package com.dmbjz.noack; import com.dmbjz.utils.RabbitUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.nio.charset.StandardCharsets; /* 死信队列 队列达到最大长度案例 生产者 */ public class Provider { 
       private static final String EXCHANGE_NAME = "normal_exchange"; //正常交换机名称 private static final String KEY = "zhangsan"; //普通队列 RoutingKey public static void main(String[] args) throws Exception { 
       Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //声明交换机 /*循环消息发送*/ for(int i = 1; i < 11; i++) { 
       String message = "INFO " + i; channel.basicPublish(EXCHANGE_NAME,KEY,null,message.getBytes(StandardCharsets.UTF_8)); //发送超级VIP消息 } } } 

消费者C1:
消费者C2的代码和TTL案例保持一致

package com.dmbjz.noack; import com.dmbjz.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; /* 死信队列 队列达到最大长度案例 消费者C1 */ public class ConsumerC1 { 
       private static final String EXCHANGE_NAME = "normal_exchange"; //正常交换机名称 private static final String DEAD_EXCHANGE_NAME = "dead_exchange"; //死信队列交换机名称 private static final String KEY = "zhangsan"; //普通队列 RoutingKey private static final String DEAD_KEY = "lisi"; //死信队列 RoutingKey private static final String QUEUE_NAME = "normal-queue"; //普通队列名称 private static final String DEAD_QUEUE_NAME = "dead-queue"; //死信队列名称 public static void main(String[] args) throws IOException { 
       Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); /*声明死信和普通交换机,正常交换机已被生产者声明,实际可以省略第一行代码*/ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); /*创建队列 * 通过额外参数实现什么情况下转发到死信队列 ?,key都是固定的 * 1、TTL过期时间设置(一般由生产者指定) * 2、死信交换机的名称 * 3、死信交换机的RoutingKey * */ Map<String,Object> arguments = new HashMap<>(8); arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME); //死信交换机的名称 arguments.put("x-dead-letter-routing-key",DEAD_KEY); //死信交换机的RoutingKey channel.queueDeclare(QUEUE_NAME,false,false,false,arguments); channel.queueDeclare(DEAD_QUEUE_NAME,false,false,false,null); /*绑定队列*/ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,KEY); channel.queueBind(DEAD_QUEUE_NAME,DEAD_EXCHANGE_NAME,DEAD_KEY); DeliverCallback successBack = (consumerTag, message) -> { 
       String info = new String(message.getBody(),"UTF-8"); if(info.equals("INFO 5")){ 
       System.out.println("C1用户拒绝的信息为:"+new String(message.getBody())); /* requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中,未配置则进行丢弃操作*/ channel.basicReject(message.getEnvelope().getDeliveryTag(),false); }else{ 
       System.out.println("C1用户接收到的信息为:"+new String(message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } }; CancelCallback cnaelBack = a->{ 
       System.out.println("C1用户进行取消消费操作!"); }; channel.basicConsume(QUEUE_NAME,false,successBack,cnaelBack); } } 

效果演示:
  1. 执行消费者C1
  2. 执行生产者,等待10秒钟后查看控制台,消息5被拒绝接收进入死信队列
  3. 运行消费者C2,死信队列消息被成功消费

消息5被消费者C1拒绝接收
死信队列中的消息5被消费者C2消费

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/98747.html

(0)
上一篇 2026-02-07 13:33
下一篇 2023-08-26 16:33

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信