【ZMQ】ZMQ/ZeroMQ简介、三种消息模式demo程序

【ZMQ】ZMQ/ZeroMQ简介、三种消息模式demo程序ZeroMQ 也称为 MQ 0MQ 或 zmq 看起来像是一个可嵌入的网络库 但它的作用类似于一个并发框架

大家好,欢迎来到IT知识分享网。

一、什么是ZMQ

ZeroMQ(也称为ÖMQ、0MQ或zmq)看起来像是一个可嵌入的网络库,但它的作用类似于一个并发框架。它为您提供了在进程内、进程间、TCP和多播等各种传输中传递原子消息的套接字。您可以使用扇出、发布订阅、任务分发和请求回复等模式将套接字N到N连接起来。它的速度足以成为集群产品的结构。它的异步I/O模型为您提供了可扩展的多核应用程序,构建为异步消息处理任务。它有许多语言API,并在大多数操作系统上运行。

ZeroMQ(也拼写为ÖMQ、0MQ或ZMQ)是一个高性能异步消息传递库,旨在用于分布式或并发应用程序。它提供了一个消息队列,但与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。

ZeroMQ支持多种传输(TCP、进程内、进程间、多播、WebSocket等)上的通用消息传递模式(发布/订阅、请求/回复、客户端/服务器等),使进程间消息传递与线程间消息传递一样简单。这使您的代码保持清晰、模块化和极易扩展。

ZeroMQ是由大量贡献者开发的。有许多流行编程语言的第三方绑定,以及C#和Java的本机端口。

二、ZMQ的特点

1、组件来去自如,ZQM会负责自动重连,服务端和客户端可以随意的退出网络。tcp的话,必须现有服务端启动,在启动客户端,否则会报错。

2、ZMQ会在必要的情况下将消息放入队列中保存,一旦建立了连接就开始发送。

3、ZMQ有阈值机制,当队列满的时候,可以自动阻塞发送者,或者丢弃部分消息。

4、ZMQ可以使用不同的通信协议进行连接,TCP,进程间,线程间。

5、ZMQ提供了多种模式进行消息路由,如请求-应答模式(REQ/RES),发布-订阅模式(P/S)和推拉模式(P/P)等,这些模式可以用来搭建网络拓扑结构。

6、ZMQ会在后台线程异步的处理I/O操作,他使用一种不会死锁的数据结构来存储消息。同时ZeroMQ不在乎目的是否存在。

7、TCP的通信拓扑是一对一的,而ZMQ可以是一对一、一对多、多对一或者多对多。

8、ZeroMQ传输的是消息,TCP传输字节。

三、Demo程序代码

3.1 发布-订阅模式(P/S)demo

发布端:

