大家好,欢迎来到IT知识分享网。
文章目录
大部分情况下,我们可能都是在 Spring Boot 或者 Spring Cloud 环境下使用 RabbitMQ,因此本文我也主要从这两个方面来和大家分享 RabbitMQ 的用法。
一、RabbitMQ 架构简介
二、准备工作
大家知道,RabbitMQ 是 AMQP 阵营里的产品,Spring Boot 为 AMQP 提供了自动化配置依赖 spring-boot-starter-amqp,因此首先创建 Spring Boot 项目并添加该依赖,如下:
项目创建成功后,在 application.properties 中配置 RabbitMQ 的基本连接信息,如下:
spring.rabbitmq.host=localhost spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.port=5672
接下来进行 RabbitMQ 配置,在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。
三、消息收发
1. Hello World
先来看看队列的定义:
@Configuration public class HelloWorldConfig {
public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue"; @Bean Queue queue1() {
return new Queue(HELLO_WORLD_QUEUE_NAME); } }
再来看看消息消费者的定义:
@Component public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME) public void receive(String msg) {
System.out.println("msg = " + msg); } }
消息发送:
@SpringBootTest class RabbitmqdemoApplicationTests {
@Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() {
rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello"); } }
这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息routing key相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
2. Work queues
这种情况是这样的:
先来看并发能力的配置,如下:
@Component public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME) public void receive(String msg) {
System.out.println("receive = " + msg); } @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10") public void receive2(String msg) {
System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName()); } }
可以看到,第二个消费者我配置了 concurrency 为 10,此时,对于第二个消费者,将会同时存在 10 个子线程去消费消息。
消息发送方式如下:
@SpringBootTest class RabbitmqdemoApplicationTests {
@Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello"); } } }
当然消息消费者也可以开启手动 ack,这样可以自行决定是否消费 RabbitMQ 发来的消息,配置手动 ack 的方式如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费代码如下:
@Component public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME) public void receive(Message message,Channel channel) throws IOException {
System.out.println("receive="+message.getPayload()); channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true); } @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10") public void receive2(Message message, Channel channel) throws IOException {
System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName()); channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true); } }
此时第二个消费者拒绝了所有消息,第一个消费者消费了所有消息。
这就是 Work queues 这种情况。
3. Publish/Subscrite
再来看发布订阅模式,这种情况是这样:
3.1. Direct
DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。DirectExchange 的配置如下:
@Configuration public class RabbitDirectConfig {
public final static String DIRECTNAME = "javaboy-direct"; @Bean Queue queue() {
return new Queue("hello-queue"); } @Bean DirectExchange directExchange() {
return new DirectExchange(DIRECTNAME, true, false); } @Bean Binding binding() {
return BindingBuilder.bind(queue()) .to(directExchange()).with("direct"); } }
- 首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字,重启后是否依然有效以及长期未用时是否删除。
- 创建一个Binding对象将Exchange和Queue绑定在一起。
- DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,可以只配置一个Queue的实例即可。
再来看看消费者:
@Component public class DirectReceiver {
@RabbitListener(queues = "hello-queue") public void handler1(String msg) {
System.out.println("DirectReceiver:" + msg); } }
通过 @RabbitListener 注解指定一个方法是一个消息消费方法,方法参数就是所接收到的消息。然后在单元测试类中注入一个 RabbitTemplate 对象来进行消息发送,如下:
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests {
@Autowired RabbitTemplate rabbitTemplate; @Test public void directTest() {
rabbitTemplate.convertAndSend("hello-queue", "hello direct!"); } }
3.2. Fanout
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:
@Configuration public class RabbitFanoutConfig {
public final static String FANOUTNAME = "sang-fanout"; @Bean FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUTNAME, true, false); } @Bean Queue queueOne() {
return new Queue("queue-one"); } @Bean Queue queueTwo() {
return new Queue("queue-two"); } @Bean Binding bindingOne() {
return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } @Bean Binding bindingTwo() {
return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); } }
在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。接下来创建两个消费者,如下:
@Component public class FanoutReceiver {
@RabbitListener(queues = "queue-one") public void handler1(String message) {
System.out.println("FanoutReceiver:handler1:" + message); } @RabbitListener(queues = "queue-two") public void handler2(String message) {
System.out.println("FanoutReceiver:handler2:" + message); } }
两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息,如下:
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests {
@Autowired RabbitTemplate rabbitTemplate; @Test public void fanoutTest() {
rabbitTemplate .convertAndSend(RabbitFanoutConfig.FANOUTNAME, null, "hello fanout!"); } }
注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null。
3.3. Topic
TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上。TopicExchange 配置如下:
@Configuration public class RabbitTopicConfig {
public final static String TOPICNAME = "sang-topic"; @Bean TopicExchange topicExchange() {
return new TopicExchange(TOPICNAME, true, false); } @Bean Queue xiaomi() {
return new Queue("xiaomi"); } @Bean Queue huawei() {
return new Queue("huawei"); } @Bean Queue phone() {
return new Queue("phone"); } @Bean Binding xiaomiBinding() {
return BindingBuilder.bind(xiaomi()).to(topicExchange()) .with("xiaomi.#"); } @Bean Binding huaweiBinding() {
return BindingBuilder.bind(huawei()).to(topicExchange()) .with("huawei.#"); } @Bean Binding phoneBinding() {
return BindingBuilder.bind(phone()).to(topicExchange()) .with("#.phone.#"); } }
- 首先创建 TopicExchange,参数和前面的一致。然后创建三个 Queue,第一个 Queue 用来存储和 “xiaomi” 有关的消息,第二个 Queue 用来存储和 “huawei” 有关的消息,第三个 Queue 用来存储和 “phone” 有关的消息。
- 将三个 Queue 分别绑定到 TopicExchange 上,第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 开头的,都将被路由到名称为 “xiaomi” 的 Queue 上,第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的,都将被路由到名称为 “huawei” 的 Queue 上,第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。
接下来针对三个 Queue 创建三个消费者,如下:
@Component public class TopicReceiver {
@RabbitListener(queues = "phone") public void handler1(String message) {
System.out.println("PhoneReceiver:" + message); } @RabbitListener(queues = "xiaomi") public void handler2(String message) {
System.out.println("XiaoMiReceiver:"+message); } @RabbitListener(queues = "huawei") public void handler3(String message) {
System.out.println("HuaWeiReceiver:"+message); } }
然后在单元测试中进行消息的发送,如下:
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests {
@Autowired RabbitTemplate rabbitTemplate; @Test public void topicTest() {
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "xiaomi.news","小米新闻.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "huawei.news","华为新闻.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "xiaomi.phone","小米手机.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "huawei.phone","华为手机.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "phone.news","手机新闻.."); } }
根据 RabbitTopicConfig 中的配置,第一条消息将被路由到名称为 “xiaomi” 的 Queue 上,第二条消息将被路由到名为 “huawei” 的 Queue 上,第三条消息将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上,第四条消息将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上,最后一条消息则将被路由到名为 “phone” 的 Queue 上。
3.4. Header
HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关,配置如下:
@Configuration public class RabbitHeaderConfig {
public final static String HEADERNAME = "javaboy-header"; @Bean HeadersExchange headersExchange() {
return new HeadersExchange(HEADERNAME, true, false); } @Bean Queue queueName() {
return new Queue("name-queue"); } @Bean Queue queueAge() {
return new Queue("age-queue"); } @Bean Binding bindingName() {
Map<String, Object> map = new HashMap<>(); map.put("name", "sang"); return BindingBuilder.bind(queueName()) .to(headersExchange()).whereAny(map).match(); } @Bean Binding bindingAge() {
return BindingBuilder.bind(queueAge()) .to(headersExchange()).where("age").exists(); } }
这里的配置大部分和前面介绍的一样,差别主要体现的 Binding 的配置上,第一个 bindingName 方法中,whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value,就把该消息路由到名为 “name-queue” 的 Queue 上,这里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则表示只要消息的 Header 中包含 age,不管 age 的值是多少,都将消息路由到名为 “age-queue” 的 Queue 上。
接下来创建两个消息消费者:
@Component public class HeaderReceiver {
@RabbitListener(queues = "name-queue") public void handler1(byte[] msg) {
System.out.println("HeaderReceiver:name:" + new String(msg, 0, msg.length)); } @RabbitListener(queues = "age-queue") public void handler2(byte[] msg) {
System.out.println("HeaderReceiver:age:" + new String(msg, 0, msg.length)); } }
注意这里的参数用 byte 数组接收。然后在单元测试中创建消息的发送方法,这里消息的发送也和 routingkey 无关,如下:
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests {
@Autowired RabbitTemplate rabbitTemplate; @Test public void headerTest() {
Message nameMsg = MessageBuilder .withBody("hello header! name-queue".getBytes()) .setHeader("name", "sang").build(); Message ageMsg = MessageBuilder .withBody("hello header! age-queue".getBytes()) .setHeader("age", "99").build(); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg); } }
这里创建两条消息,两条消息具有不同的 header,不同 header 的消息将被发到不同的 Queue 中去。
4. Routing
这种情况是这样:
一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可。
5. Topics
这种情况是这样:
一个生产者,一个交换机,两个队列,两个消费者,生产者创建 Topic 的 Exchange 并且绑定到队列中,这次绑定可以通过 * 和 # 关键字,对指定 RoutingKey 内容,编写时注意格式 xxx.xxx.xxx 去编写。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/118871.html
