大家好,欢迎来到IT知识分享网。
目录
1:什么是Strem
Spring Cloud Stream是一个构建消息驱动微服务框架。他的作用是屏蔽消息中间件的技术差异,就像jdbc连接不同的数据库一样,我们不直接调用消息中间件,我们通过Strem来使用消息中间来发送消息。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。
2:Strem架构
2.1:传统的消息对列架构
生产者产生消息,消费者消费消息,但是消息对列插件有很多,在分布式项目中,我们要熟悉各种技术,增加学习成本,所以引入了Strem,来屏蔽中间的技术差异
2.2:strem的消息对列架构
1:input消息接受者
2:outpu消息发送者
3:Binder:方便连接中间件,屏蔽差异
4:Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
5:Source、Sink:简单的可理解为参照对象是SpringCloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
组成 | 说明 |
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder |
Binder是应用于消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder 通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
3:案例介绍
3.1:8001生产者
pom文件配置
<dependencies> <!--stream的rabbit新增配置--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- spring cloud config 客户端包 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency> <!--添加消息总线RabbitMQ支持--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <!--Spring web 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--Springcloud的hystrix组件 服务降级 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> <!--引入openfeignjar--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <!--Springcloud的eureka组件 client--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--图形监控--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
配置:application.yml
#端口 server: port: 8001 #eureka集群注册 eureka: client: serviceUrl: defaultZone: http://eservice2:8082/eureka/,http://eservice1:8081/eureka/ register-with-eureka: true fetch-registry: true spring: application: name: producer8001 cloud: stream: binders: #在此处配置要绑定的rabbitmq的服务信息 defaultRabbit: #表示定义的命名,用于binding整合 type: rabbit #消息组件类型 environment: #设置rabbitmq的相关环境配置 spring: tabbitmq: host: localhost port: 5672 username: guest password: guest bindings: #服务的整合处理 output: #此处代表生产者 destination: huyijuExchange #表示要使用的Exchange名称定义 content-type: application/json #设置消息类型,本次为json,文本则为 text/plain binder: defaultRabbit #设置要绑定的消息服务的具体设置
消息接口及实现:
public interface SendMessage { public String send(String msg); } @EnableBinding(Source.class) //消息推送者固定注解 public class SendMessageImpl implements SendMessage { @Resource //注入指定名字通道 output MessageChannel output; @Override public String send(String msg) { String serial = UUID.randomUUID().toString(); //发送消息 output.send(MessageBuilder.withPayload(serial).build()); System.out.println("发送者8001发送消息:"+serial); return null; } }
Controller:
@RestController public class SendMessageController { @Autowired SendMessageImpl sendMessage; @RequestMapping(value = "/send") public String sendMessage() { //调用接口发送消息 return sendMessage.send("测试"); } }
启动类:
@SpringBootApplication @EnableEurekaClient public class Main8001 { public static void main(String[] args) { SpringApplication.run(Main8001.class, args); } }
3.2:8002、8003消费者配置一致
<dependencies> <!--stream的rabbit新增配置--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- spring cloud config 客户端包 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency> <!--添加消息总线RabbitMQ支持--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <!--Spring web 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--Springcloud的hystrix组件 服务降级 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> <!--引入openfeignjar--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <!--Springcloud的eureka组件 client--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--图形监控--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
配置:application.yml
#8003和8002端口 server: port: 8003 #eureka集群注册 eureka: client: serviceUrl: defaultZone: http://eservice2:8082/eureka/,http://eservice1:8081/eureka/ register-with-eureka: true fetch-registry: true spring: application: name: consumer1 cloud: stream: binders: #在此处配置要绑定的rabbitmq的服务信息 defaultRabbit: #表示定义的命名,用于binding整合 type: rabbit #消息组件类型 environment: #设置rabbitmq的相关环境配置 spring: tabbitmq: host: localhost port: 5672 username: guest password: guest bindings: #服务的整合处理 input: #此处代表消费者 destination: huyijuExchange #表示要使用的Exchange名称定义 content-type: application/json #设置消息类型,本次为json,文本则为 text/plain binder: defaultRabbit #设置要绑定的消息服务的具体设置 group: groupA #添加group可以持久化,名字一致的话,只能单个消费者消费
Listener:
/ * @author :huyiju * @date :2020-05-13 18:31 */ @Component @EnableBinding(Sink.class) //消费者注解 public class MessageListener { @StreamListener(Sink.INPUT)//消费者注解监听 public void receive(Message<String> message){ System.out.println("消费值8003接收到消息:"+message.getPayload()); } }
启动类:
@SpringBootApplication @EnableEurekaClient public class Main8003 { public static void main(String[] args) { SpringApplication.run(Main8003.class, args); } }
测试:启动eureka,启动RabbitMQ,启动8801生产者和8802消费者:
访问:http://localhost:8001/send 发送消息
4:总结消息持久化
配置了group属性的模块支持消息持久化,名字相同不会重复消费
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/127768.html