package com.example.demozmq.zmq; import java.nio.charset.StandardCharsets; import java.util.Random; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class ZmqPublisher { 
    public static void main(String[] args) throws InterruptedException { 
    Context context = ZMQ.context(1); Socket socket = context.socket(ZMQ.PUB); // 绑定端口 socket.bind("tcp://*:5556"); Random random = new Random(1000); while (true) { 
    // 随机生成一个整数 int value = random.nextInt(); // 将整数作为消息发布到通道上 byte[] topic = "value".getBytes(StandardCharsets.UTF_8); byte[] data = Integer.toString(value).getBytes(StandardCharsets.UTF_8); socket.sendMore(topic); socket.send(data); System.out.println("发送整数:| " + value); Thread.sleep(3000); } } } 

订阅端:

package com.example.demozmq.zmq; import java.nio.charset.StandardCharsets; import org.zeromq.ZMQ; public class ZmqSubscriber { 
    public static void main(String[] args) { 
    ZMQ.Context context = ZMQ.context(1); ZMQ.Socket socket = context.socket(ZMQ.SUB); // 连接服务端 socket.connect("tcp://localhost:5556"); // 订阅主题的value socket.subscribe("value".getBytes(StandardCharsets.UTF_8)); while (true) { 
    // 从通道上接收消息 byte[] topic = socket.recv(); byte[] data = socket.recv(); int value = Integer.parseInt(new String(data)); System.out.println("订阅的主题:" + new String(topic)); System.out.println(System.currentTimeMillis() + "接收到的整数:" + value); } } } 

3.2 请求-应答模式(REQ/RES)demo

请求端:

package com.example.demozmq.zmq; import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.zeromq.ZMQ; @SpringBootTest public class ZmqClient { 
    @Test public void testSendMessage2Server() { 
    ZMQ.Context context = ZMQ.context(1); ZMQ.Socket socket = context.socket(ZMQ.REQ); // 连接服务器 socket.connect("tcp://localhost:5555"); // 发送消息 String text = "你好,我是客户端。"; byte[] bytes = text.getBytes(StandardCharsets.UTF_8); socket.send(bytes); // 等待回复 byte[] reply = socket.recv(0); String response = new String(reply); System.out.println("接收到服务器的消息是:" + response); } public static void main(String[] args) throws InterruptedException { 
    ZMQ.Context context = ZMQ.context(1); ZMQ.Socket socket = context.socket(ZMQ.REQ); // 连接服务器 socket.connect("tcp://localhost:5555"); for (int i = 0; i < 10000; i++) { 
    // 发送消息 String text = "你好,我是客户端。" + i; byte[] bytes = text.getBytes(StandardCharsets.UTF_8); socket.send(bytes); // 等待回复 byte[] reply = socket.recv(0); String response = new String(reply); System.out.println("接收到服务器的消息是:" + response); Thread.sleep(5000); } } } 

响应端:

package com.example.demozmq.zmq; import java.nio.charset.Charset; import org.zeromq.ZMQ; public class ZmqServer { 
    public static void main(String[] args) { 
    try { 
    ZMQ.Context context = ZMQ.context(1); ZMQ.Socket socket = context.socket(ZMQ.REP); // 绑定端口 socket.bind("tcp://*:5555"); while (true) { 
    // 等待接收消息 byte[] request = socket.recv(); String reqTest = new String(request); System.out.println("接收到的消息:" + reqTest); // 返回消息给客户端 byte[] reply = (System.currentTimeMillis() + "你好啊,我是服务端。").getBytes(Charset.defaultCharset()); socket.send(reply, 0); } } catch (Exception e) { 
    throw new RuntimeException(e); } } } 

3.3 推拉模式(P/P)demo

推拉模式,PUSH发送。PULL方接收。PUSH可以和多个PULL建立连接,PUSH发送的数据被顺序发送给PULL方。如果是多个PULL,假如第一条消息发送给PULL1,那么第二条消息就会发送给PULL2,第三条又会发给PULL1,一直循环。发送消息的时候也是按照这个顺序发送,保证数据能够准确到达目的地。

推送消息端:

package com.example.demozmq.zmq; import java.nio.charset.StandardCharsets; import org.zeromq.ZMQ; public class ZmqPush { 
    public static void main(String[] args) throws InterruptedException { 
    ZMQ.Context context = ZMQ.context(1); ZMQ.Socket socket = context.socket(ZMQ.PUSH); socket.connect("ipc://fjs"); for (int i = 1; i <= 10000; i++) { 
    socket.send(("hello【" + i + "】").getBytes(StandardCharsets.UTF_8)); System.out.println("已发送" + i + "次"); Thread.sleep(3000); } socket.close(); context.term(); } } 

拉取消息端:

package com.example.demozmq.zmq; import org.zeromq.ZMQ; public class ZmqPullServer { 
    public static void main(String[] args) { 
    ZMQ.Context context = ZMQ.context(1); ZMQ.Socket socket = context.socket(ZMQ.PULL); socket.bind("ipc://fjs"); while (true) { 
    byte[] data = socket.recv(); System.out.println("拉取的数据:" + new String(data)); } } } 

以上代码可以直接复制使用,如有错误请多多指教!

本文完结!

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/114754.html

(0)
上一篇 2025-12-06 21:33
下一篇 2025-12-06 22:00

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信