分布式锁—2.Redisson的可重入锁一

分布式锁—2.Redisson的可重入锁一大纲 1 Redisson 可重入锁 RedissonLock 概述 2 可重入锁源码之创建 RedissonClie 实例 3 可重入锁源码之 lua 脚本加锁逻辑 4 可重入锁源码之 WatchDog 维持加锁逻辑 5 可重入锁源码之可重入加锁逻辑 6 可重入锁

大家好,欢迎来到IT知识分享网。

大纲

1.Redisson可重入锁RedissonLock概述

2.可重入锁源码之创建RedissonClient实例

3.可重入锁源码之lua脚本加锁逻辑

4.可重入锁源码之WatchDog维持加锁逻辑

5.可重入锁源码之可重入加锁逻辑

6.可重入锁源码之锁的互斥阻塞逻辑

7.可重入锁源码之释放锁逻辑

8.可重入锁源码之获取锁超时与锁超时自动释放逻辑

9.可重入锁源码总结

1.Redisson可重入锁RedissonLock概述

(1)在pom.xml里引入依赖

(2)构建RedissonClient并使用Redisson

(3)Redisson可重入锁RedissonLock简单使用

(1)在pom.xml里引入依赖

 
   
    
    
      org.redisson 
     
    
      redisson 
     
    
      3.16.8 
     
    
  

(2)构建RedissonClient并使用Redisson

参考官网中文文档,连接上3主3从的Redis Cluster。

//https://github.com/redisson/redisson/wiki/目录 public class Application { public static void main(String[] args) throws Exception { //连接3主3从的Redis CLuster Config config = new Config(); config.useClusterServers() .addNodeAddress("redis://192.168.1.110:7001") .addNodeAddress("redis://192.168.1.110:7002") .addNodeAddress("redis://192.168.1.110:7003") .addNodeAddress("redis://192.168.1.111:7001") .addNodeAddress("redis://192.168.1.111:7002") .addNodeAddress("redis://192.168.1.111:7003"); //创建RedissonClient实例 RedissonClient redisson = Redisson.create(config); //获取可重入锁 RLock lock = redisson.getLock("myLock"); lock.lock(); lock.unlock(); RMap 
  
    map = redisson.getMap("myMap"); map.put("foo", "bar"); map = redisson.getMap("myMap"); System.out.println(map.get("foo")); } } 
  

(3)Redisson可重入锁RedissonLock简单使用

Redisson可重入锁RLock实现了
java.util.concurrent.locks.Lock接口
,同时还提供了异步(Async)、响应式(Reactive)和RxJava2标准的接口。

RLock lock = redisson.getLock("myLock"); //最常见的使用方法 lock.lock();

如果设置锁的超时时间不合理,导致超时时间已到锁还没能主动释放,但实际上锁却被Redis节点通过过期时间释放了,这会有问题。

为了避免这种情况,Redisson内部提供了一个用来监控锁的WatchDog。WatchDog的作用是在Redisson实例被关闭前不断地延长锁的有效期

WatchDog检查锁的默认超时时间是30秒,可通过
Config.lockWatchdogTimeout
来指定。

RLocktryLock方法提供了leaseTime参数来指定加锁的超时时间,超过这个时间后锁便自动被释放。

//如果没有主动释放锁的话,10秒后将会自动释放锁 lock.lock(10, TimeUnit.SECONDS); //加锁等待最多是100秒;加锁成功后如果没有主动释放锁的话,锁会在10秒后自动释放 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }

RLock完全符合Java的Lock规范,即只有拥有锁的进程才能解锁,其他进程解锁则会抛出
IllegalMonitorStateException错误。如果
需要其他进程也能解锁,那么可以使用分布式信号量Semaphore

2.可重入锁源码之创建RedissonClient实例

(1)初始化与Redis的连接管理器ConnectionManager

(2)初始化Redis的命令执行器CommandExecutor

使用Redisson.create()方法可以根据配置创建一个RedissonClient实例因为Redisson类会实现RedissonClient接口,而创建RedissonClient实例的主要工作其实就是:

一.初始化与Redis的连接管理器ConnectionManager

二.初始化Redis的命令执行器CommandExecutor

(1)初始化与Redis的连接管理器ConnectionManager

