WebFlux响应式框架快速入门

WebFlux响应式框架快速入门传统的 SpringMVC 是基于 ServletAPI 的框架

大家好,欢迎来到IT知识分享网。


一、WebFlux简介

1.是什么?

WebFlux的官网地址

PS:非阻塞请参照此文章《彻底理解非阻塞IO(NIO)》

2.为何诞生?

部分原因是需要一种非阻塞式的Web技术栈,以少量线程处理并发,并使用更少的硬件资源进行伸缩。

另一部分原因是函数式编程已经有了基础。带注释的REST控制器或单元测试,在Java 8中增加的lambda表达式为Java的函数式api创造了基础。

3.特点

在理解WebFlux的特点前,我们先了解下响应式宣言。

3.1 响应式宣言(The Reactive Manifesto)

1. 快速响应(Responsive)
系统在各种情况下都会尽全力保证及时响应。它是可用性和实用性的基础, 还意味着问题能被迅速发现,并得到有效处理。

2. 回弹性(Resilient)
系统在面临故障时依然保持快速响应。它是通过复制(replication)、抑制(containment)、隔离(isolation)和委托(delegation)来实现的。
故障被抑制在单个组件中, 且各组件相互隔离, 使系统在部分失败和恢复时,可以不影响整体的功能。每个组件的恢复都委托给另一个(外部)组件, 高可用在必要时通过复制来保证。

3. 可伸缩性(Elastic)
系统在不同的负载下都能保持快速的响应。响应式系统可以根据服务的请求量, 动态增加或减少相应的资源。

4. 消息驱动(Message Driven)
响应式系统依赖异步的消息传递, 以确定各种组件的边界, 并确保松耦合(loose coupling)、隔离性(isolation)、位置透明性(location transparency), 并提供将错误封装为消息的手段。

3.2 Reactive(响应式)特点的关系

3.3 WebFlux的特点

1. 异步和非阻塞
支持了应用层的异步和底层的IO非阻塞。
在应用层,利用Reactive Stream定义的异步组件,实现了异步调用。
在底层,默认使用了Netty的NIO,实现了(同步)非阻塞。
这不会使程序运行更快,但是可以用少量的线程承受住高负载,节省线程内存资源和减少线程上下文切换CPU耗时,进而提升吞吐量。



2. 函数式编程
使用Lambda表达式和函数式接口来定义请求处理程序。
WebFlux.Fn是一种轻量级函数式编程模型,其中函数用于路由和处理请求,契约设计为不可变。它是基于注释的编程模型的另一种选择,但在其他方面运行在相同的Reactive Core基础上。
例如,RouterFunction相当于@RequestMapping注释。


3. 去servlet
允许可以不基于servlet API。
默认的Netty容器,不基于Servlet API。Servlet3.1支持了异步、非阻塞通信,因此,也可以选择使用Tomcat等容器,走Servlet API,但是,必须要使用WebFlux的框架代码。


二、应用接入

1.Reactor

1.1 Reactor是什么?

1.2 Mono和Flux

2.WebFlux实例

如果你是springboot的WebMVC项目,很容易就可以改为WebFlux项目。

2.1 pom.xml

2.2 Controller

2.允许返回非Mono、非Flux类型
但这样就是阻塞代码了。

3.非阻塞返回Mono、Flux

4.HttpServletRequest变成了ServerHttpRequest
response也是类似的变化。

2.3 map和flatMap

map: 对值进行映射转换, 返回值
如下图, 把list转换为RvResponseEntity
在这里插入图片描述

flatMap: 对值进行映射转换, 返回Mono/Flux
如下图, 把respStr转换为Mono
在这里插入图片描述

2.4 null

1.要避免null
XX.map(o->{…})
如果XX或者o是null, 代码运行时会报错

2.不能用empty()代替null

Flux.empty().map(objectFlux -> { 
      System.out.println("哈哈"); return "1"; }).count().subscribe(System.out::println); 

3.时刻考虑empty()
由于empty会导致map逻辑不执行,必须考虑empty的处理,使用defaultIfEmtpy/switchIfEmpty

2.5 性能


三、理论支撑

1.Amdahl定律

先了解几个概念:

Amdahl定律:
部件改进后,使得整个系统加速。系统的加速比:

Sn = 1 / ( (1-Fe)+Fe/Se)

Amdahl定律是指:

