JMQ的两种消息模式(点对点消息模式、订阅模式)

JMQ的两种消息模式(点对点消息模式、订阅模式)一 JMQ 的两种消息模式消息列队有两种消息模式 一种是点对点的消息模式 还有一种就是订阅的模式 1 1 点对点的消息模式点对点的模式主要建立在一个队列上面 当连接一个列队的时候 发送端不需要知道接收端是否正在接收 可

大家好,欢迎来到IT知识分享网。

一:JMQ的两种消息模式

消息列队有两种消息模式,一种是点对点的消息模式,还有一种就是订阅的模式.

 

1.1:点对点的消息模式

 

点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息

 

1.2:订阅模式

 

订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是,发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到。

 

 

 

 

二:点对点的实现代码

这里使用java来实现一下ActiveMQ的点对点模式。

ActiveMQ版本为 5.13.3

项目使用MAVEN来构建

 <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> </dependencies>

都是当前最新的版本

 

2.1:点对点的发送端

package com.test; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /  * 消息的生产者(发送者)  * @author liang  *  */ public class JMSProducer {     //默认连接用户名     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;     //默认连接密码     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;     //默认连接地址     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;     //发送的消息数量     private static final int SENDNUM = 10;     public static void main(String[] args) {         //连接工厂         ConnectionFactory connectionFactory;         //连接         Connection connection = null;         //会话 接受或者发送消息的线程         Session session;         //消息的目的地         Destination destination;         //消息生产者         MessageProducer messageProducer;         //实例化连接工厂         connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);         try {             //通过连接工厂获取连接             connection = connectionFactory.createConnection();             //启动连接             connection.start();             //创建session             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);             //创建一个名称为HelloWorld的消息队列             destination = session.createQueue("HelloWorld");             //创建消息生产者             messageProducer = session.createProducer(destination);             //发送消息             sendMessage(session, messageProducer);             session.commit();         } catch (Exception e) {             e.printStackTrace();         }finally{             if(connection != null){                 try {                     connection.close();                 } catch (JMSException e) {                     e.printStackTrace();                 }             }         }     }     /      * 发送消息      * @param session      * @param messageProducer  消息生产者      * @throws Exception      */     public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{         for (int i = 0; i < JMSProducer.SENDNUM; i++) {             //创建一条文本消息             TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);             System.out.println("发送消息:Activemq 发送消息" + i);             //通过消息生产者发出消息             messageProducer.send(message);         }     } } 

 

 

2.2:点对点的接收端

 

package com.test; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /  * 消息的消费者(接受者)  * @author liang  *  */ public class JMSConsumer {     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址     public static void main(String[] args) {         ConnectionFactory connectionFactory;//连接工厂         Connection connection = null;//连接         Session session;//会话 接受或者发送消息的线程         Destination destination;//消息的目的地         MessageConsumer messageConsumer;//消息的消费者         //实例化连接工厂         connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);         try {             //通过连接工厂获取连接             connection = connectionFactory.createConnection();             //启动连接             connection.start();             //创建session             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);             //创建一个连接HelloWorld的消息队列             destination = session.createQueue("HelloWorld");             //创建消息消费者             messageConsumer = session.createConsumer(destination);             while (true) {                 TextMessage textMessage = (TextMessage) messageConsumer.receive();                 if(textMessage != null){                     System.out.println("收到的消息:" + textMessage.getText());                 }else {                     break;                 }             }         } catch (JMSException e) {             e.printStackTrace();         }     } } 

 

 

 

 

 

三:订阅/发布模式的实现代码

3.1:订阅模式的发送端

package com.ding;

 
    import javax.jms.Connection;  
    import javax.jms.ConnectionFactory;  
    import javax.jms.DeliveryMode;  
    import javax.jms.Destination;  
    import javax.jms.JMSException;  
    import javax.jms.MapMessage;  
    import javax.jms.MessageProducer;  
    import javax.jms.Session;  
    import javax.jms.TextMessage;  
      
    import org.apache.activemq.ActiveMQConnectionFactory;  
      
    public class publisher {  
      
        // 单例模式  
      
        // 1、连接工厂  
        private ConnectionFactory connectionFactory;  
        // 2、连接对象  
        private Connection connection;  
        // 3、Session对象  
        private Session session;  
        // 4、生产者  
        private MessageProducer messageProducer;  
      
        public publisher() {  
      
            try {  
                this.connectionFactory = new ActiveMQConnectionFactory(“zhangsan”,  
                        “123”, “tcp://localhost:61616”);  
                this.connection = connectionFactory.createConnection();  
                this.connection.start();  
                // 不使用事务  
                // 设置客户端签收模式  
                this.session = this.connection.createSession(false,  
                        Session.AUTO_ACKNOWLEDGE);  
                this.messageProducer = this.session.createProducer(null);  
            } catch (JMSException e) {  
                throw new RuntimeException(e);  
            }  
      
        }  
      
        public Session getSession() {  
            return this.session;  
        }  
      
        public void send1(/* String QueueName, Message message */) {  
            try {  
      
                Destination destination = this.session.createTopic(“topic1”);  
                MapMessage msg1 = this.session.createMapMessage();  
                msg1.setString(“name”, “张三”);  
                msg1.setInt(“age”, 22);  
      
                MapMessage msg2 = this.session.createMapMessage();  
                msg2.setString(“name”, “李四”);  
                msg2.setInt(“age”, 25);  
      
                MapMessage msg3 = this.session.createMapMessage();  
                msg3.setString(“name”, “张三”);  
                msg3.setInt(“age”, 30);  
      
                // 发送消息到topic1  
                this.messageProducer.send(destination, msg1,  
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
                this.messageProducer.send(destination, msg2,  
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
                this.messageProducer.send(destination, msg3,  
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
      
            } catch (JMSException e) {  
                throw new RuntimeException(e);  
            }  
        }  
      
        public void send2() {  
            try {  
                Destination destination = this.session.createTopic(“topic1”);  
                TextMessage message = this.session.createTextMessage(“我是一个字符串”);  
                // 发送消息  
                this.messageProducer.send(destination, message,  
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
            } catch (JMSException e) {  
                throw new RuntimeException(e);  
            }  
      
        }  
      
        public static void main(String[] args) {  
            publisher producer = new publisher();  
            producer.send1();  
      
        }  
      
    } 





































































































 

订阅模式的接受端

 

 

四:发送消息的数据类型

上面的代码演示,全部都是发送字符串,但是ActiveMQ支持哪些数据呢?

大家可以看一下  javax.jms.Message 这个接口,只要是这个接口的数据,都可以被发送。

 

或者这样看起来有点麻烦,那么看到上面的代码,创建消息,是通过session这个对象来创建的,那我们来看一下这里有哪些可以被创建的呢?

 //纯字符串的数据  session.createTextMessage(); //序列化的对象  session.createObjectMessage(); //流,可以用来传递文件等  session.createStreamMessage(); //用来传递字节  session.createBytesMessage(); //这个方法创建出来的就是一个map,可以把它当作map来用,当你看了它的一些方法,你就懂了  session.createMapMessage(); //这个方法,拿到的是javax.jms.Message,是所有message的接口 session.createMessage();

 

 

4.1:传递javabean对象

传递一个java对象,可能是最多的使用方式了,而且这种数据接收与使用都方便,那么,下面的代码就来演示下如何发送一个java对象

当然了,这个对象必须序列化,也就是实现Serializable接口

 

 //通过这个方法,可以把一个对象发送出去,当然,这个对象需要序列化,因为一切在网络在传输的,都是字节 ObjectMessage obj = session.createObjectMessage(); for(int i = 0 ; i < 100 ; i ++){ Person p = new Person(i,"名字"); obj.setObject(p); producer.send(obj); }

 

那么在接收端要怎么接收这个对象呢?

 

 //实现一个消息的监听器 //实现这个监听器后,以后只要有消息,就会通过这个监听器接收到 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { //同样的,强转为ObjectMessage,然后拿到对象,强转为Person Person p = (Person) ((ObjectMessage)message).getObject(); System.out.println(p); } catch (JMSException e) { e.printStackTrace(); } } });

 

 

 

 

4.2:发送文件

 

发送文件,这里用BytesMessage

 BytesMessage bb = session.createBytesMessage(); bb.writeBytes(new byte[]{2});

至于这里的new Byte[]{2},肯定不是这样写的,从文件里面拿流出来即可

 

 

接收的话

 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { BytesMessage bm = (BytesMessage)message; FileOutputStream out = null; try { out = new FileOutputStream("d:/1.ext"); } catch (FileNotFoundException e2) { e2.printStackTrace(); } byte[] by = new byte[1024]; int len = 0 ; try { while((len = bm.readBytes(by))!= -1){ out.write(by,0,len); } } catch (JMSException | IOException e1) { e1.printStackTrace(); } } });

 

 

 

 

 

 

五:ActiveMQ的应用

5.1:保证消息的成功处理

消息发送成功后,接收端接收到了消息。然后进行处理,但是可能由于某种原因,高并发也好,IO阻塞也好,反正这条消息在接收端处理失败了。而点对点的特性是一条消息,只会被一个接收端给接收,只要接收端A接收成功了,接收端B,就不可能接收到这条消息,如果是一些普通的消息还好,但是如果是一些很重要的消息,比如说用户的支付订单,用户的退款,这些与金钱相关的,是必须保证成功的,那么这个时候要怎么处理呢?

 

我们可以使用  CLIENT_ACKNOWLEDGE  模式

 

之前其实就有提到当创建一个session的时候,需要指定其事务,及消息的处理模式,当时使用的是

 

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

AUTO_ACKNOWLEDGE 

这一个代码的是,当消息发送给接收端之后,就自动确认成功了,而不管接收端有没有处理成功,而一旦确认成功后,就会把队列里面的消息给清除掉,避免下一个接收端接收到同样的消息。

那么,它还有另外一个模式,那就是 CLIENT_ACKNOWLEDGE

这行要写在接收端里面,不是写在发送端的

 

session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

 

这行代码以后,如果接收端不确认消息,那么activemq将会把这条消息一直保留,直到有一个接收端确定了消息。

那么要怎么确认消息呢?

在接收端接收到消息的时候,调用javax.jms.Message的acknowledge方法

@Override public void onMessage(Message message) { try { //获取到接收的数据 String text = ((TextMessage)message).getText(); System.out.println(text); //确认接收,并成功处理了消息  message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } }

这样,当消息处理成功之后,确认消息,如果不确定,activemq将会发给下一个接收端处理

 注意:只在点对点中有效,订阅模式,即使不确认,也不会保存消息

 

 

5.2:避免消息队列的并发

JMQ设计出来的原因,就是用来避免并发的,和沟通两个系统之间的交互。

 

5.2.1:主动接收队列消息

 

先看一下之前的代码:

 //实现一个消息的监听器 //实现这个监听器后,以后只要有消息,就会通过这个监听器接收到 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { //获取到接收的数据 String text = ((TextMessage)message).getText(); System.out.println(text); //确认接收,并成功处理了消息  message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } });

之前的代码里面,实现了一个监听器,监听消息的传递,这样只要每有一个消息,都会即时的传递到程序中。

但是,这样的处理,在高并发的时候,因为它是被动接收,并没有考虑到程序的处理能力,可能会压跨系统,那要怎么办呢?

 

答案就是把被动变为主动,当程序有着处理消息的能力时,主动去接收一条消息进行处理

 

实现的代码如下:

      if(当程序有能力处理){ 
  //当程序有能力处理时接收 Message receive = consumer.receive();            //这个可以设置超时时间,超过则不等待消息              recieve.receive(10000); //其实receive是一个阻塞式方法,一定会拿到值的 if(null != receive){ String text = ((TextMessage)receive).getText(); receive.acknowledge(); System.out.println(text); }else{ //没有值嘛 //  } } 

 

通过上面的代码,就可以让程序自已判断,自己是否有能力接收这条消息,如果不能接收,那就给别的接收端接收,或者等自己有能力处理的时候接收

 

 

5.2.2:使用多个接收端

ActiveMQ是支持多个接收端的,如果当程序无法处理这么多数据的时候,可以考虑多个线程,或者增加服务器来处理。

 

 

 

5.3:消息有效期的管理

这样的场景也是有的,一条消息的有效时间,当发送一条消息的时候,可能希望这条消息在指定的时间被处理,如果超过了指定的时间,那么这条消息就失效了,就不需要进行处理了,那么我们可以使用ActiveMQ的设置有效期来实现

 

代码如下:

 TextMessage msg = session.createTextMessage("哈哈"); for(int i = 0 ; i < 100 ; i ++){ //设置该消息的超时时间 producer.setTimeToLive(i * 1000); producer.send(msg); }

 

这里每一条消息的有效期都是不同的,打开ip:8161/admin/就可以查看到,里面的消息越来越少了。

 

过期的消息是不会被接收到的。

 

过期的消息会从队列中清除,并存储到ActiveMQ.DLQ这个队列里面,这个稍后会解释。

 

 

5.4:过期消息,处理失败的消息如何处理

 过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。

这个队列是ActiveMQ自动创建的。

如果需要查看这些未被处理的消息,可以进入这个队列中查看

//指定一个目的地,也就是一个队列的位置 destination = session.createQueue("ActiveMQ.DLQ");

这样就可以进入队列中,然后实现接口,或者通过receive()方法,就可以拿到未被处理的消息,从而保证正确的处理

 

 

六:ActiveMQ的安全配置

6.1:管理后台的密码设置

我们都知道,打开ip:8161/admin/ 就是activemq的管理控制台,它的默认账号和密码都是admin,在生产环境肯定需要更改密码的,这要怎么做呢?

在activemq/conf/jetty.xml中找到

 <pre name="code" class="html"> <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="admin" /> <!-- 把这个改为true,当然,高版本的已经改为了true --> <property name="authenticate" value="true" /> </bean>

高版本的已经默认成为了true。所以我们直接进行下一步即可

在activemq/conf/jetty-realm.properties文件中配置,打开如下

 --------------------------------------------------------------------------- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0  Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --------------------------------------------------------------------------- # Defines users that can access the web (console, demo, etc.) # username: password [,rolename ...] #用户名,密码,角色 admin: admin, admin user: user, user

注意:大家重点看倒数第二行,那里三个分别是用户名,密码,角色,其中admin角色是固定的

 

 

6.2:生产消费者的连接密码

注意:activemq默认是不需要密码,生产消费者就可以连接的

我们需要经过配置,才能设置密码,这一步在生产环境中一定要配置

<plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins>

打开这个文件如下

 --------------------------------------------------------------------------- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0  Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --------------------------------------------------------------------------- # Defines credentials that will be used by components (like web console) to access the broker #账号 activemq.username=admin #密码 activemq.password= guest.password=password

 

 

这样就配置完毕了。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/120365.html

(0)
上一篇 2025-10-30 11:00
下一篇 2025-10-30 11:15

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信