Redis的配置类Config被封装连接管理器ConnectionManager中,后续可以通过连接管理器ConnectionManager获取Redis的配置类Config

public class Application { public static void main(String[] args) throws Exception { Config config = new Config(); config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001"); //创建RedissonClient实例 RedissonClient redisson = Redisson.create(config); ... } } //创建RedissonClient实例的源码 public class Redisson implements RedissonClient { protected final Config config;//Redis配置类 protected final ConnectionManager connectionManager;//Redis的连接管理器 protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器 ... public static RedissonClient create(Config config) { return new Redisson(config); } protected Redisson(Config config) { this.config = config; Config configCopy = new Config(config); //根据Redis配置类Config实例创建和Redis的连接管理器 connectionManager = ConfigSupport.createConnectionManager(configCopy); RedissonObjectBuilder objectBuilder = null; if (config.isReferenceEnabled()) { objectBuilder = new RedissonObjectBuilder(this); } //创建Redis的命令执行器 commandExecutor = new CommandSyncService(connectionManager, objectBuilder); evictionScheduler = new EvictionScheduler(commandExecutor); writeBehindService = new WriteBehindService(commandExecutor); } ... } public class ConfigSupport { ... //创建Redis的连接管理器 public static ConnectionManager createConnectionManager(Config configCopy) { //生成UUID UUID id = UUID.randomUUID(); ... if (configCopy.getClusterServersConfig() != null) { validate(configCopy.getClusterServersConfig()); //返回ClusterConnectionManager实例 return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id); } ... } ... } public class ClusterConnectionManager extends MasterSlaveConnectionManager { public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) { super(config, id); ... this.natMapper = cfg.getNatMapper(); //将Redis的配置类Config封装在ConnectionManager中 this.config = create(cfg); initTimer(this.config); Throwable lastException = null; List 
  
    failedMasters = new ArrayList 
   
