蚂蚁金服分布式事务框架DTX源码学习

蚂蚁金服分布式事务框架DTX源码学习文章目录一 前言二 DTX 简介三 角色四 服务发起者与参与者 DTX 客户端启动流程 1 项目启动 创建 dtx 动态代理 2 初始化 DtxClient 客户端的 init 方法五 服务发起以及参与流程一 前言之前因工作原因 公司

大家好,欢迎来到IT知识分享网。

一、前言

之前因工作原因(公司购买了阿里的金融中间服务),接触一段DTX,通过反编译看了一些DTX的源码并记录一些笔记,但是并不完全,由于DTX-SERVER阿里只是提供了一个很简单的测试demo,加上时间等因素,主要只是看了下客户端的启动流程,至于DTX的发起以及参与流程并没有详细跟踪,仅作为自己学习过程中的一点记录和历程。

二、DTX简介

三、角色

DTX框架主要以下三个角色
1.服务参与者:以sofa-rpc方式发布服务的服务提供者,走的是bolt协议
2.DTX-SERVER:DTX事务管理控制器,是一个独立的服务,相当于一个事务管理中间件
3.服务发起者:以sofa-rpc方式外调服务的服务消费者,走的是bolt协议

四、服务发起者与参与者DTX客户端启动流程

1、项目启动,创建dtx动态代理

服务发起者或参与者(sofaboot项目)启动后,spring会扫描com.alipay.sofa.dtx.client.aop.ComponentScanner类(此类上没有spring组件注解,需手动在xml中配置),创建dtx动态代理

