大家好,欢迎来到IT知识分享网。
微服务 SpringCloud Stream消息驱动
1. Stream概述
1.1 Stream简介
屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型。
1.2 Stream官网介绍
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/
1.3 Stream适用场景
也就是当你项目中同时存在RabbitMQ与kafka的时候,可以引入SpringCloud Stream可以实现屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型。
1.4 Stream设计思想
1.5 Stream通信方式
1.6 Stream标准流程
Spring Cloud Stream三大核心组件
Binder:很方便的连接中间件,屏蔽差异
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置。
Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
1.7 Stream编码API和常用注解
2. Stream案例说明
3. Stream生产者
引入依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloudboot3</artifactId> <groupId>com.hello.springcloud</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-provider8801</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--基础配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>
yml
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址
主启动
/ * @author zrj * @date 2021/2/7 * @since V1.0 / @SpringBootApplication public class StreamMqApplication8801 {
public static void main(String[] args) {
SpringApplication.run( StreamMqApplication8801.class, args ); } }
service
/ * @author zrj * @date 2021/2/7 * @since V1.0 / public interface IMessageProvider {
/ * 发送消息 */ String send(); }
serviceImpl
/ * @author zrj * @date 2021/2/7 * @since V1.0 / @EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider {
/ * 消息发送管道 * <p> * 注意:这样定义 private MessageChannel messageChannel; * 会出现如下异常 * org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'sendMessageController': * Injection of resource dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: * Error creating bean with name 'messageProviderImpl': Injection of resource dependencies failed; */ @Resource private MessageChannel output; @Override public String send() {
String serial = UUID.randomUUID().toString(); output.send( MessageBuilder.withPayload( serial ).build() ); System.out.println( "*serial: " + serial ); return null; } }
controller
/ * @author zrj * @date 2021/2/7 * @since V1.0 / @RestController public class SendMessageController {
@Resource private IMessageProvider messageProvider; @GetMapping(value = "/sendMessage") public String sendMessage() {
return messageProvider.send(); } }
4. Stream消费者
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloudboot3</artifactId> <groupId>com.hello.springcloud</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-consumer8803</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--基础配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>
yml
server: port: 8803 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: groupA eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: receive-8803.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址
主启动
/ * @author zrj * @date 2021/2/7 * @since V1.0 / @SpringBootApplication public class StreamMqApplication8803 {
public static void main(String[] args) {
SpringApplication.run( StreamMqApplication8803.class, args ); } }
业务类
/ * @author zrj * @date 2021/2/7 * @since V1.0 / @RestController @EnableBinding(Sink.class) public class ReceiveMessageListenerController {
@Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) {
System.out.println( "消费者2号,----->接受到的消息: " + message.getPayload() + "\t port: " + serverPort ); } }
5. Stream分组消费与持久化
5.1 Stream分组消费
5.2 Stream持久化
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/105135.html