     (); for (String address : cfg.getNodeAddresses()) { RedisURI addr = new RedisURI(address); //异步连接Redis节点 CompletionStage 
    
      connectionFuture = connectToNode(cfg, addr, addr.getHost()); ... //通过connectionFuture阻塞获取建立好的连接 RedisConnection connection = connectionFuture.toCompletableFuture().join(); ... List 
     
       nodes = connection.sync(clusterNodesCommand); ... CompletableFuture<Collection 
      
        > partitionsFuture = parsePartitions(nodes); Collection 
       
         partitions = partitionsFuture.join(); List<CompletableFuture 
        
          > masterFutures = new ArrayList<>(); for (ClusterPartition partition : partitions) { if (partition.isMasterFail()) { failedMasters.add(partition.getMasterAddress().toString()); continue; } if (partition.getMasterAddress() == null) { throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address."); } CompletableFuture 
         
           masterFuture = addMasterEntry(partition, cfg); masterFutures.add(masterFuture); } CompletableFuture 
          
            masterFuture = CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0])); masterFuture.join(); ... } ... } ... } public class MasterSlaveConnectionManager implements ConnectionManager { protected final String id;//初始化时为UUID private final Map 
           
             nodeConnections = new ConcurrentHashMap<>(); ... protected MasterSlaveConnectionManager(Config cfg, UUID id) { this.id = id.toString();//传入的是UUID this.cfg = cfg; ... } protected final CompletionStage 
            
              connectToNode(NodeType type, BaseConfig 
              cfg, RedisURI addr, String sslHostname) { RedisConnection conn = nodeConnections.get(addr); if (conn != null) { if (!conn.isActive()) { closeNodeConnection(conn); } else { return CompletableFuture.completedFuture(conn); } } //创建Redis客户端连接实例 RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname); //向Redis服务端发起异步连接请求,这个future会层层往外返回 CompletionStage 
             
               future = client.connectAsync(); return future.thenCompose(connection -> { if (connection.isActive()) { if (!addr.isIP()) { RedisURI address = new RedisURI(addr.getScheme() + "://" + connection.getRedisClient().getAddr().getAddress().getHostAddress() + ":" + connection.getRedisClient().getAddr().getPort()); nodeConnections.put(address, connection); } nodeConnections.put(addr, connection); return CompletableFuture.completedFuture(connection); } else { connection.closeAsync(); CompletableFuture 
              
                f = new CompletableFuture<>(); f.completeExceptionally(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!")); return f; } }); } //创建Redis客户端连接实例 @Override public RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) { RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname); return RedisClient.create(redisConfig); } ... } //Redisson主要使用Netty去和Redis服务端建立连接 public final class RedisClient { private final Bootstrap bootstrap; private final Bootstrap pubSubBootstrap; ... public static RedisClient create(RedisClientConfig config) { return new RedisClient(config); } private RedisClient(RedisClientConfig config) { ... bootstrap = createBootstrap(copy, Type.PLAIN); pubSubBootstrap = createBootstrap(copy, Type.PUBSUB); this.commandTimeout = copy.getCommandTimeout(); } private Bootstrap createBootstrap(RedisClientConfig config, Type type) { Bootstrap bootstrap = new Bootstrap() .resolver(config.getResolverGroup()) .channel(config.getSocketChannelClass()) .group(config.getGroup()); bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type)); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout()); bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive()); bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay()); config.getNettyHook().afterBoostrapInitialization(bootstrap); return bootstrap; } //向Redis服务端发起异步连接请求 public RFuture 
               
                 connectAsync() { CompletableFuture 
                
                  addrFuture = resolveAddr(); CompletableFuture 
                 
                   f = addrFuture.thenCompose(res -> { CompletableFuture 
                  
                    r = new CompletableFuture<>(); //Netty的Bootstrap发起连接 ChannelFuture channelFuture = bootstrap.connect(res); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(final ChannelFuture future) throws Exception { if (bootstrap.config().group().isShuttingDown()) { IllegalStateException cause = new IllegalStateException("RedisClient is shutdown"); r.completeExceptionally(cause); return; } if (future.isSuccess()) { RedisConnection c = RedisConnection.getFrom(future.channel()); c.getConnectionPromise().whenComplete((res, e) -> { bootstrap.config().group().execute(new Runnable() { @Override public void run() { if (e == null) { if (!r.complete(c)) { c.closeAsync(); } } else { r.completeExceptionally(e); c.closeAsync(); } } }); }); } else { bootstrap.config().group().execute(new Runnable() { public void run() { r.completeExceptionally(future.cause()); } }); } } }); return r; }); return new CompletableFutureWrapper<>(f); } ... } 
                   
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
  

(2)初始化Redis的命令执行器CommandExecutor

首先,CommandSyncService继承自CommandAsyncService类。

而CommandAsyncService类实现了CommandExecutor接口。

然后,ConnectionManager连接管理器会封装在命令执行器CommandExecutor中。

所以,通过CommandExecutor命令执行器可以获取连接管理器ConnectionManager

//Redis命令的同步执行器CommandSyncService public class CommandSyncService extends CommandAsyncService implements CommandExecutor { //初始化CommandExecutor public CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) { super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT); } public 
  
    R read(String key, RedisCommand 
   
     command, Object... params) { return read(key, connectionManager.getCodec(), command, params); } public 
    
      R read(String key, Codec codec, RedisCommand 
     
       command, Object... params) { RFuture 
      
        res = readAsync(key, codec, command, params); return get(res); } public 
       
         R evalRead(String key, RedisCommand 
        
          evalCommandType, String script, List<Object> keys, Object... params) { return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params); } public 
         
           R evalRead(String key, Codec codec, RedisCommand 
          
            evalCommandType, String script, List<Object> keys, Object... params) { RFuture 
           
             res = evalReadAsync(key, codec, evalCommandType, script, keys, params); return get(res); } public 
            
              R evalWrite(String key, RedisCommand 
             
               evalCommandType, String script, List<Object> keys, Object... params) { return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params); } public 
              
                R evalWrite(String key, Codec codec, RedisCommand 
               
                 evalCommandType, String script, List<Object> keys, Object... params) { RFuture 
                
                  res = evalWriteAsync(key, codec, evalCommandType, script, keys, params); return get(res); } } //Redis命令的异步执行器CommandAsyncService public class CommandAsyncService implements CommandAsyncExecutor { //Redis连接管理器 final ConnectionManager connectionManager; final RedissonObjectBuilder objectBuilder; final RedissonObjectBuilder.ReferenceType referenceType; public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) { this.connectionManager = connectionManager; this.objectBuilder = objectBuilder; this.referenceType = referenceType; } @Override public 
                 
                   V getNow(CompletableFuture 
                  
                    future) { try { return future.getNow(null); } catch (Exception e) { return null; } } @Override public 
                   
                     R read(String key, Codec codec, RedisCommand 
                    
                      command, Object... params) { RFuture 
                     
                       res = readAsync(key, codec, command, params); return get(res); } @Override public 
                      
                        RFuture 
                       
                         readAsync(String key, Codec codec, RedisCommand 
                        
                          command, Object... params) { NodeSource source = getNodeSource(key); return async(true, source, codec, command, params, false, false); } private NodeSource getNodeSource(String key) { int slot = connectionManager.calcSlot(key); return new NodeSource(slot); } public 
                         
                           RFuture 
                          
                            async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand 
                           
                             command, Object[] params, boolean ignoreRedirect, boolean noRetry) { CompletableFuture 
                            
                              mainPromise = createPromise(); RedisExecutor 
                             
                               executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry); executor.execute(); return new CompletableFutureWrapper<>(mainPromise); } @Override public 
                              
                                V get(RFuture 
                               
                                 future) { if (Thread.currentThread().getName().startsWith("redisson-netty")) { throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners"); } try { return future.toCompletableFuture().get(); } catch (InterruptedException e) { future.cancel(true); Thread.currentThread().interrupt(); throw new RedisException(e); } catch (ExecutionException e) { throw convertException(e); } } ... } 
                                
                               
                              
                             
                            
                           
                          
                         
                        
                       
                      
                     
                    
                   
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
  

3.可重入锁源码之lua脚本加锁逻辑

(1)通过Redisson.getLock()方法获取一个RedissonLock实例

(2)加锁时的执行流程

(3)加锁时执行的lua脚本

(4)执行加锁lua脚本的命令执行器逻辑

(5)如何根据slot值获取对应的节点

(1)通过Redisson.getLock()方法获取一个RedissonLock实例

Redisson.getLock()方法中,会传入命令执行器CommandExecutor创建一个RedissonLock实例,而命令执行器CommandExecutor是在执行Redisson.create()方法时初始化好的,所以命令执行器CommandExecutor会被封装RedissonLock实例中。

因此,通过RedissonLock实例可以获取一个命令执行器CommandExecutor,通过命令执行器CommandExecutor可获取连接管理器ConnectionManager,通过连接管理器ConnectionManager可获取Redis的配置信息类Config,通过Redis的配置信息类Config可以获取各种配置信息

RedissonLock类继承自实现了RLock接口的RedissonBaseLock类。在RedissonLock的构造方法里面,有个internalLockLeaseTime变量,这个internalLockLeaseTime变量WatchDog看门狗有关系。interlnalLockLeaseTime的默认值是30000毫秒,即30秒

public class Application { public static void main(String[] args) throws Exception { Config config = new Config(); config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001"); //创建RedissonClient实例 RedissonClient redisson = Redisson.create(config); //获取可重入锁 RLock lock = redisson.getLock("myLock"); lock.lock(); ... } } //创建Redisson实例 public class Redisson implements RedissonClient { protected final Config config;//Redis配置类 protected final ConnectionManager connectionManager;//Redis的连接管理器 protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器 ... public static RedissonClient create(Config config) { return new Redisson(config); } protected Redisson(Config config) { ... //根据Redis配置类Config实例创建和Redis的连接管理器 connectionManager = ConfigSupport.createConnectionManager(configCopy); //创建Redis的命令执行器 commandExecutor = new CommandSyncService(connectionManager, objectBuilder); ... } ... @Override public RLock getLock(String name) { return new RedissonLock(commandExecutor, name); } ... } //创建RedissonLock实例 //通过RedissonLock实例可以获取一个命令执行器CommandExecutor; public class RedissonLock extends RedissonBaseLock { protected long internalLockLeaseTime; protected final LockPubSub pubSub; final CommandAsyncExecutor commandExecutor; public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; //与WatchDog有关的internalLockLeaseTime //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间 this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } ... } //创建Redis的命令执行器 //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager public class CommandAsyncService implements CommandAsyncExecutor { final ConnectionManager connectionManager; ... public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) { this.connectionManager = connectionManager; this.objectBuilder = objectBuilder; this.referenceType = referenceType; } @Override public ConnectionManager getConnectionManager() { return connectionManager; } ... } //创建Redis的连接管理器 //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config public class ClusterConnectionManager extends MasterSlaveConnectionManager { ... public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) { super(config, id); ... } ... } //创建Redis的连接管理器 //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config public class MasterSlaveConnectionManager implements ConnectionManager { private final Config cfg; protected final String id;//初始化时为UUID ... protected MasterSlaveConnectionManager(Config cfg, UUID id) { this.id = id.toString();//传入的是UUID this.cfg = cfg; ... } @Override public Config getCfg() { return cfg; } ... } //配置信息类Config中的lockWatchdogTimeout变量初始化为30秒,该变量与WatchDog有关 public class Config { private long lockWatchdogTimeout = 30 * 1000; ... //This parameter is only used if lock has been acquired without leaseTimeout parameter definition. //Lock expires after "lockWatchdogTimeout" if watchdog didn't extend it to next "lockWatchdogTimeout" time interval. //This prevents against infinity locked locks due to Redisson client crush or any other reason when lock can't be released in proper way. //Default is 30000 milliseconds public Config setLockWatchdogTimeout(long lockWatchdogTimeout) { this.lockWatchdogTimeout = lockWatchdogTimeout; return this; } public long getLockWatchdogTimeout() { return lockWatchdogTimeout; } }

默认情况下,调用RedissonLock.lock()方法加锁时,传入的leaseTime为-1此时锁的超时时间会设为lockWatchdogTimeout默认的30秒,从而避免出现死锁的情况。

public class RedissonLock extends RedissonBaseLock { ... //加锁 @Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); ... } //解锁 @Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } ... }

(2)加锁时的执行流程

首先会调用RedissonLock的tryAcquire()方法处理异步RFuture相关,然后调用RedissonLock的tryAcquireAsync()方法对执行脚本的结果进行处理,接着调用
RedissonLock.tryLockInnerAsync方法执行加锁的lua脚本。

public class RedissonLock extends RedissonBaseLock { protected long internalLockLeaseTime; protected final LockPubSub pubSub; final CommandAsyncExecutor commandExecutor; public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; //与WatchDog有关的internalLockLeaseTime //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间 this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } ... //加锁 @Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { //线程ID,用来生成设置Hash的值 long threadId = Thread.currentThread().getId(); //尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); //ttl为null说明加锁成功 if (ttl == null) { return; } //加锁失败时的处理 CompletableFuture 
  
    future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { commandExecutor.getNow(future).getLatch().acquire(); } else { commandExecutor.getNow(future).getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(commandExecutor.getNow(future), threadId); } } ... private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { //默认下waitTime和leaseTime都是-1,下面调用的get方法是来自于RedissonObject的get()方法 //可以理解为异步转同步:将异步的tryAcquireAsync通过get转同步 return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private 
   
     RFuture 
    
      tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture 
     
       ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //默认情况下,由于leaseTime=-1,所以会使用初始化RedissonLock实例时的internalLockLeaseTime //internalLockLeaseTime的默认值就是lockWatchdogTimeout的默认值,30秒 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage 
      
        f = ttlRemainingFuture.thenApply(ttlRemaining -> { //加锁返回的ttlRemaining为null表示加锁成功 if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); } //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒 
       
         RFuture 
        
          tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand 
         
           command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()),//锁的名字:KEYS[1] unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒 getLockName(threadId)//ARGV[2],值为UUID + 线程ID ); } ... } public abstract class RedissonBaseLock extends RedissonExpirable implements RLock { final String id; final String entryName; final CommandAsyncExecutor commandExecutor; public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; } protected String getLockName(long threadId) { return id + ":" + threadId; } ... } abstract class RedissonExpirable extends RedissonObject implements RExpirable { ... RedissonExpirable(CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); } ... } public abstract class RedissonObject implements RObject { protected final CommandAsyncExecutor commandExecutor; protected String name; protected final Codec codec; public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) { this.codec = codec; this.commandExecutor = commandExecutor; if (name == null) { throw new NullPointerException("name can't be null"); } setName(name); } ... protected final 
          
            V get(RFuture 
           
             future) { //下面会调用CommandAsyncService.get()方法 return commandExecutor.get(future); } ... } public class CommandAsyncService implements CommandAsyncExecutor { ... @Override public 
            
              V get(RFuture 
             
               future) { if (Thread.currentThread().getName().startsWith("redisson-netty")) { throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners"); } try { return future.toCompletableFuture().get(); } catch (InterruptedException e) { future.cancel(true); Thread.currentThread().interrupt(); throw new RedisException(e); } catch (ExecutionException e) { throw convertException(e); } } ... } 
              
             
            
           
          
         
        
       
      
     
    
  

(3)加锁时执行的lua脚本

public class RedissonLock extends RedissonBaseLock { //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒 
  
    RFuture 
   
     tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand 
    
      command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()),//锁的名字:KEYS[1],比如"myLock" unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒 getLockName(threadId)//ARGV[2],值为UUID + 线程ID ); } ... } 
     
    
  

首先执行Redis的exists命令,判断key为锁名的Hash值是否不存在,也就是判断key为锁名myLock的Hash值是否存在。

一.如果key为锁名的Hash值不存在,那么就进行如下加锁处理

首先通过Redis的hset命令设置一个key为锁名的Hash值。该Hash值的key为锁名value是一个映射。也就是在value值中会有一个field为UUID + 线程IDvalue为1的映射。比如:hset myLock UUID:ThreadID 1,lua脚本中的ARGV[2]就是由UUID + 线程ID组成的唯一值。

然后通过Redis的pexpire命令设置key为锁名的Hash值的过期时间,也就是设置key为锁名的Hash值的过期时间为30秒。比如:pexpire myLock 30000。所以默认情况下,myLock这个锁在30秒后就会自动过期。

二.如果key为锁名的Hash值存在,那么就执行如下判断处理

首先通过Redis的hexists命令判断在key为锁名的Hash值里,field为UUID + 线程ID的映射是否已经存在。

如果在key为锁名的Hash值里,field为UUID + 线程ID的映射存在,那么就通过Redis的hincrby命令,对field为UUID + 线程ID的value值进行递增1。比如:hincrby myLock UUID:ThreadID 1。也就是在key为myLock的Hash值里,把field为UUID:ThreadID的value值从1累加到2,发生这种情况的时候往往就是当前线程锁进行了重入接着执行:pexpire myLock 30000再次将myLock的有效期设置为30秒。

如果在key为锁名的Hash值里,field为UUID + 线程ID的映射不存在,发生这种情况的时候往往就是其他线程获取不到这把锁而产生互斥。那么就通过Redis的pttl命令,返回key为锁名的Hash值的剩余存活时间,因为不同线程的ARGV[2]是不一样的ARGV[2] = UUID + 线程ID

(4)执行加锁lua脚本的命令执行器逻辑

RedissonLock的tryLockInnerAsync()方法中,会通过RedissonBaseLock的evalWriteAsync()方法执行lua脚本,即通过CommandAsyncService的evalWriteAsync()方法执行lua脚本。

在CommandAsyncService的evalWriteAsync()方法中,首先会执行CommandAsyncService的getNodeSource()方法获取对应的节点。然后执行CommandAsyncService的evalAsync()方法执行lua脚本

在CommandAsyncService的getNodeSource()方法中,会根据key进行CRC16运算,然后再对16384取模计算出key的slot值。然后根据这个slot值创建一个NodeSource实例进行返回。

在CommandAsyncService的evalAsync()方法中,会将获得的NodeSource实例封装到Redis执行器RedisExecutor里。然后执行RedisExecutor,实现将脚本请求发送给对应的Redis节点处理。

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock { //从外部传入的:在创建实现了RedissonClient的Redisson实例时,初始化的命令执行器CommandExecutor final CommandAsyncExecutor commandExecutor; public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; } ... protected 
  
    RFuture 
   
     evalWriteAsync(String key, Codec codec, RedisCommand 
    
      evalCommandType, String script, List<Object> keys, Object... params) { //获取可用的节点,并继续封装一个命令执行器CommandBatchService MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName()); int availableSlaves = entry.getAvailableSlaves(); CommandBatchService executorService = createCommandBatchService(availableSlaves); //通过CommandAsyncService.evalWriteAsync方法执行lua脚本 RFuture 
     
       result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params); if (commandExecutor instanceof CommandBatchService) { return result; } //异步执行然后获取结果 RFuture<BatchResult 
      > future = executorService.executeAsync(); CompletionStage 
      
        f = future.handle((res, ex) -> { if (ex == null && res.getSyncedSlaves() != availableSlaves) { throw new CompletionException(new IllegalStateException("Only " + res.getSyncedSlaves() + " of " + availableSlaves + " slaves were synced")); } return result.getNow(); }); return new CompletableFutureWrapper<>(f); } private CommandBatchService createCommandBatchService(int availableSlaves) { if (commandExecutor instanceof CommandBatchService) { return (CommandBatchService) commandExecutor; } BatchOptions options = BatchOptions.defaults().syncSlaves(availableSlaves, 1, TimeUnit.SECONDS); return new CommandBatchService(commandExecutor, options); } ... } public class CommandBatchService extends CommandAsyncService { ... public CommandBatchService(CommandAsyncExecutor executor, BatchOptions options) { this(executor.getConnectionManager(), options, executor.getObjectBuilder(), RedissonObjectBuilder.ReferenceType.DEFAULT); } private CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) { super(connectionManager, objectBuilder, referenceType); this.options = options; } ... } public class CommandAsyncService implements CommandAsyncExecutor { final ConnectionManager connectionManager; final RedissonObjectBuilder objectBuilder; final RedissonObjectBuilder.ReferenceType referenceType; public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) { this.connectionManager = connectionManager; this.objectBuilder = objectBuilder; this.referenceType = referenceType; } ... @Override public 
       
         RFuture 
        
          evalWriteAsync(String key, Codec codec, RedisCommand 
         
           evalCommandType, String script, List<Object> keys, Object... params) { //获取key对应的节点 NodeSource source = getNodeSource(key); //让对应的节点执行lua脚本请求 return evalAsync(source, false, codec, evalCommandType, script, keys, false, params); } //获取key对应的Redis Cluster节点 private NodeSource getNodeSource(String key) { //先计算key对应的slot值 int slot = connectionManager.calcSlot(key); //返回节点实例 return new NodeSource(slot); } //执行lua脚本 private 
          
            RFuture 
           
             evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand 
            
              evalCommandType, String script, List<Object> keys, boolean noRetry, Object... params) { if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) { CompletableFuture 
             
               mainPromise = new CompletableFuture<>(); Object[] pps = copy(params); CompletableFuture 
              
                promise = new CompletableFuture<>(); String sha1 = calcSHA(script); RedisCommand cmd = new RedisCommand(evalCommandType, "EVALSHA"); List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length); args.add(sha1); args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); //将根据key进行CRC16运算然后对16384取模获取到的NodeSource实例,封装到Redis执行器RedisExecutor中 RedisExecutor 
               
                 executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd, args.toArray(), promise, false, connectionManager, objectBuilder, referenceType, noRetry); //通过执行Redis执行器RedisExecutor,来实现将lua脚本请求发送给对应的Redis节点进行处理 executor.execute(); ... } ... } ... } public class ClusterConnectionManager extends MasterSlaveConnectionManager { public static final int MAX_SLOT = 16384;//Redis Cluster默认有16384个slot ... //对key进行CRC16运算,然后再对16384取模 @Override public int calcSlot(String key) { if (key == null) { return 0; } int start = key.indexOf('{'); if (start != -1) { int end = key.indexOf('}'); if (end != -1 && start + 1 < end) { key = key.substring(start + 1, end); } } int result = CRC16.crc16(key.getBytes()) % MAX_SLOT; return result; } ... } 
                
               
              
             
            
           
          
         
        
       
      
     
    
  

(5)如何根据slot值获取对应的节点

因为最后会执行封装了NodeSource实例的RedisExecutorexcute()方法,而NodeSource实例中又会封装了锁名key对应的slot值,所以RedisExecutor的excute()方法可以通过getConnection()方法获取对应节点的连接。

其中RedisExecutor的getConnection()方法会调用到
MasterSlaveConnectionManager的connectionWriteOp()方法,该方法又会通过调用ConnectionManager的getEntry()方法根据slot值获取节点,也就是由
ClusterConnectionManager的getEntry()方法获取Redis的主节点

其实在初始化连接管理器ClusterConnectionManager时,就已经根据配置初始化好哪些slot映射到那个Redis主节点了。

public class RedisExecutor 
  
    { NodeSource source; ... public void execute() { ... //异步获取建立好的Redis连接 CompletableFuture 
   
     connectionFuture = getConnection().toCompletableFuture(); ... } protected CompletableFuture 
    
      getConnection() { ... connectionFuture = connectionManager.connectionWriteOp(source, command); return connectionFuture; } ... } public class MasterSlaveConnectionManager implements ConnectionManager { ... @Override public CompletableFuture 
     
       connectionWriteOp(NodeSource source, RedisCommand 
       command) { MasterSlaveEntry entry = getEntry(source); ... } private MasterSlaveEntry getEntry(NodeSource source) { if (source.getRedirect() != null) { return getEntry(source.getAddr()); } MasterSlaveEntry entry = source.getEntry(); if (source.getRedisClient() != null) { entry = getEntry(source.getRedisClient()); } if (entry == null && source.getSlot() != null) { //根据slot获取Redis的主节点 entry = getEntry(source.getSlot()); } return entry; } ... } public class ClusterConnectionManager extends MasterSlaveConnectionManager { //slot和Redis主节点的原子映射数组 private final AtomicReferenceArray 
      
        slot2entry = new AtomicReferenceArray<>(MAX_SLOT); //Redis客户端连接和Redis主节点的映射关系 private final Map 
       
         client2entry = new ConcurrentHashMap<>(); ... @Override public MasterSlaveEntry getEntry(int slot) { //根据slot获取Redis的主节点 return slot2entry.get(slot); } ... //初始化连接管理器ClusterConnectionManager时 //就已经根据配置初始化好那些slot映射到那个Redis主节点了 public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) { ... for (String address : cfg.getNodeAddresses()) { ... CompletableFuture<Collection 
        
          > partitionsFuture = parsePartitions(nodes); Collection 
         
           partitions = partitionsFuture.join(); List<CompletableFuture 
          
            > masterFutures = new ArrayList<>(); for (ClusterPartition partition : partitions) { ... CompletableFuture 
           
             masterFuture = addMasterEntry(partition, cfg); masterFutures.add(masterFuture); } ... } ... } private CompletableFuture 
            
              addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) { ... CompletionStage 
             
               connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName); connectionFuture.whenComplete((connection, ex1) -> { //成功连接时的处理 if (ex1 != null) { log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); result.completeExceptionally(ex1); return; } MasterSlaveServersConfig config = create(cfg); config.setMasterAddress(partition.getMasterAddress().toString()); //创建Redis的主节点 MasterSlaveEntry entry; if (config.checkSkipSlavesInit()) { entry = new SingleEntry(ClusterConnectionManager.this, config); } else { Set 
              
                slaveAddresses = partition.getSlaveAddresses().stream().map(r -> r.toString()).collect(Collectors.toSet()); config.setSlaveAddresses(slaveAddresses); entry = new MasterSlaveEntry(ClusterConnectionManager.this, config); } CompletableFuture 
               
                 f = entry.setupMasterEntry(new RedisURI(config.getMasterAddress()), configEndpointHostName); f.whenComplete((masterClient, ex3) -> { if (ex3 != null) { log.error("Can't add master: " + partition.getMasterAddress() + " for slot ranges: " + partition.getSlotRanges(), ex3); result.completeExceptionally(ex3); return; } //为创建的Redis的主节点添加slot值 for (Integer slot : partition.getSlots()) { addEntry(slot, entry); lastPartitions.put(slot, partition); } ... }); }); ... } //添加slot到对应节点的映射关系 private void addEntry(Integer slot, MasterSlaveEntry entry) { MasterSlaveEntry oldEntry = slot2entry.getAndSet(slot, entry); if (oldEntry != entry) { entry.incReference(); shutdownEntry(oldEntry); } client2entry.put(entry.getClient(), entry); } ... } 
                
               
              
             
            
           
          
         
        
       
      
     
    
  

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/174376.html

(0)
上一篇 2025-03-24 09:00
下一篇 2025-03-24 09:05

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信