大家好,欢迎来到IT知识分享网。
文章目录
一、WebFlux简介
1.是什么?
WebFlux的官网地址
PS:非阻塞请参照此文章《彻底理解非阻塞IO(NIO)》
2.为何诞生?
部分原因是需要一种非阻塞式的Web技术栈,以少量线程处理并发,并使用更少的硬件资源进行伸缩。
另一部分原因是函数式编程已经有了基础。带注释的REST控制器或单元测试,在Java 8中增加的lambda表达式为Java的函数式api创造了基础。
3.特点
在理解WebFlux的特点前,我们先了解下响应式宣言。
3.1 响应式宣言(The Reactive Manifesto)
1. 快速响应(Responsive)
系统在各种情况下都会尽全力保证及时响应。它是可用性和实用性的基础, 还意味着问题能被迅速发现,并得到有效处理。
2. 回弹性(Resilient)
系统在面临故障时依然保持快速响应。它是通过复制(replication)、抑制(containment)、隔离(isolation)和委托(delegation)来实现的。
故障被抑制在单个组件中, 且各组件相互隔离, 使系统在部分失败和恢复时,可以不影响整体的功能。每个组件的恢复都委托给另一个(外部)组件, 高可用在必要时通过复制来保证。
3. 可伸缩性(Elastic)
系统在不同的负载下都能保持快速的响应。响应式系统可以根据服务的请求量, 动态增加或减少相应的资源。
4. 消息驱动(Message Driven)
响应式系统依赖异步的消息传递, 以确定各种组件的边界, 并确保松耦合(loose coupling)、隔离性(isolation)、位置透明性(location transparency), 并提供将错误封装为消息的手段。
3.2 Reactive(响应式)特点的关系
3.3 WebFlux的特点
1. 异步和非阻塞
支持了应用层的异步和底层的IO非阻塞。
在应用层,利用Reactive Stream定义的异步组件,实现了异步调用。
在底层,默认使用了Netty的NIO,实现了(同步)非阻塞。
这不会使程序运行更快,但是可以用少量的线程承受住高负载,节省线程内存资源和减少线程上下文切换CPU耗时,进而提升吞吐量。
2. 函数式编程
使用Lambda表达式和函数式接口来定义请求处理程序。
WebFlux.Fn是一种轻量级函数式编程模型,其中函数用于路由和处理请求,契约设计为不可变。它是基于注释的编程模型的另一种选择,但在其他方面运行在相同的Reactive Core基础上。
例如,RouterFunction相当于@RequestMapping注释。
3. 去servlet
允许可以不基于servlet API。
默认的Netty容器,不基于Servlet API。Servlet3.1支持了异步、非阻塞通信,因此,也可以选择使用Tomcat等容器,走Servlet API,但是,必须要使用WebFlux的框架代码。
二、应用接入
1.Reactor
1.1 Reactor是什么?
1.2 Mono和Flux
2.WebFlux实例
如果你是springboot的WebMVC项目,很容易就可以改为WebFlux项目。
2.1 pom.xml
2.2 Controller
2.允许返回非Mono、非Flux类型
但这样就是阻塞代码了。
3.非阻塞返回Mono、Flux
4.HttpServletRequest变成了ServerHttpRequest
response也是类似的变化。
2.3 map和flatMap
map: 对值进行映射转换, 返回值
如下图, 把list转换为RvResponseEntity
flatMap: 对值进行映射转换, 返回Mono/Flux
如下图, 把respStr转换为Mono
2.4 null
1.要避免null
XX.map(o->{…})
如果XX或者o是null, 代码运行时会报错
2.不能用empty()代替null
Flux.empty().map(objectFlux -> {
System.out.println("哈哈"); return "1"; }).count().subscribe(System.out::println);
3.时刻考虑empty()
由于empty会导致map逻辑不执行,必须考虑empty的处理,使用defaultIfEmtpy/switchIfEmpty
2.5 性能
三、理论支撑
1.Amdahl定律
先了解几个概念:
Amdahl定律:
部件改进后,使得整个系统加速。系统的加速比:
Sn = 1 / ( (1-Fe)+Fe/Se)
Amdahl定律是指:
2.USL(Universal Scalability Law)
USL:
X(N)= X (1).N /(1+α(N-1)+β.N(N-1))
N表示并行度, X(N)表示吞吐量, α表示竞争系数(例如:0.02),β表示一致性系数(例如:0.00006)
USL指出,由于共享资源竞争和数据为了一致性产生的额外同步,系统可伸缩性存在临界点,临界点之后吞吐量开始降低。
基于USL,我们解释了WebFlux提升吞吐量的重要原因,就是:
利用NIO控制了线程数,进而减少了线程上下文切换等资源竞争
四、代码分析
1.观察者模式
在WebFlux的Reactive Stream的实现中,使用了观察者模式。
1.1 观察者模式(Observer Pattern)
核心步骤:
1.Subject, 添加观察者
例如:
subject.addObserver(Observer observer)
2.Subject, 在subject.change()中通知观察者
例如:
public void change(){
XXX notifyAllObservers(); } private void notifyAllObservers(){
for (Object obs:observers){
observer.update(); } }
1.2 Reactive Streams的发布订阅
对比一下:
观察者模式:
被观察者(或者叫主题Subject)发生改变时,通知所有的观察者(Observer)。
Reactive Streams:
发布者(Publisher)发布数据时,通知订阅者(Subscriber)。
Reactor Streams的核心步骤:
1. Publisher, 添加订阅者(即观察者)
publisher.subscribe(subscriber)
发布者.订阅消费者,此处subscribe的实际含义是添加订阅者。
和MQ的写法对比下:
consumer.subscribe(topic)
消费者.订阅发布者的主题, 虽然都是subscribe,但是MQ的写法更贴近订阅的含义。
2. Publisher, request()中通知订阅者(即观察者)
Subscription用于连接Publisher和Subscriber。
在这一步中,和传统的观察者模式不同,
这是为了背压,请看下面的背压介绍~
2.背压
2.1 什么是背压( Back Pressure )
背压是,消费者反馈给生产者,生产的速率太高难以消费的机制。
2.2 背压的实现
2.3 Reactor的背压使用
Flux.range(0, 100).log().subscribe(new BaseSubscriber<Integer>() {
// 通过重写BaseSubscriber的方法来自定义Subscriber; @Override protected void hookOnSubscribe(Subscription subscription) {
// hookOnSubscribe定义在订阅的时候执行的操作; System.out.println("Subscribed and make a request..."); request(1); // 订阅时首先向上游请求1个元素; } @Override protected void hookOnNext(Integer value) {
// hookOnNext定义每次在收到一个元素的时候的操作; System.out.println("Get value [" + value + "]"); // 打印收到的元素; request(50); // 打印收到的元素; } });
2.limitRate
limitRage(2),效果等同于request(2),每次只要求Publisher生产2条数据。
Flux.range(0, 100).log().limitRate(2).subscribe(integer -> {
System.out.println(integer); });
3.onBackPressureBuffer/ onBackPressureDrop等
把发布的数据先放到buffer中,再从buffer中获取数据进行消费。
Flux.range(0, 100).log().onBackpressureBuffer(2).subscribe(integer -> {
System.out.println(integer); });
2.4 Spring框架的背压
1. Spring知道应该request多少业务数据?
Spring框架(如图)未提供业务侧,进行背压的编写。
Spring不知道应用服务应该request多少数据。
Spring使用TCP的流量控制来做基于字节的背压。
2. 跨网络的业务数据背压怎么做?
使用新的协议,协议中指出request多少,来实现业务应用级别的流量控制,例如:Rsocket。
五、发展现状
1.技术支持
2.WebFlux适用场景
1. WebFlux能干嘛?
官网:WebFlux并不能使接口的响应时间缩短,它仅仅能够提升吞吐量和伸缩性。
实际,高并发下,BIO高并发的响应时间会变长,而WebFlux的平均响应时间相对稳定,效果如同缩短了响应时间。
2. 适用于IO密集型场景,主要是指网络IO密集
例如:微服务网关,网关用http进行服务转发,适合使用webClient实现非阻塞调用。
3. 不适用于CPU密集型场景
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/113120.html



