大家好,欢迎来到IT知识分享网。
1、什么是akka
akka是一个构建高并发、分布式、可容错、事件驱动的开发库,它可以使构建高并发分布式应用更加容易。
akka的特性:
- 易于构建并行和分布式应用: akka的核心组件是actor,actor被设计成独立的,每个actor之间互相不影响,没有了线程间资源共享的问题。
- 可靠性: akka中的actor之间有层级关系,actor有专门的进程监护。
- 高性能: actor占用内存非常小,1GB内存中可以保存上百万的actor,相对于传统的线程模型有很高的性能提升,java21已经推出了虚拟线程也可以支持海量任务处理了。
2、什么是actor
actor可以理解为做事情的,当系统收到一个需要处理的任务时,就会交给actor进行处理,actor具有层级关系,是一个树形结构,父节点负责子actor的创建、销毁和状态管理,在akka中,所有actor的根是ActorSystem,在系统中,ActorSystem需要被设计成单例模式。
3、使用案例
在使用时,要注意akka的版本,高版本中的一些api与低版本的api并不通用(大版本号好像是2.6),这里需要注意。目前很多软件都是使用低版本的api,这里就总结一些低版本api的使用:
需求如下:计算两个数的加减乘除,master端提供两个数和运算符,将任务提交到worker端进行计算,worker端按照要求将计算结果返回给master端。
- 引入依赖:
<!-- 基础actor依赖 --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.11</artifactId> <version>2.3.15</version> </dependency> <!-- 集群依赖 --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-cluster_2.11</artifactId> <version>2.3.15</version> </dependency> <!-- 远程调用依赖 --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.11</artifactId> <version>2.3.15</version> </dependency>
- 构建一个公用类:存储两个数、运算符、运算结果,类结构如下:
import java.io.Serializable; public class MathCalculate implements Serializable {
/ * 数字1 */ private int num1; / * 数字2 */ private int num2; / * 计算结果 */ private int result; / * 操作类型 */ private OperateEnum type; / * 计算结果对应的运算符 */ private String op; public MathCalculate() {
} public MathCalculate(int result, OperateEnum type) {
this.result = result; this.type = type; } public MathCalculate(int num1, int num2, OperateEnum type) {
this.num1 = num1; this.num2 = num2; this.type = type; } public MathCalculate(int num1, int num2, int result, OperateEnum type) {
this.num1 = num1; this.num2 = num2; this.result = result; this.type = type; } public int getNum1() {
return num1; } public void setNum1(int num1) {
this.num1 = num1; } public int getNum2() {
return num2; } public void setNum2(int num2) {
this.num2 = num2; } public int getResult() {
return result; } public void setResult(int result) {
this.result = result; } public OperateEnum getType() {
return type; } public void setType(OperateEnum type) {
this.type = type; } public String getOp() {
return op; } public void setOp(String op) {
this.op = op; } public static enum OperateEnum {
ADD("+"), SUBTRACT("-"), MULTIPLY("*"), DIVIDE("/"), RESULT("="), ; private String val; OperateEnum(String val) {
this.val = val; } public String getVal() {
return val; } } }
- master端:master端创建一个akka环境,指定服务地址和端口号,并继承Actor用于处理结果信息:
master端的actor内容:
import akka.actor.UntypedActor; import com.alibaba.fastjson.JSONObject; public class MasterResultActor extends UntypedActor {
@Override public void preStart() throws Exception {
System.out.println("MasterResultActor invoker"); } @Override public void onReceive(Object message) {
MathCalculate math = null; if(message instanceof String) {
String json = (String) message; math = JSONObject.parseObject(json, MathCalculate.class); } else if(message instanceof MathCalculate) {
math = (MathCalculate) message; } System.out.println("calculate result: " + math.getNum1() + " " + math.getOp() + " " + math.getNum2() + " = " + math.getResult()); } }
master端的主类内容:
import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.HashMap; import java.util.Map; public class MasterCalculateTest {
public static void main(String[] args) {
Map<String, Object> propMap = new HashMap<>(); propMap.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider"); propMap.put("akka.remote.netty.tcp.hostname", "127.0.0.1"); propMap.put("akka.remote.netty.tcp.port", 8989); Config config = ConfigFactory.parseMap(propMap); ActorSystem system = ActorSystem.create("master-server", config); ActorRef masterActor = system.actorOf(Props.create(MasterResultActor.class), "master-actor"); ActorSelection actorSelection = system.actorSelection("akka.tcp://math-worker@127.0.0.1:8899/user/worker-actor"); for(int i =0; i < 5; i++) {
final int num = i; new Thread(() -> actorSelection.tell(new MathCalculate(num, num, MathCalculate.OperateEnum.ADD), masterActor)).start(); } } }
- worker端:worker端也需要创建一个akka环境,用于接收master端的任务,并且创建一个actor用于计算任务结果:
worker端的actor内容:
import akka.actor.UntypedActor; import com.alibaba.fastjson.JSONObject; public class WorkerCalculateActor extends UntypedActor {
@Override public void onReceive(Object message) {
// System.out.println("sender info : " + getSender().path().toString()); MathCalculate math = null; if(message instanceof String) {
String json = (String) message; math = JSONObject.parseObject(json, MathCalculate.class); } else if(message instanceof MathCalculate) {
math = (MathCalculate) message; } if(math != null) {
int result = 0; switch (math.getType()) {
case ADD: result = math.getNum1() + math.getNum2(); break; case SUBTRACT: result = math.getNum1() - math.getNum2(); break; case DIVIDE: result = math.getNum1() / math.getNum2(); break; case MULTIPLY: result = math.getNum1() * math.getNum2(); break; case RESULT: result = math.getResult(); break; } if(math.getType() != MathCalculate.OperateEnum.RESULT) {
MathCalculate rs = new MathCalculate(math.getNum1(), math.getNum2(), result, MathCalculate.OperateEnum.RESULT); rs.setOp(math.getType().getVal()); getSender().tell(rs, this.getSelf()); System.out.println("worker calculator: " + math.getNum1() + " " + math.getType().getVal() + " " + math.getNum2() + " = " + result); } } } }
worker端的主类:
import akka.actor.ActorSystem; import akka.actor.Props; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.HashMap; import java.util.Map; public class MathWorkerTest {
public static void main(String[] args) {
Map<String, Object> propMap = new HashMap<>(); propMap.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider"); propMap.put("akka.remote.netty.tcp.hostname", "127.0.0.1"); propMap.put("akka.remote.netty.tcp.port", 8899); Config config = ConfigFactory.parseMap(propMap); ActorSystem system = ActorSystem.create("math-worker", config); system.actorOf(Props.create(WorkerCalculateActor.class), "worker-actor"); } }
上面就是一个akka使用示例,运行示例代码首先启动worker端,等待master端下发任务进行计算,当worker端收到任务并生成计算结果了,在将计算结果返回给master,在这个过程中,都是通过 tell 方法进行交互的。
4、相关api介绍
4.1 actor的获取
在使用akka框架时,最重要的就是actor的创建和调用,下面总结一下都可以通过哪些api可以获取到actor:
1、通过 ActorSystem 的 actorOf() 方法获取到actor,这个api是将一个actor实例化并将它注入到系统中,这样他就可以被本地或远程调用了:
ActorSystem system = ActorSystem.create("master-server", config); ActorRef masterActor = system.actorOf(Props.create(MasterResultActor.class), "master-actor");
2、通过 ActorSystem 的 actorSelection() 方法获取actor,这种方法可以获取到一个已经注入到系统中的actor或一个远程的actor,通过该actor就可以发送任务:
ActorSystem system = ActorSystem.create("master-server", config); ActorSelection actorSelection = system.actorSelection("akka.tcp://math-worker@127.0.0.1:8899/user/worker-actor");
3、在actor实例内部,可以通过 this.getContext() 获取到应用的上下文,通过上下文环境像 ActorSystem 一样也可以创建actor:
ActorRef actor1 = this.getContext().actorOf(Props.create(MasterResultActor.class), "actor-name1"); ActorSelection actor2 = this.getContext().actorSelection("akka.tcp://math-worker@127.0.0.1:8899/user/worker-actor");
4、在actor实现类内部,可以获取到自身actor或消息发送者actor:
ActorRef self = this.getSelf(); ActorRef sender = this.getSender();
4.2 tell()方法
在akka中进行服务间通信,需要调用actor的tell方法:
remoteActor.tell("Hello,world!", localActor);
这里的remoteActor就是要调用的远程actor,localActor是本地actor,当远端执行完成后,就可以在里面调用this.getSender()获取到它,将结果通知回来。
4.3 远程url结构
在akka中调用远程actor时,就需要构建远程actor的链接,它的格式大概如下:
"akka.tcp://math-worker@127.0.0.1:8899/user/worker-actor"
akka.tcp : 这个格式是固定的协议头部。
math-worker : 这个是不确定的,它其实就是ActorSystem创建时指定的名称,如果名称不同在构建url时也要进行相应的修改。
127.0.0.1:8899 : 这个是服务的IP和端口号,在构建ActorSystem时指定。
/user/ : 固定格式。
worker-actor : 注册ActorRef时指定的actor名称,actor就是根据这个名称查找。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/118925.html