AMQ基础

AMQ基础一对一

大家好,欢迎来到IT知识分享网。

一、消息中间件模式分部

queue一对一实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。

topic一对多实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝。

详见 自己连接

二、 代码

  1. 添加activemq的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> 
  1. 配置 yml
spring.activemq.broker-url=tcp://172.16.154.27:61616 spring.activemq.user=admin spring.activemq.password= spring.activemq.in-memory=true spring.activemq.pooled=false 
  1. 工厂配置文件,整合amq
@Configuration public class ActiveMQConfig { 
    @Bean public JmsListenerContainerFactory<?> queueListenerFactory(@Qualifier("activeMQConnectionFactory") ActiveMQConnectionFactory connectionFactory) { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(); container.setConcurrentConsumers(3); container.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); factory.setConnectionFactory(connectionFactory); factory.setConcurrency("3-15"); //连接数 factory.setRecoveryInterval(1000L); //重连间隔时间 factory.setSessionAcknowledgeMode(4); return factory; } @Bean public ActiveMQConnectionFactory activeMQConnectionFactory(){ 
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setTrustAllPackages(true); connectionFactory.setRedeliveryPolicy(redeliveryPolicy()); return connectionFactory; } @Bean public RedeliveryPolicy redeliveryPolicy(){ 
    RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy(); //是否在每次尝试重新发送失败后,增长这个等待时间 redeliveryPolicy.setUseExponentialBackOff(true); //重发次数,默认为6次 redeliveryPolicy.setMaximumRedeliveries(5); //重发时间间隔,默认为1秒 redeliveryPolicy.setInitialRedeliveryDelay(1); //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value redeliveryPolicy.setBackOffMultiplier(2); //是否避免消息碰撞 redeliveryPolicy.setUseCollisionAvoidance(false); //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效 redeliveryPolicy.setMaximumRedeliveryDelay(-1); return redeliveryPolicy; } @Bean public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory){ 
    JmsTemplate jmsTemplate=new JmsTemplate(); jmsTemplate.setDeliveryMode(1);//进行持久化配置 1表示非持久化,2表示持久化 jmsTemplate.setConnectionFactory(activeMQConnectionFactory); jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式 return jmsTemplate; } } 
  1. 定义 queue。(也可以topic)
@Configuration public class QueueConfig { 
    @Bean(name="telQueue") public Queue telQueue() { 
    return new ActiveMueue(MESSAGE_TEL); } @Bean(name = "emailQueue") public Queue emailQueue() { 
    return new ActiveMueue(MESSAGE_EMAIL); } } 
  1. 定义生产者—-使用jmsTemplate
@Slf4j @Component public class QueueSender { 
    @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Resource(name = "telQueue") private Queue telQueue; @Resource(name = "emailQueue") private Queue emailQueue; public void sendTel(final MessageTel message){ 
    log.info("发送短信message={}",message); this.jmsMessagingTemplate.convertAndSend(telQueue,message); } public void sendEmail(final MessageEMail message){ 
    log.info("发送email message={}",message); this.jmsMessagingTemplate.convertAndSend(emailQueue,message); } } 
  1. 定义消费者—使用@JmsListener。@JmsListener可指定多个destination 监听多个队列
@Slf4j @Component public class QueueReceiver { 
    @Autowired private MailUtil mailUtil; @Autowired private SendSMSUtil sendSMSUtil; / * 发送短信 * * @param messageTel */ @JmsListener(destination = QueueConfig.MESSAGE_TEL, containerFactory = "queueListenerFactory") public void receiveTel1(MessageTel messageTel) { 
    log.info("-----receive tel message-----"); } / * 发送邮件 * * @param messageEMail */ @JmsListener(destination = QueueConfig.MESSAGE_EMAIL, containerFactory = "queueListenerFactory") // 可监听多个队列 // @JmsListeners(value = {@JmsListener(destination = "T1"), @JmsListener(destination = "T2")}) public void receiveEmail(MessageEMail messageEMail) { 
    log.info("-----receive email message-----"); } } 

三、JMSTemplate

生产者:JmsTemplate 消费者:@JmsListener(destination = "ActiveMueue") 

四、优先级

可以使用消息优先级来指示JMS Provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。
如果不指定优先级,默认级别是4。需要注意的是,JMS Provider并不一定保证按照优先级的顺序提交消息。

五、配置信息

amq也可以在application.properires配置消息队列,并在启动类添加@EnableJms开启消息队列。

# failover:(tcp://localhost:61616,tcp://localhost:61617) # tcp://localhost:61616 spring.activemq.broker-url=tcp://localhost:61616 #true 表示使用内置的MQfalse则连接服务器 spring.activemq.in-memory=false #true表示使用连接池;false时,每发送一条数据创建一个连接 spring.activemq.pool.enabled=true #连接池最大连接数 spring.activemq.pool.max-connections=10 #空闲的连接过期时间,默认为30秒 spring.activemq.pool.idle-timeout=30000 #强制的连接过期时间,与idleTimeout的区别在于:idleTimeout是在连接空闲一段时间失效,而expiryTimeout不管当前连接的情况,只要达到指定时间就失效。默认为0,never spring.activemq.pool.expiry-timeout=0 

六、监听器功能

@SendTo将消息处理完继续发送继续处理

@Component public class MessageListener { 
    @JmsListener(destination = "order.queue.message") @SendTo("other.queue") private String receive(String message){ 
    System.out.println("自动接收到order.queue.message消息:"+ message); // 注意是将此方法的返回值返回到 新的队列中 return message; } @JmsListener(destination = "other.queue") private void receiveOther(String message){ 
    System.out.println("接收到other.queue继续发送的消息:" + message); } } 

参考链接~~~

七、持久化

7.1 速度

持久化消息非常慢

默认的情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的,遇到慢一点的硬盘,发送消息的速度是无法忍受的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升。所以在发送持久化消息时,请务必开启事务模式。其实发送非持久化消息时也建议开启事务,因为根本不会影响性能。

7.2 持久化消息

设置2G左右的持久化文件限制,大量生产持久化消息直到文件达到最大限制,此时生产者阻塞,但消费者可正常连接并消费消息,等消息消费掉一部分,文件删除又腾出空间之后,生产者又可继续发送消息,服务自动恢复正常。

设置2G左右的临时文件限制,大量生产非持久化消息并写入临时文件,在达到最大限制时,生产者阻塞,消费者可正常连接但不能消费消息,或者原本慢速消费的消费者,消费突然停止。整个系统可连接,但是无法提供服务,就这样挂了。

具体原因不详,解决方案:尽量不要用非持久化消息,非要用的话,将临时文件限制尽可能的调大。

八、mq阻塞(amq、rmq)

九、mq消息中间件工作流程

在这里插入图片描述

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/129463.html

(0)
上一篇 2025-08-23 22:10
下一篇 2025-08-23 22:15

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信