public class ComponentScanner extends AbstractAutoProxyCreator implements ApplicationContextAware,PriorityOrdered, InitializingBean, ApplicationListener<ApplicationEvent>, BeanFactoryPostProcessor { ....... @Override public void afterPropertiesSet() throws Exception { initDtxClient(); } / * 初始化 dtx-client * @throws Exception */ private void initDtxClient() throws Exception { //初始化dtx 客户端 DtxClientFactory.initDtxClient(applicationContext); this.inited.set(true); logger.info("DTX client init finish."); } ....... } 

2、初始化DtxClient客户端的init()方法

ComponentScanner实现了InitializingBean接口,spring初始化bean的时候会执行afterPropertiesSet()方法,初始化dtx 客户端,接下来会执行com.alipay.dtx.client.core.DtxClient类的init()方法

public class DtxClient implements ServersRefreshHandler { ........ / * 初始化网络,注册资源 * @throws Exception */ private void init() throws Exception { //初始化rpc ,建立长连接 rpcClient = DtxClientFactory.getRpcClient(); ((DefaultRpcClient)rpcClient).setServersRefreshHandler(this); ((DefaultRpcClient)rpcClient).setServerMessageHandler(DtxClientFactory.getServerMessageHandler(applicationContext)); //dtx-rpc client init rpcClient.init(); initReourceManager(); //消息发送 messageSender = DefaultMessageSender.getInstance(); ((DefaultMessageSender)messageSender).setRpcClient(rpcClient); userDefinedResourceManager.setMessageSender(messageSender); autoResourceManager.setMessageSender(messageSender); DtxServiceImpl.setDtxTM(DtxTransactionManager.getInstance()); } / * 初始化资源管理器 */ private void initReourceManager(){ //初始化资源管理器 userDefinedResourceManager = UserDefinedResourceManager.getInstance(); userDefinedResourceManager.setApplicationContext(applicationContext); userDefinedResourceManager.init(rpcClient); autoResourceManager = DtxClientFactory.getAutoResourceManager(); autoResourceManager.init(rpcClient); if(CollectionUtil.isNotEmpty(serverConnections)) { userDefinedResourceManager.onServerConnectionsRefresh(serverConnections) ; autoResourceManager.onServerConnectionsRefresh(serverConnections); } } ........ } / * user-defined resource manager * * 用户自定义资源管理 * * @author zhangsen * */ public class UserDefinedResourceManager extends AbstractResourceManager implements UserDefinedResourceListener,StateResolverManager { ........ / * 初始化 */ public void init(IRpcClient rpcClient) { setRpcClient(rpcClient); //将所有资源注册至所有dtx-server registerResource(); //消息发送 messageSender = DefaultMessageSender.getInstance(); ((DefaultMessageSender)messageSender).setRpcClient(rpcClient); //设置自定义资源监听器,当有新的自定义资源时,触发自定义资源重新想所有dtx-server注册 UserDefinedResourceHolder.getInstance().setUserDefinedResourceListener(this); inited = true; } / * 向dtx-server注册所有的自定义资源 */ @Override public synchronized boolean registerResource() { boolean result = true; //回查 资源 if(stateResolverDesc != null){ String resourceId = stateResolverDesc.getAppName(); try { //自定义资源注册至所有dtx-server boolean ret = registerResourceToAllServer(resourceId, NetworkUtil.getLocalIP(), ResourceType.STATE_RESOLVER); if(ret){ stateResolverHolders.put(resourceId, stateResolverDesc); } logger.info("state-resolver resource registered, register result:"+ ret +",resourceId:" + resourceId + ", state resolver detail:" + JSON.toJSONString(stateResolverDesc)); result &= ret; } catch (Exception e) { logger.error("register state-resolver resource error, state resolver detail" + JSON.toJSONString(stateResolverDesc), e); result &= false; } } //自定义资源 if(actions == null || actions.size() == 0){ logger.warn("no user-defined resource need to register."); return false; } Map<String,UserDefinedResourceDesc> tempRegistedResourceHolders = new ConcurrentHashMap<String,UserDefinedResourceDesc>(); // 自定义资源注册 for(ActionDesc action : actions) { String serviceId = action.getServiceId(); UserDefinedResourceDesc userDefinedResourceHolder = new UserDefinedResourceDesc(); userDefinedResourceHolder.setPrepareMethod(action.getPrepareMethod()); userDefinedResourceHolder.setCommitMethod(action.getCommitMethod()); userDefinedResourceHolder.setRollbackMethod(action.getRollbackMethod()); userDefinedResourceHolder.setActionName(action.getActionName()); userDefinedResourceHolder.setTargetBean(action.getTargetBean()); userDefinedResourceHolder.setTccType(action.getTccType()); userDefinedResourceHolder.setInterfaceClass(action.getInterfaceClass()); userDefinedResourceHolder.setTargetBeanName(action.getTargetBeanName()); userDefinedResourceHolder.setDataSourceBeanId(action.getDataSourceBeanId()); try { //自定义资源注册至所有dtx-server boolean ret = registerResourceToAllServer(serviceId, NetworkUtil.getLocalIP(), ResourceType.USER_DEFINE); if(ret){ tempRegistedResourceHolders.put(serviceId, userDefinedResourceHolder); } logger.info("user-defined resource registered, register result:"+ ret +",resourceId:" + serviceId ); result &= ret; } catch (Exception e) { logger.error("register user-defined resource error", e); result &= false; } } registedResourceHolders.clear(); registedResourceHolders.putAll(tempRegistedResourceHolders); return result; } ........ } 

1、DtxClientFactory.getRpcClient()会返回一个com.alipay.dtx.rpc.client.impl.DefaultRpcClient对象(单例),用于订阅dtx-server地址,建立到dtx-server的长连接;

2、setServersRefreshHandler方法设置了dtx-server发生变化的处理器,当dtx-server发生变化后,新建连接、 断开重连、dtx-server地址重新推送等一系列操作;

3、setServerMessageHandler方法设置了消息处理器com.alipay.dtx.resourceManager.handle.ServerMessageHandler,用于处理从dtx-server返回的消息;

4、initReourceManager初始化资源管理器,向dtx-server注册自定义资源以及自动资源并设置消息发送器DefaultMessageSender

下面进入rpcClient.init()看下:

public class DefaultRpcClient implements IRpcClient,ServerAddressListener { ........ @Override public void init() throws Exception { logger.info("DTX rpc init start."); //长连接管理器 connectionManager = RpcConnectionManagerImpl.getInstance(serverMessageHandler); connectionManager.setServersRefreshHandler(serversRefreshHandler); //服务器地址管理 serverAddressManaager = ServerAddressManager.getInstance(); serverAddressManaager.setAddressListener(this); serverAddressManaager.init(); inited.set(true); logger.info("dtx-rpc init finish."); } / * 接收dtx-server的ip地址;创建 到dtx-server的长连接 */ @Override public boolean onAddressMessage(List<ServerInfo> serverAdress) { logger.info("receive dtx-server address : " + JSON.toJSONString(serverAdress)); boolean connResult = onServerAddressChanged(serverAdress); boolean connResultForDtxSdk = onServerAddressChangedForDtxSdk(serverAdress); return connResultForDtxSdk && connResult; } / * 收到dtx-server地址,创建长连接 * @param serverAdress * @return */ private boolean onServerAddressChanged(List<ServerInfo> serverAdresses) { //检查并创建 长连接 if(serverAdresses == null || serverAdresses.size() == 0 ){ logger.warn("dtx-server ip list is empty."); return false; } try{ //创建并保存连接 boolean ret = connectionManager.connect(serverAdresses, RuntimeConfiguration.getServerPort()); if(ret) { //dtx-server 重新链接完成,回调dtx-server刷新处理器 serversRefreshHandler.onServerConnectionsRefresh(connectionManager.getConnections()); logger.info("create long-connection success. dtx-server:" + JSON.toJSONString(serverAdresses)); }else { logger.warn("creating long-connection to server has some failed. dtx-server:" + JSON.toJSONString(serverAdresses)); } return ret; }catch(Throwable t){ logger.error("create long-connection error,serverAdresses:" + JSON.toJSONString(serverAdresses), t); } return false; } ........ } * 建立、维护 长连接 public class RpcConnectionManagerImpl implements RpcConnectionManager { ........ / * 单实例 */ private static RpcConnectionManagerImpl instance; public synchronized static RpcConnectionManagerImpl getInstance(MessageHandler messageHandler){ if(instance == null) { instance = new RpcConnectionManagerImpl(messageHandler); instance.init(); } return instance; } / * 初始化 */ private void init(){ //bolt client init System.setProperty(Configs.CONN_RECONNECT_SWITCH, "true"); rpcClient = new RpcClient(); // rpcClient.enableReconnectSwitch(); rpcClient.init(); //server message listner rpcClient.registerUserProcessor(new ServerMessageProcessor(this.messageHandler, StateResolverRequest.class.getName())); rpcClient.registerUserProcessor(new ServerMessageProcessor(this.messageHandler, BranchCommitRequest.class.getName())); rpcClient.registerUserProcessor(new ServerMessageProcessor(this.messageHandler, BranchRolbackRequest.class.getName())); rpcClient.registerUserProcessor(new JsonMsgUserProcessor(this.messageHandler)); //异步批量消息发送线程 new OnwaySender(this).start(); //鉴权 authorization = AuthorizationImpl.getInstance(rpcClient); //初始化完成 isInited.set(true); } / * 待批量发送的消息; */ protected final static BlockingQueue<Message> batchMessageQueue = new ArrayBlockingQueue<Message>(RpcConstants.DEFAULT_QUEUE_SIZE); / * 异步批量消息发送线程 * * @author zhangsen * */ public static class OnwaySender extends Thread { private RpcConnectionManager rpcConnectionManager; public OnwaySender(RpcConnectionManager rpcConnectionManagerImpl){ this.setDaemon(true); this.setName("dtx-onway-sender"); this.rpcConnectionManager = rpcConnectionManagerImpl; } @Override public void run() { while (true) { try{ //获取待批量发送消息 List<Message> list = new ArrayList<Message>(); synchronized(batchMessageQueue) { int i = 0; while((i++) < RpcConstants.DEFAULT_BATCH_SIZE) { Message msg = batchMessageQueue.poll(); if(msg != null) { list.add(msg); }else { //队列已无数据 break; } } } //消息发送 if(list.size() != 0) { BatchMessage batchMessage = new BatchMessage(list); //消息发送 Message ret = null; try{ ret = rpcConnectionManager.invoke(null, batchMessage, RpcConstants.DTX_REQUEST_TIMEOUT*2, RpcConstants.DEFAULT_RETRY_NUM*3); logger.debug("Batch message sent, batchMessage:" + JSON.toJSONString(batchMessage) + ", result:" + JSON.toJSONString(ret)); }catch(Throwable t){ logger.error("Batch message sent error, batchMessage:" + JSON.toJSONString(batchMessage), t); } }else { logger.debug("Batch message sent, batchMessage: 0"); } }catch(Throwable t){ logger.error("Onway-sender error", t); } try{ //30s 周期 Thread.sleep(RpcConstants.DEFAULT_ONWAY_SENDERR_PERIOD); }catch(Exception e){ logger.error("Thread.sleep error", e); } } } } / * 获取 或者 创建 长连接 * @param serverIP * @return */ private synchronized Connection getConnection(String serverIP, Map<String, Connection> tempConnectionMaps){ if(StringUtils.isEmpty(serverIP)){ return null; } //链接是否已经创建 Connection conn = tempConnectionMaps.get(serverIP); if(conn != null && conn.isFine()) { //链接已创建,无需重复创建 return conn; } tempConnectionMaps.remove(serverIP); //需要重新创建链接 String ip = new StringBuilder().append(serverIP).append(":").append(serverPort).toString(); try { //创建或者获取长链接 conn = rpcClient.getConnection(ip , RpcConstants.CONNECTION_TIMEOUT); //向dtx-server注册实例id,send instanceId and clientIp ClientConnectRequest connectRequest = new ClientConnectRequest(RuntimeConfiguration.getInstanceId(), NetworkUtil.getLocalIP(), RuntimeConfiguration.getZone(), RuntimeConfiguration.getDataCenter()); try { connectRequest.setTracerId(RuntimeFramework.getInstance().getTracerId()); } catch (Exception t) { logger.error("getTracerId error", t); } //鉴权上下文 connectRequest.setAuthorizationContext(authorization.getAuthorizationContext()); Response ret = (Response) rpcClient.invokeSync(conn, connectRequest, RpcConstants.CONNECTION_TIMEOUT); //鉴权 if(ret != null && ret.isSuccess()) { //save connection logger.info("create connection to " + ip + " success."); tempConnectionMaps.put(serverIP, conn); return conn; } else { logger.error("create connection to " + ip + " failed. ret: " + JSON.toJSONString(ret)); } } catch(Throwable t) { logger.error("create connection to " + ip + " error." , t); } return null; } ........ } / * dtx-server ip 地址管理 * */ public class ServerAddressManager { ........ / * 初始化地址管理器 */ public void init() { try { //先从系统配置中获取地址 if(hasServerHosts()){ List<String> l = getServerHosts() ; List<ServerInfo> serverAddress = new ArrayList<ServerInfo>(); if(l != null){ for( String s : l){ ServerInfo sInfo = new ServerInfo(); sInfo.setServerIp(s); serverAddress.add(sInfo); } } serverAddressListner.onAddressMessage(serverAddress); }else{ //从注册器中获取 AddressSubscribe.subscribeServerAddress(serverAddressListner); } }catch(Throwable t){ logger.error("init dtx-server list error", t); throw new RuntimeException(ErrorCode.getErrorDesc(ErrorCode.DTX302),t); } } ........ } public class RpcClient { ........ / * Initialization. */ public void init() { if (this.addressParser == null) { this.addressParser = new RpcAddressParser(); } this.connectionManager.setAddressParser(this.addressParser); this.connectionManager.init(); this.rpcRemoting = new RpcClientRemoting(new RpcCommandFactory(), this.addressParser, this.connectionManager); this.taskScanner.add(this.connectionManager); this.taskScanner.start(); if (globalSwitch.isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) { if (monitorStrategy == null) { ScheduledDisconnectStrategy strategy = new ScheduledDisconnectStrategy(); connectionMonitor = new DefaultConnectionMonitor(strategy, this.connectionManager); } else { connectionMonitor = new DefaultConnectionMonitor(monitorStrategy, this.connectionManager); } connectionMonitor.start(); logger.warn("Switch on connection monitor"); } if (globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) { reconnectManager = new ReconnectManager(connectionManager); connectionEventHandler.setReconnectManager(reconnectManager); logger.warn("Switch on reconnect manager"); } } ........ } 

1、获取长连接管理器(单例)并初始化,观看以上代码可以得知,new了一个com.alipay.remoting.rpc.RpcClient对象,并初始化了这个Rpc客户端,设置地址解析器,初始化连接管理器connectionManager,新建rpc远程客户端对象以及RpcTaskScanner(rpc任务扫描器,定时扫描移除过期连接池连接),开启全局监控以及重连开关,初始化Rpc客户端后,向RpcConnectionFactory 的userProcessors对象中注册用户自定义处理器,这里的消息类型有四种:1>状态处理请求StateResolverRequest 2>分支提交请求 3>分支合并请求 4>Json字符串消息;

2、最后开启RpcConnectionManagerImpl 类中的异步批量消息发送线程OnwaySender,不停地从batchMessageQueue 消息队列中获取消息,发送到dtx-server;

3、长连接管理器初始化完毕后,开始初始化服务器地址管理,首先获取dtx-server服务器地址,然后调用DefaultRpcClient对象的onAddressMessage方法,获取或创建链接,并向dtx-server注册实例id,发送instanceid和clientip信息,如果建立连接成功并返回结果,则将连接放回连接池,等待下一次调用;

以下是连接管理器初始化和rpc连接工厂初始化源码:

public class DefaultConnectionManager implements ConnectionManager, ConnectionHeartbeatManager, Scannable { ........ @/ * @see com.alipay.remoting.ConnectionManager#init() */ public void init() { this.connectionEventHandler.setConnectionManager(this); this.connectionEventHandler.setConnectionEventListener(connectionEventListener); this.connectionFactory.init(connectionEventHandler); } ........ } public class RpcConnectionFactory implements ConnectionFactory { ........ @Override public void init(final ConnectionEventHandler connectionEventHandler) { bootstrap = new Bootstrap(); bootstrap.group(workerGroup).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, SystemProperties.tcp_nodelay()) .option(ChannelOption.SO_REUSEADDR, SystemProperties.tcp_so_reuseaddr()) .option(ChannelOption.SO_KEEPALIVE, SystemProperties.tcp_so_keepalive()); // init netty write buffer water mark initWriteBufferWaterMark(); // init byte buf allocator if (SystemProperties.netty_buffer_pooled()) { this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } else { this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); } final boolean idleSwitch = SystemProperties.tcp_idle_switch(); final int idleTime = SystemProperties.tcp_idle(); final RpcHandler rpcHandler = new RpcHandler(userProcessors); final HeartbeatHandler heartbeatHandler = new HeartbeatHandler(); bootstrap.handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("decoder", new RpcProtocolDecoder( RpcProtocolManager.DEFAULT_PROTOCOL_CODE_LENGTH)); pipeline.addLast( "encoder", new ProtocolCodeBasedEncoder(ProtocolCode .fromBytes(RpcProtocolV2.PROTOCOL_CODE))); if (idleSwitch) { pipeline.addLast("idleStateHandler", new IdleStateHandler(idleTime, idleTime, 0, TimeUnit.MILLISECONDS)); pipeline.addLast("heartbeatHandler", heartbeatHandler); } pipeline.addLast("connectionEventHandler", connectionEventHandler); pipeline.addLast("handler", rpcHandler); } }); } @Override public void registerUserProcessor(UserProcessor<?> processor) { if (processor == null || StringUtils.isBlank(processor.interest())) { throw new RuntimeException("User processor or processor interest should not be blank!"); } UserProcessor<?> preProcessor = this.userProcessors.putIfAbsent(processor.interest(), processor); if (preProcessor != null) { String errMsg = "Processor with interest key [" + processor.interest() + "] has already been registered to rpc client, can not register again!"; throw new RuntimeException(errMsg); } } ........ } 

为连接事件处理器设置监听器,并将处理器作为参数初始化rpc连接工厂,基于netty初始化channel,监听Channel的各种动作以及状态的改变,例如连接事件处理以及rpc事件处理,

以上则是服务发起者以及服务参与者启动DTX客户端初始化流程,由于DTX-SERVER并未开源,这里仅分析服务发起者以及服务参与者客户端的启动流程。

五、服务发起以及参与流程

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/154284.html

(0)
上一篇 2025-02-26 20:25
下一篇 2025-02-26 20:26

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信