2.USL(Universal Scalability Law)

USL:

X(N)= X (1).N /(1+α(N-1)+β.N(N-1))

N表示并行度, X(N)表示吞吐量, α表示竞争系数(例如:0.02),β表示一致性系数(例如:0.00006)

USL指出,由于共享资源竞争和数据为了一致性产生的额外同步,系统可伸缩性存在临界点,临界点之后吞吐量开始降低。

基于USL,我们解释了WebFlux提升吞吐量的重要原因,就是:

利用NIO控制了线程数,进而减少了线程上下文切换等资源竞争


四、代码分析

1.观察者模式

在WebFlux的Reactive Stream的实现中,使用了观察者模式。

1.1 观察者模式(Observer Pattern)

核心步骤:
1.Subject, 添加观察者
例如:

subject.addObserver(Observer observer) 

2.Subject, 在subject.change()中通知观察者
例如:

 public void change(){ 
        XXX notifyAllObservers(); } private void notifyAllObservers(){ 
        for (Object obs:observers){ 
        observer.update(); } } 

1.2 Reactive Streams的发布订阅

对比一下:

观察者模式:
被观察者(或者叫主题Subject)发生改变时,通知所有的观察者(Observer)。

Reactive Streams:
发布者(Publisher)发布数据时,通知订阅者(Subscriber)。

Reactor Streams的核心步骤:
1. Publisher, 添加订阅者(即观察者)

 publisher.subscribe(subscriber) 

发布者.订阅消费者,此处subscribe的实际含义是添加订阅者

和MQ的写法对比下:

 consumer.subscribe(topic) 

消费者.订阅发布者的主题, 虽然都是subscribe,但是MQ的写法更贴近订阅的含义。

2. Publisher, request()中通知订阅者(即观察者)
Subscription用于连接Publisher和Subscriber。
在这里插入图片描述
在这一步中,和传统的观察者模式不同,
这是为了背压,请看下面的背压介绍~



2.背压

2.1 什么是背压( Back Pressure )

背压是,消费者反馈给生产者,生产的速率太高难以消费的机制。

2.2 背压的实现

2.3 Reactor的背压使用

Flux.range(0, 100).log().subscribe(new BaseSubscriber<Integer>() { 
        // 通过重写BaseSubscriber的方法来自定义Subscriber; @Override protected void hookOnSubscribe(Subscription subscription) { 
        // hookOnSubscribe定义在订阅的时候执行的操作; System.out.println("Subscribed and make a request..."); request(1); // 订阅时首先向上游请求1个元素; } @Override protected void hookOnNext(Integer value) { 
        // hookOnNext定义每次在收到一个元素的时候的操作; System.out.println("Get value [" + value + "]"); // 打印收到的元素; request(50); // 打印收到的元素; } }); 

2.limitRate
limitRage(2),效果等同于request(2),每次只要求Publisher生产2条数据。

Flux.range(0, 100).log().limitRate(2).subscribe(integer -> { 
        System.out.println(integer); }); 

3.onBackPressureBuffer/ onBackPressureDrop等
把发布的数据先放到buffer中,再从buffer中获取数据进行消费。

Flux.range(0, 100).log().onBackpressureBuffer(2).subscribe(integer -> { 
        System.out.println(integer); }); 

2.4 Spring框架的背压

1. Spring知道应该request多少业务数据?
Spring框架(如图)未提供业务侧,进行背压的编写。
在这里插入图片描述

Spring不知道应用服务应该request多少数据。

Spring使用TCP的流量控制来做基于字节的背压。

2. 跨网络的业务数据背压怎么做?
使用新的协议,协议中指出request多少,来实现业务应用级别的流量控制,例如:Rsocket。


五、发展现状

1.技术支持

2.WebFlux适用场景

1. WebFlux能干嘛?
官网:WebFlux并不能使接口的响应时间缩短,它仅仅能够提升吞吐量和伸缩性。
实际,高并发下,BIO高并发的响应时间会变长,而WebFlux的平均响应时间相对稳定,效果如同缩短了响应时间。

2. 适用于IO密集型场景,主要是指网络IO密集
例如:微服务网关,网关用http进行服务转发,适合使用webClient实现非阻塞调用。

3. 不适用于CPU密集型场景

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/113120.html

(0)
上一篇 2026-01-14 10:10
下一篇 2026-01-14 10:20

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信