大家好,欢迎来到IT知识分享网。
本文所关联代码仓库:https://gitee.com/sulbert/actor-demo
一、Akka介绍
Akka是用Scala语言编写的Actor模型的一种实现。
(一)Actor模型
(二)Akka极简运用
1. 引入依赖
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.13</artifactId> <version>2.8.6</version> </dependency>
2. 初始化 ActorSystem
3. 创建ActorRef
ActorRef是Actor的代理对象,我们无法直接操作Actor对象,只能通过ActorRef来操作Actor。需要通过ActorSystem创建ActorRef
ActorRef actorRef1 = system.actorOf(Actor1初始化属性) ActorRef actorRef2 = system.actorOf(Actor2初始化属性) ActorRef actorRef3 = system.actorOf(Actor3初始化属性) ...
4. 使用ActorRef发送消息
4.1 tell方法
receiverRef.tell(message, senderRef);
其中:
● receiverRef:接收消息的ActorRef
● message:消息内容
● senderRef:发送消息的ActorRef
○ 如果收件人需要知道这封信是谁发的,那么就可以把senderRef设置为当前的ActorRef ○ 如果收件人没必要知道这封信是谁发的,那么就可以把senderRef设置为ActorRef.noSender()
4.2 ask方法
Future<Object> future = Patterns.ask(receiverRef, message, timeout);
其中:
● receiverRef:接收消息的ActorRef
● message:消息内容
● timeout:超时时间
● future:返回一个Future对象,与jdk的Future有些区别,该对象有回调事件。具体使用参考单元测试
二、自定义Actor
参考了TB的actor,修改了部分内容。
(一)Actor系统架构
说明:
- Actor消息入口的几种情况:
系统初始化时自动发送消息。
用户使用业务上下文对象来发送消息。
系统初始化或用户手动创建Actor定时任务。
在TB中,系统初始化时会创建一个Actor去定时发送与其他服务的session存活性检测消息,我们系统中没有这个需求,所以就去掉了。 - 关于Actor业务系统
每个Actor都应当定义自己的ActorId和ActorMsg,只有确定了这两者,Actor系统的消息分发器才能正确地分发消息。
ActorId包含两层含义:
① ActorBizType biztype(): 指定Actor的业务类型。
② Serializable id(): 在当前业务类型中,id是唯一的。
ActorMsg则按照业务需求自行定义。这里特别说明一下,如果该消息是某个租户下的消息,那么定义ActorMsg时,要给其添加tenantId属性,否则分发器无法找到对应的租户进行消息分发。 - 消息分发,一个消息进入actor系统后,都是经由RootActor分发到租户或非租户Actor,再分发到相应Actor处理.
- Actor系统的内容先忽略,下文再进行分析
- 关于BizContextAwareActor
该类提供了Actor业务上下文对象,该对象可以用来发消息。在上述消息分发示意图中,所有Actor都继承自BizContextAwareActor,因此才拥有找到下一级Actor,并发送消息的能力。此外,Actor在处理具体业务时,也能通过业务上下文对象中封装的业务Service进行业务处理。
(二)目录说明
. ├── ActorStarter.java # SpringBoot启动累 ├── base # Actor系统与业务系统交界的基础目录 │ ├── ActorBizContext.java # Actor业务上下文对象 │ └── BizContextAwareActor.java # 业务上下文Actor基类,所有业务Actor都应当继承该类 ├── demo # 该目录下的内容用于演示 │ ├── actors │ │ ├── calc │ │ │ ├── CalcActor.java │ │ │ ├── CalcActorId.java │ │ │ ├── CalcMsg.java │ │ │ ├── TenantCalcActor.java │ │ │ ├── TenantCalcActorId.java │ │ │ └── TenantCalcMsg.java │ │ ├── root # 根actor目录,该Actor是全局唯一的 │ │ │ ├── ActorSysProperties.java │ │ │ ├── RootActor.java │ │ │ └── RootInitMsg.java │ │ └── tenant │ │ ├── NonTenantActor.java │ │ ├── TenantActor.java │ │ ├── TenantActorId.java │ │ └── TenantChangedMsg.java │ ├── entity │ │ └── TenantEntity.java │ ├── enumeration │ │ └── CalcType.java │ ├── lifecycle # Actor生命周期管理目录 │ │ └── ActorLifecycle.java │ └── service │ ├── TenantService.java │ └── impl │ └── TenantServiceImpl.java ├── model # Actor模型 │ ├── Actor.java # Actor接口,定义初始化方法与消息执行方法 │ ├── ActorCreator.java # 用于创建ActorId和Actor │ ├── ActorId.java # 用于标识Actor的唯一性 │ ├── ActorMsg.java # Actor处理的消息 │ ├── ActorRef.java # Actor的代理 │ ├── ActorSysContext.java # Actor系统上下文,实现类为ActorMailBox,用于代理Actor执行消息发送和消费 │ ├── ActorSystem.java # Actor系统,定义了消息分发器,定义了父子Actor的创建行为,管理分发器、actor、actor父子关系等 │ ├── core │ │ ├── AbstractActor.java │ │ ├── AbstractActorCreator.java │ │ ├── AbstractActorId.java │ │ ├── ActorDispatcher.java # 消息分发器,每条消息都经由分发器传递到邮箱 │ │ ├── ActorMailbox.java # Actor邮箱,持有消息队列,可以执行消息的收发 │ │ ├── ActorSystemSettings.java │ │ ├── DefaultActorSystem.java │ │ ├── InitFailureStrategy.java │ │ └── ProcessFailureStrategy.java │ ├── enumeration │ │ ├── ActorBizType.java # actor业务类型枚举,我的案例中是随便写了几个。真正使用时,需要再推敲推敲 │ │ ├── ActorMsgType.java # 消息类型枚举,通常都是一个Actor对应一种消息类型 │ │ └── ActorStopReason.java │ └── exceptions │ ├── ActorException.java │ └── ActorNotRegisteredException.java
(三)Actor系统类图
(四)初始化过程
part1: 在 ActorLifecycle 这个类中进行
- 读取配置,并构建为系统设置对象
- 基于系统设置对象创建ActorSystem
- 创建消息分发器(dispatcher)
- 创建RootActor
- 把ActorSystem和RootActor对象添加到业务上下文中
part2: 在part1中第4步创建RootActor的过程包含以下内容:
- 通过ActorCreator来创建Actor
- 创建邮箱:当前Actor对应的ActorMailbox
- 初始化actor:将邮箱设置为当前actor的“actorSysContext”
- 如果没有在初始化actor过程中kill程序,则会尝试消费当前actor邮箱中的消息
(五)使用说明
- 自定义消息:需要实现ActorMsg接口
/ * 计算消息 * @param a 第一个数 * @param b 第二个数 * @param calcType 计算方式枚举:加减乘除 */ public record CalcMsg(int a, int b, CalcType calcType) implements ActorMsg {
/ * 能标识消息唯一性即可,这里仅作为例子 */ public String getCalcId() {
return a + calcType.name() + b; } @Override public ActorMsgType getMsgType() {
return ActorMsgType.CALC; } }
- 注入业务上下文对象:ActorBizContext
- 调用tell或者tellImportant方法,参数为自定义的消息
@SpringBootTest( classes = ActorStarter.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class CustomActorTest {
@Resource private ActorBizContext actorBizContext; @Test public void testNonTenant() {
actorBizContext.tell(new CalcMsg(10, 2, CalcType.ADD)); actorBizContext.tell(new CalcMsg(10, 2, CalcType.SUBTRACT)); actorBizContext.tell(new CalcMsg(10, 2, CalcType.MULTIPLY)); actorBizContext.tell(new CalcMsg(10, 2, CalcType.DIVIDE)); actorBizContext.tellImportant(new CalcMsg(15, 5, CalcType.ADD)); actorBizContext.tellImportant(new CalcMsg(15, 5, CalcType.SUBTRACT)); actorBizContext.tellImportant(new CalcMsg(15, 5, CalcType.MULTIPLY)); actorBizContext.tellImportant(new CalcMsg(15, 5, CalcType.DIVIDE)); } @Test public void testTenant() {
actorBizContext.tell(new TenantCalcMsg(1L, 10, 2, CalcType.ADD)); actorBizContext.tell(new TenantCalcMsg(1L, 10, 2, CalcType.SUBTRACT)); actorBizContext.tell(new TenantCalcMsg(2L, 10, 2, CalcType.MULTIPLY)); actorBizContext.tell(new TenantCalcMsg(2L, 10, 2, CalcType.DIVIDE)); actorBizContext.tellImportant(new TenantCalcMsg(1L, 15, 5, CalcType.ADD)); actorBizContext.tellImportant(new TenantCalcMsg(1L, 15, 5, CalcType.SUBTRACT)); actorBizContext.tellImportant(new TenantCalcMsg(2L, 15, 5, CalcType.MULTIPLY)); actorBizContext.tellImportant(new TenantCalcMsg(2L, 15, 5, CalcType.DIVIDE)); } }
与案例一中的Akka特性不同,我们参考TB自定义的Actor消息接收者无法直接得知消息发送者(要知道也可以,只是需要对接口做些调整)。
此外,消息处理过程中,由于设置状态采用了CAS锁机制,所以在高并发情况下CPU比较容易飙升。可以通过下述测试,监控系统CPU来观察到效果
@Test public void testPerformance() throws InterruptedException {
for (int i = 0; i < ; i++) {
actorBizContext.tell(new CalcMsg(i, i + 1, randomCalcType())); } TimeUnit.SECONDS.sleep(10); } private CalcType randomCalcType() {
return CalcType.values()[new java.util.Random().nextInt(CalcType.values().length)]; }
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/125000.html