微服务 SpringCloud Stream消息驱动

微服务 SpringCloud Stream消息驱动微服务 SpringCloudS 消息驱动消息驱动概述 1 概念描述 2 官网介绍 3 适用场景 4 设计思想标准 MQ 为什么用 CloudStreamS 中消息通信方式遵循发布 订阅模式 5 SpringCloud

大家好,欢迎来到IT知识分享网。

1. Stream概述

1.1 Stream简介

屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型。

1.2 Stream官网介绍

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/

1.3 Stream适用场景

也就是当你项目中同时存在RabbitMQ与kafka的时候,可以引入SpringCloud Stream可以实现屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型。

1.4 Stream设计思想

1.5 Stream通信方式

1.6 Stream标准流程

在这里插入图片描述
Spring Cloud Stream三大核心组件
Binder:很方便的连接中间件,屏蔽差异
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置。
Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。



1.7 Stream编码API和常用注解

在这里插入图片描述

2. Stream案例说明

3. Stream生产者

引入依赖

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloudboot3</artifactId> <groupId>com.hello.springcloud</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-provider8801</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--基础配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project> 

yml

server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址 

主启动

/ * @author zrj * @date 2021/2/7 * @since V1.0 / @SpringBootApplication public class StreamMqApplication8801 { 
    public static void main(String[] args) { 
    SpringApplication.run( StreamMqApplication8801.class, args ); } } 

service

/ * @author zrj * @date 2021/2/7 * @since V1.0 / public interface IMessageProvider { 
    / * 发送消息 */ String send(); } 

serviceImpl

 / * @author zrj * @date 2021/2/7 * @since V1.0 / @EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider { 
    / * 消息发送管道 * <p> * 注意:这样定义 private MessageChannel messageChannel; * 会出现如下异常 * org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'sendMessageController': * Injection of resource dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: * Error creating bean with name 'messageProviderImpl': Injection of resource dependencies failed; */ @Resource private MessageChannel output; @Override public String send() { 
    String serial = UUID.randomUUID().toString(); output.send( MessageBuilder.withPayload( serial ).build() ); System.out.println( "*serial: " + serial ); return null; } } 

controller

 / * @author zrj * @date 2021/2/7 * @since V1.0 / @RestController public class SendMessageController { 
    @Resource private IMessageProvider messageProvider; @GetMapping(value = "/sendMessage") public String sendMessage() { 
    return messageProvider.send(); } } 

4. Stream消费者

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloudboot3</artifactId> <groupId>com.hello.springcloud</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-consumer8803</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--基础配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project> 

yml

server: port: 8803 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: groupA eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: receive-8803.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址 

主启动

 / * @author zrj * @date 2021/2/7 * @since V1.0 / @SpringBootApplication public class StreamMqApplication8803 { 
    public static void main(String[] args) { 
    SpringApplication.run( StreamMqApplication8803.class, args ); } } 

业务类

 / * @author zrj * @date 2021/2/7 * @since V1.0 / @RestController @EnableBinding(Sink.class) public class ReceiveMessageListenerController { 
    @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { 
    System.out.println( "消费者2号,----->接受到的消息: " + message.getPayload() + "\t port: " + serverPort ); } } 

5. Stream分组消费与持久化

5.1 Stream分组消费

5.2 Stream持久化

在这里插入图片描述

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/105135.html

(0)
上一篇 2026-02-16 20:21
下一篇 2026-02-17 21:11

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信