大家好,欢迎来到IT知识分享网。
大纲
1.Redisson可重入锁RedissonLock概述
2.可重入锁源码之创建RedissonClient实例
3.可重入锁源码之lua脚本加锁逻辑
4.可重入锁源码之WatchDog维持加锁逻辑
5.可重入锁源码之可重入加锁逻辑
6.可重入锁源码之锁的互斥阻塞逻辑
7.可重入锁源码之释放锁逻辑
8.可重入锁源码之获取锁超时与锁超时自动释放逻辑
9.可重入锁源码总结
1.Redisson可重入锁RedissonLock概述
(1)在pom.xml里引入依赖
(2)构建RedissonClient并使用Redisson
(3)Redisson可重入锁RedissonLock简单使用
(1)在pom.xml里引入依赖
org.redisson
redisson
3.16.8
(2)构建RedissonClient并使用Redisson
参考官网中文文档,连接上3主3从的Redis Cluster。
//https://github.com/redisson/redisson/wiki/目录 public class Application { public static void main(String[] args) throws Exception { //连接3主3从的Redis CLuster Config config = new Config(); config.useClusterServers() .addNodeAddress("redis://192.168.1.110:7001") .addNodeAddress("redis://192.168.1.110:7002") .addNodeAddress("redis://192.168.1.110:7003") .addNodeAddress("redis://192.168.1.111:7001") .addNodeAddress("redis://192.168.1.111:7002") .addNodeAddress("redis://192.168.1.111:7003"); //创建RedissonClient实例 RedissonClient redisson = Redisson.create(config); //获取可重入锁 RLock lock = redisson.getLock("myLock"); lock.lock(); lock.unlock(); RMap
map = redisson.getMap("myMap"); map.put("foo", "bar"); map = redisson.getMap("myMap"); System.out.println(map.get("foo")); } }
(3)Redisson可重入锁RedissonLock简单使用
Redisson可重入锁RLock实现了
java.util.concurrent.locks.Lock接口,同时还提供了异步(Async)、响应式(Reactive)和RxJava2标准的接口。
RLock lock = redisson.getLock("myLock"); //最常见的使用方法 lock.lock();
如果设置锁的超时时间不合理,导致超时时间已到时锁还没能主动释放,但实际上锁却被Redis节点通过过期时间释放了,这会有问题。
为了避免这种情况,Redisson内部提供了一个用来监控锁的WatchDog。WatchDog的作用是在Redisson实例被关闭前,不断地延长锁的有效期。
WatchDog检查锁的默认超时时间是30秒,可通过
Config.lockWatchdogTimeout来指定。
RLock的tryLock方法提供了leaseTime参数来指定加锁的超时时间,超过这个时间后锁便自动被释放。
//如果没有主动释放锁的话,10秒后将会自动释放锁 lock.lock(10, TimeUnit.SECONDS); //加锁等待最多是100秒;加锁成功后如果没有主动释放锁的话,锁会在10秒后自动释放 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
RLock完全符合Java的Lock规范,即只有拥有锁的进程才能解锁,其他进程解锁则会抛出
IllegalMonitorStateException错误。如果需要其他进程也能解锁,那么可以使用分布式信号量Semaphore。
2.可重入锁源码之创建RedissonClient实例
(1)初始化与Redis的连接管理器ConnectionManager
(2)初始化Redis的命令执行器CommandExecutor
使用Redisson.create()方法可以根据配置创建一个RedissonClient实例,因为Redisson类会实现RedissonClient接口,而创建RedissonClient实例的主要工作其实就是:
一.初始化与Redis的连接管理器ConnectionManager
二.初始化Redis的命令执行器CommandExecutor
(1)初始化与Redis的连接管理器ConnectionManager
Redis的配置类Config会被封装在连接管理器ConnectionManager中,后续可以通过连接管理器ConnectionManager获取Redis的配置类Config。
public class Application { public static void main(String[] args) throws Exception { Config config = new Config(); config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001"); //创建RedissonClient实例 RedissonClient redisson = Redisson.create(config); ... } } //创建RedissonClient实例的源码 public class Redisson implements RedissonClient { protected final Config config;//Redis配置类 protected final ConnectionManager connectionManager;//Redis的连接管理器 protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器 ... public static RedissonClient create(Config config) { return new Redisson(config); } protected Redisson(Config config) { this.config = config; Config configCopy = new Config(config); //根据Redis配置类Config实例创建和Redis的连接管理器 connectionManager = ConfigSupport.createConnectionManager(configCopy); RedissonObjectBuilder objectBuilder = null; if (config.isReferenceEnabled()) { objectBuilder = new RedissonObjectBuilder(this); } //创建Redis的命令执行器 commandExecutor = new CommandSyncService(connectionManager, objectBuilder); evictionScheduler = new EvictionScheduler(commandExecutor); writeBehindService = new WriteBehindService(commandExecutor); } ... } public class ConfigSupport { ... //创建Redis的连接管理器 public static ConnectionManager createConnectionManager(Config configCopy) { //生成UUID UUID id = UUID.randomUUID(); ... if (configCopy.getClusterServersConfig() != null) { validate(configCopy.getClusterServersConfig()); //返回ClusterConnectionManager实例 return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id); } ... } ... } public class ClusterConnectionManager extends MasterSlaveConnectionManager { public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) { super(config, id); ... this.natMapper = cfg.getNatMapper(); //将Redis的配置类Config封装在ConnectionManager中 this.config = create(cfg); initTimer(this.config); Throwable lastException = null; List
failedMasters = new ArrayList
(); for (String address : cfg.getNodeAddresses()) { RedisURI addr = new RedisURI(address); //异步连接Redis节点 CompletionStage
connectionFuture = connectToNode(cfg, addr, addr.getHost()); ... //通过connectionFuture阻塞获取建立好的连接 RedisConnection connection = connectionFuture.toCompletableFuture().join(); ... List
nodes = connection.sync(clusterNodesCommand); ... CompletableFuture<Collection
> partitionsFuture = parsePartitions(nodes); Collection
partitions = partitionsFuture.join(); List<CompletableFuture
> masterFutures = new ArrayList<>(); for (ClusterPartition partition : partitions) { if (partition.isMasterFail()) { failedMasters.add(partition.getMasterAddress().toString()); continue; } if (partition.getMasterAddress() == null) { throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address."); } CompletableFuture
masterFuture = addMasterEntry(partition, cfg); masterFutures.add(masterFuture); } CompletableFuture
masterFuture = CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0])); masterFuture.join(); ... } ... } ... } public class MasterSlaveConnectionManager implements ConnectionManager { protected final String id;//初始化时为UUID private final Map
nodeConnections = new ConcurrentHashMap<>(); ... protected MasterSlaveConnectionManager(Config cfg, UUID id) { this.id = id.toString();//传入的是UUID this.cfg = cfg; ... } protected final CompletionStage
connectToNode(NodeType type, BaseConfig
cfg, RedisURI addr, String sslHostname) { RedisConnection conn = nodeConnections.get(addr); if (conn != null) { if (!conn.isActive()) { closeNodeConnection(conn); } else { return CompletableFuture.completedFuture(conn); } } //创建Redis客户端连接实例 RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname); //向Redis服务端发起异步连接请求,这个future会层层往外返回 CompletionStage
future = client.connectAsync(); return future.thenCompose(connection -> { if (connection.isActive()) { if (!addr.isIP()) { RedisURI address = new RedisURI(addr.getScheme() + "://" + connection.getRedisClient().getAddr().getAddress().getHostAddress() + ":" + connection.getRedisClient().getAddr().getPort()); nodeConnections.put(address, connection); } nodeConnections.put(addr, connection); return CompletableFuture.completedFuture(connection); } else { connection.closeAsync(); CompletableFuture
f = new CompletableFuture<>(); f.completeExceptionally(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!")); return f; } }); } //创建Redis客户端连接实例 @Override public RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) { RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname); return RedisClient.create(redisConfig); } ... } //Redisson主要使用Netty去和Redis服务端建立连接 public final class RedisClient { private final Bootstrap bootstrap; private final Bootstrap pubSubBootstrap; ... public static RedisClient create(RedisClientConfig config) { return new RedisClient(config); } private RedisClient(RedisClientConfig config) { ... bootstrap = createBootstrap(copy, Type.PLAIN); pubSubBootstrap = createBootstrap(copy, Type.PUBSUB); this.commandTimeout = copy.getCommandTimeout(); } private Bootstrap createBootstrap(RedisClientConfig config, Type type) { Bootstrap bootstrap = new Bootstrap() .resolver(config.getResolverGroup()) .channel(config.getSocketChannelClass()) .group(config.getGroup()); bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type)); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout()); bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive()); bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay()); config.getNettyHook().afterBoostrapInitialization(bootstrap); return bootstrap; } //向Redis服务端发起异步连接请求 public RFuture
connectAsync() { CompletableFuture
addrFuture = resolveAddr(); CompletableFuture
f = addrFuture.thenCompose(res -> { CompletableFuture
r = new CompletableFuture<>(); //Netty的Bootstrap发起连接 ChannelFuture channelFuture = bootstrap.connect(res); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(final ChannelFuture future) throws Exception { if (bootstrap.config().group().isShuttingDown()) { IllegalStateException cause = new IllegalStateException("RedisClient is shutdown"); r.completeExceptionally(cause); return; } if (future.isSuccess()) { RedisConnection c = RedisConnection.getFrom(future.channel()); c.getConnectionPromise().whenComplete((res, e) -> { bootstrap.config().group().execute(new Runnable() { @Override public void run() { if (e == null) { if (!r.complete(c)) { c.closeAsync(); } } else { r.completeExceptionally(e); c.closeAsync(); } } }); }); } else { bootstrap.config().group().execute(new Runnable() { public void run() { r.completeExceptionally(future.cause()); } }); } } }); return r; }); return new CompletableFutureWrapper<>(f); } ... }
(2)初始化Redis的命令执行器CommandExecutor
首先,CommandSyncService继承自CommandAsyncService类。
而CommandAsyncService类实现了CommandExecutor接口。
然后,ConnectionManager连接管理器会封装在命令执行器CommandExecutor中。
所以,通过CommandExecutor命令执行器可以获取连接管理器ConnectionManager。
//Redis命令的同步执行器CommandSyncService public class CommandSyncService extends CommandAsyncService implements CommandExecutor { //初始化CommandExecutor public CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) { super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT); } public
R read(String key, RedisCommand
command, Object... params) { return read(key, connectionManager.getCodec(), command, params); } public
R read(String key, Codec codec, RedisCommand
command, Object... params) { RFuture
res = readAsync(key, codec, command, params); return get(res); } public
R evalRead(String key, RedisCommand
evalCommandType, String script, List<Object> keys, Object... params) { return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params); } public
R evalRead(String key, Codec codec, RedisCommand
evalCommandType, String script, List<Object> keys, Object... params) { RFuture
res = evalReadAsync(key, codec, evalCommandType, script, keys, params); return get(res); } public
R evalWrite(String key, RedisCommand
evalCommandType, String script, List<Object> keys, Object... params) { return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params); } public
R evalWrite(String key, Codec codec, RedisCommand
evalCommandType, String script, List<Object> keys, Object... params) { RFuture
res = evalWriteAsync(key, codec, evalCommandType, script, keys, params); return get(res); } } //Redis命令的异步执行器CommandAsyncService public class CommandAsyncService implements CommandAsyncExecutor { //Redis连接管理器 final ConnectionManager connectionManager; final RedissonObjectBuilder objectBuilder; final RedissonObjectBuilder.ReferenceType referenceType; public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) { this.connectionManager = connectionManager; this.objectBuilder = objectBuilder; this.referenceType = referenceType; } @Override public
V getNow(CompletableFuture
future) { try { return future.getNow(null); } catch (Exception e) { return null; } } @Override public
R read(String key, Codec codec, RedisCommand
command, Object... params) { RFuture
res = readAsync(key, codec, command, params); return get(res); } @Override public
RFuture
readAsync(String key, Codec codec, RedisCommand
command, Object... params) { NodeSource source = getNodeSource(key); return async(true, source, codec, command, params, false, false); } private NodeSource getNodeSource(String key) { int slot = connectionManager.calcSlot(key); return new NodeSource(slot); } public
RFuture
async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand
command, Object[] params, boolean ignoreRedirect, boolean noRetry) { CompletableFuture
mainPromise = createPromise(); RedisExecutor
executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry); executor.execute(); return new CompletableFutureWrapper<>(mainPromise); } @Override public
V get(RFuture
future) { if (Thread.currentThread().getName().startsWith("redisson-netty")) { throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners"); } try { return future.toCompletableFuture().get(); } catch (InterruptedException e) { future.cancel(true); Thread.currentThread().interrupt(); throw new RedisException(e); } catch (ExecutionException e) { throw convertException(e); } } ... }
3.可重入锁源码之lua脚本加锁逻辑
(1)通过Redisson.getLock()方法获取一个RedissonLock实例
(2)加锁时的执行流程
(3)加锁时执行的lua脚本
(4)执行加锁lua脚本的命令执行器逻辑
(5)如何根据slot值获取对应的节点
(1)通过Redisson.getLock()方法获取一个RedissonLock实例
在Redisson.getLock()方法中,会传入命令执行器CommandExecutor来创建一个RedissonLock实例,而命令执行器CommandExecutor是在执行Redisson.create()方法时初始化好的,所以命令执行器CommandExecutor会被封装在RedissonLock实例中。
因此,通过RedissonLock实例可以获取一个命令执行器CommandExecutor,通过命令执行器CommandExecutor可获取连接管理器ConnectionManager,通过连接管理器ConnectionManager可获取Redis的配置信息类Config,通过Redis的配置信息类Config可以获取各种配置信息。
RedissonLock类继承自实现了RLock接口的RedissonBaseLock类。在RedissonLock的构造方法里面,有个internalLockLeaseTime变量,这个internalLockLeaseTime变量与WatchDog看门狗有关系。interlnalLockLeaseTime的默认值是30000毫秒,即30秒;
public class Application { public static void main(String[] args) throws Exception { Config config = new Config(); config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001"); //创建RedissonClient实例 RedissonClient redisson = Redisson.create(config); //获取可重入锁 RLock lock = redisson.getLock("myLock"); lock.lock(); ... } } //创建Redisson实例 public class Redisson implements RedissonClient { protected final Config config;//Redis配置类 protected final ConnectionManager connectionManager;//Redis的连接管理器 protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器 ... public static RedissonClient create(Config config) { return new Redisson(config); } protected Redisson(Config config) { ... //根据Redis配置类Config实例创建和Redis的连接管理器 connectionManager = ConfigSupport.createConnectionManager(configCopy); //创建Redis的命令执行器 commandExecutor = new CommandSyncService(connectionManager, objectBuilder); ... } ... @Override public RLock getLock(String name) { return new RedissonLock(commandExecutor, name); } ... } //创建RedissonLock实例 //通过RedissonLock实例可以获取一个命令执行器CommandExecutor; public class RedissonLock extends RedissonBaseLock { protected long internalLockLeaseTime; protected final LockPubSub pubSub; final CommandAsyncExecutor commandExecutor; public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; //与WatchDog有关的internalLockLeaseTime //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间 this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } ... } //创建Redis的命令执行器 //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager public class CommandAsyncService implements CommandAsyncExecutor { final ConnectionManager connectionManager; ... public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) { this.connectionManager = connectionManager; this.objectBuilder = objectBuilder; this.referenceType = referenceType; } @Override public ConnectionManager getConnectionManager() { return connectionManager; } ... } //创建Redis的连接管理器 //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config public class ClusterConnectionManager extends MasterSlaveConnectionManager { ... public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) { super(config, id); ... } ... } //创建Redis的连接管理器 //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config public class MasterSlaveConnectionManager implements ConnectionManager { private final Config cfg; protected final String id;//初始化时为UUID ... protected MasterSlaveConnectionManager(Config cfg, UUID id) { this.id = id.toString();//传入的是UUID this.cfg = cfg; ... } @Override public Config getCfg() { return cfg; } ... } //配置信息类Config中的lockWatchdogTimeout变量初始化为30秒,该变量与WatchDog有关 public class Config { private long lockWatchdogTimeout = 30 * 1000; ... //This parameter is only used if lock has been acquired without leaseTimeout parameter definition. //Lock expires after "lockWatchdogTimeout" if watchdog didn't extend it to next "lockWatchdogTimeout" time interval. //This prevents against infinity locked locks due to Redisson client crush or any other reason when lock can't be released in proper way. //Default is 30000 milliseconds public Config setLockWatchdogTimeout(long lockWatchdogTimeout) { this.lockWatchdogTimeout = lockWatchdogTimeout; return this; } public long getLockWatchdogTimeout() { return lockWatchdogTimeout; } }
默认情况下,调用RedissonLock.lock()方法加锁时,传入的leaseTime为-1。此时锁的超时时间会设为lockWatchdogTimeout默认的30秒,从而避免出现死锁的情况。
public class RedissonLock extends RedissonBaseLock { ... //加锁 @Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); ... } //解锁 @Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } ... }
(2)加锁时的执行流程
首先会调用RedissonLock的tryAcquire()方法处理异步RFuture相关,然后调用RedissonLock的tryAcquireAsync()方法对执行脚本的结果进行处理,接着调用
RedissonLock.tryLockInnerAsync方法执行加锁的lua脚本。
public class RedissonLock extends RedissonBaseLock { protected long internalLockLeaseTime; protected final LockPubSub pubSub; final CommandAsyncExecutor commandExecutor; public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; //与WatchDog有关的internalLockLeaseTime //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间 this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } ... //加锁 @Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { //线程ID,用来生成设置Hash的值 long threadId = Thread.currentThread().getId(); //尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); //ttl为null说明加锁成功 if (ttl == null) { return; } //加锁失败时的处理 CompletableFuture
future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { commandExecutor.getNow(future).getLatch().acquire(); } else { commandExecutor.getNow(future).getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(commandExecutor.getNow(future), threadId); } } ... private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { //默认下waitTime和leaseTime都是-1,下面调用的get方法是来自于RedissonObject的get()方法 //可以理解为异步转同步:将异步的tryAcquireAsync通过get转同步 return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private
RFuture
tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture
ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //默认情况下,由于leaseTime=-1,所以会使用初始化RedissonLock实例时的internalLockLeaseTime //internalLockLeaseTime的默认值就是lockWatchdogTimeout的默认值,30秒 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage
f = ttlRemainingFuture.thenApply(ttlRemaining -> { //加锁返回的ttlRemaining为null表示加锁成功 if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); } //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒
RFuture
tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand
command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()),//锁的名字:KEYS[1] unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒 getLockName(threadId)//ARGV[2],值为UUID + 线程ID ); } ... } public abstract class RedissonBaseLock extends RedissonExpirable implements RLock { final String id; final String entryName; final CommandAsyncExecutor commandExecutor; public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; } protected String getLockName(long threadId) { return id + ":" + threadId; } ... } abstract class RedissonExpirable extends RedissonObject implements RExpirable { ... RedissonExpirable(CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); } ... } public abstract class RedissonObject implements RObject { protected final CommandAsyncExecutor commandExecutor; protected String name; protected final Codec codec; public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) { this.codec = codec; this.commandExecutor = commandExecutor; if (name == null) { throw new NullPointerException("name can't be null"); } setName(name); } ... protected final
V get(RFuture
future) { //下面会调用CommandAsyncService.get()方法 return commandExecutor.get(future); } ... } public class CommandAsyncService implements CommandAsyncExecutor { ... @Override public
V get(RFuture
future) { if (Thread.currentThread().getName().startsWith("redisson-netty")) { throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners"); } try { return future.toCompletableFuture().get(); } catch (InterruptedException e) { future.cancel(true); Thread.currentThread().interrupt(); throw new RedisException(e); } catch (ExecutionException e) { throw convertException(e); } } ... }
(3)加锁时执行的lua脚本
public class RedissonLock extends RedissonBaseLock { //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒
RFuture
tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand
command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()),//锁的名字:KEYS[1],比如"myLock" unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒 getLockName(threadId)//ARGV[2],值为UUID + 线程ID ); } ... }
首先执行Redis的exists命令,判断key为锁名的Hash值是否不存在,也就是判断key为锁名myLock的Hash值是否存在。
一.如果key为锁名的Hash值不存在,那么就进行如下加锁处理
首先通过Redis的hset命令设置一个key为锁名的Hash值。该Hash值的key为锁名,value是一个映射。也就是在value值中会有一个field为UUID + 线程ID,value为1的映射。比如:hset myLock UUID:ThreadID 1,lua脚本中的ARGV[2]就是由UUID + 线程ID组成的唯一值。
然后通过Redis的pexpire命令设置key为锁名的Hash值的过期时间,也就是设置key为锁名的Hash值的过期时间为30秒。比如:pexpire myLock 30000。所以默认情况下,myLock这个锁在30秒后就会自动过期。
二.如果key为锁名的Hash值存在,那么就执行如下判断处理
首先通过Redis的hexists命令判断在key为锁名的Hash值里,field为UUID + 线程ID的映射是否已经存在。
如果在key为锁名的Hash值里,field为UUID + 线程ID的映射存在,那么就通过Redis的hincrby命令,对field为UUID + 线程ID的value值进行递增1。比如:hincrby myLock UUID:ThreadID 1。也就是在key为myLock的Hash值里,把field为UUID:ThreadID的value值从1累加到2,发生这种情况的时候往往就是当前线程对锁进行了重入。接着执行:pexpire myLock 30000,再次将myLock的有效期设置为30秒。
如果在key为锁名的Hash值里,field为UUID + 线程ID的映射不存在,发生这种情况的时候往往就是其他线程获取不到这把锁而产生互斥。那么就通过Redis的pttl命令,返回key为锁名的Hash值的剩余存活时间,因为不同线程的ARGV[2]是不一样的,ARGV[2] = UUID + 线程ID。
(4)执行加锁lua脚本的命令执行器逻辑
在RedissonLock的tryLockInnerAsync()方法中,会通过RedissonBaseLock的evalWriteAsync()方法执行lua脚本,即通过CommandAsyncService的evalWriteAsync()方法执行lua脚本。
在CommandAsyncService的evalWriteAsync()方法中,首先会执行CommandAsyncService的getNodeSource()方法获取对应的节点。然后执行CommandAsyncService的evalAsync()方法来执行lua脚本。
在CommandAsyncService的getNodeSource()方法中,会根据key进行CRC16运算,然后再对16384取模,计算出key的slot值。然后根据这个slot值创建一个NodeSource实例进行返回。
在CommandAsyncService的evalAsync()方法中,会将获得的NodeSource实例封装到Redis执行器RedisExecutor里。然后执行RedisExecutor,实现将脚本请求发送给对应的Redis节点处理。
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock { //从外部传入的:在创建实现了RedissonClient的Redisson实例时,初始化的命令执行器CommandExecutor final CommandAsyncExecutor commandExecutor; public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; } ... protected
RFuture
evalWriteAsync(String key, Codec codec, RedisCommand
evalCommandType, String script, List<Object> keys, Object... params) { //获取可用的节点,并继续封装一个命令执行器CommandBatchService MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName()); int availableSlaves = entry.getAvailableSlaves(); CommandBatchService executorService = createCommandBatchService(availableSlaves); //通过CommandAsyncService.evalWriteAsync方法执行lua脚本 RFuture
result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params); if (commandExecutor instanceof CommandBatchService) { return result; } //异步执行然后获取结果 RFuture<BatchResult
> future = executorService.executeAsync(); CompletionStage
f = future.handle((res, ex) -> { if (ex == null && res.getSyncedSlaves() != availableSlaves) { throw new CompletionException(new IllegalStateException("Only " + res.getSyncedSlaves() + " of " + availableSlaves + " slaves were synced")); } return result.getNow(); }); return new CompletableFutureWrapper<>(f); } private CommandBatchService createCommandBatchService(int availableSlaves) { if (commandExecutor instanceof CommandBatchService) { return (CommandBatchService) commandExecutor; } BatchOptions options = BatchOptions.defaults().syncSlaves(availableSlaves, 1, TimeUnit.SECONDS); return new CommandBatchService(commandExecutor, options); } ... } public class CommandBatchService extends CommandAsyncService { ... public CommandBatchService(CommandAsyncExecutor executor, BatchOptions options) { this(executor.getConnectionManager(), options, executor.getObjectBuilder(), RedissonObjectBuilder.ReferenceType.DEFAULT); } private CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) { super(connectionManager, objectBuilder, referenceType); this.options = options; } ... } public class CommandAsyncService implements CommandAsyncExecutor { final ConnectionManager connectionManager; final RedissonObjectBuilder objectBuilder; final RedissonObjectBuilder.ReferenceType referenceType; public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) { this.connectionManager = connectionManager; this.objectBuilder = objectBuilder; this.referenceType = referenceType; } ... @Override public
RFuture
evalWriteAsync(String key, Codec codec, RedisCommand
evalCommandType, String script, List<Object> keys, Object... params) { //获取key对应的节点 NodeSource source = getNodeSource(key); //让对应的节点执行lua脚本请求 return evalAsync(source, false, codec, evalCommandType, script, keys, false, params); } //获取key对应的Redis Cluster节点 private NodeSource getNodeSource(String key) { //先计算key对应的slot值 int slot = connectionManager.calcSlot(key); //返回节点实例 return new NodeSource(slot); } //执行lua脚本 private
RFuture
evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand
evalCommandType, String script, List<Object> keys, boolean noRetry, Object... params) { if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) { CompletableFuture
mainPromise = new CompletableFuture<>(); Object[] pps = copy(params); CompletableFuture
promise = new CompletableFuture<>(); String sha1 = calcSHA(script); RedisCommand cmd = new RedisCommand(evalCommandType, "EVALSHA"); List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length); args.add(sha1); args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); //将根据key进行CRC16运算然后对16384取模获取到的NodeSource实例,封装到Redis执行器RedisExecutor中 RedisExecutor
executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd, args.toArray(), promise, false, connectionManager, objectBuilder, referenceType, noRetry); //通过执行Redis执行器RedisExecutor,来实现将lua脚本请求发送给对应的Redis节点进行处理 executor.execute(); ... } ... } ... } public class ClusterConnectionManager extends MasterSlaveConnectionManager { public static final int MAX_SLOT = 16384;//Redis Cluster默认有16384个slot ... //对key进行CRC16运算,然后再对16384取模 @Override public int calcSlot(String key) { if (key == null) { return 0; } int start = key.indexOf('{'); if (start != -1) { int end = key.indexOf('}'); if (end != -1 && start + 1 < end) { key = key.substring(start + 1, end); } } int result = CRC16.crc16(key.getBytes()) % MAX_SLOT; return result; } ... }
(5)如何根据slot值获取对应的节点
因为最后会执行封装了NodeSource实例的RedisExecutor的excute()方法,而NodeSource实例中又会封装了锁名key对应的slot值,所以RedisExecutor的excute()方法可以通过getConnection()方法获取对应节点的连接。
其中RedisExecutor的getConnection()方法会调用到
MasterSlaveConnectionManager的connectionWriteOp()方法,该方法又会通过调用ConnectionManager的getEntry()方法根据slot值获取节点,也就是由ClusterConnectionManager的getEntry()方法去获取Redis的主节点。
其实在初始化连接管理器ClusterConnectionManager时,就已经根据配置初始化好哪些slot映射到那个Redis主节点了。
public class RedisExecutor
{ NodeSource source; ... public void execute() { ... //异步获取建立好的Redis连接 CompletableFuture
connectionFuture = getConnection().toCompletableFuture(); ... } protected CompletableFuture
getConnection() { ... connectionFuture = connectionManager.connectionWriteOp(source, command); return connectionFuture; } ... } public class MasterSlaveConnectionManager implements ConnectionManager { ... @Override public CompletableFuture
connectionWriteOp(NodeSource source, RedisCommand
command) { MasterSlaveEntry entry = getEntry(source); ... } private MasterSlaveEntry getEntry(NodeSource source) { if (source.getRedirect() != null) { return getEntry(source.getAddr()); } MasterSlaveEntry entry = source.getEntry(); if (source.getRedisClient() != null) { entry = getEntry(source.getRedisClient()); } if (entry == null && source.getSlot() != null) { //根据slot获取Redis的主节点 entry = getEntry(source.getSlot()); } return entry; } ... } public class ClusterConnectionManager extends MasterSlaveConnectionManager { //slot和Redis主节点的原子映射数组 private final AtomicReferenceArray
slot2entry = new AtomicReferenceArray<>(MAX_SLOT); //Redis客户端连接和Redis主节点的映射关系 private final Map
client2entry = new ConcurrentHashMap<>(); ... @Override public MasterSlaveEntry getEntry(int slot) { //根据slot获取Redis的主节点 return slot2entry.get(slot); } ... //初始化连接管理器ClusterConnectionManager时 //就已经根据配置初始化好那些slot映射到那个Redis主节点了 public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) { ... for (String address : cfg.getNodeAddresses()) { ... CompletableFuture<Collection
> partitionsFuture = parsePartitions(nodes); Collection
partitions = partitionsFuture.join(); List<CompletableFuture
> masterFutures = new ArrayList<>(); for (ClusterPartition partition : partitions) { ... CompletableFuture
masterFuture = addMasterEntry(partition, cfg); masterFutures.add(masterFuture); } ... } ... } private CompletableFuture
addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) { ... CompletionStage
connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName); connectionFuture.whenComplete((connection, ex1) -> { //成功连接时的处理 if (ex1 != null) { log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); result.completeExceptionally(ex1); return; } MasterSlaveServersConfig config = create(cfg); config.setMasterAddress(partition.getMasterAddress().toString()); //创建Redis的主节点 MasterSlaveEntry entry; if (config.checkSkipSlavesInit()) { entry = new SingleEntry(ClusterConnectionManager.this, config); } else { Set
slaveAddresses = partition.getSlaveAddresses().stream().map(r -> r.toString()).collect(Collectors.toSet()); config.setSlaveAddresses(slaveAddresses); entry = new MasterSlaveEntry(ClusterConnectionManager.this, config); } CompletableFuture
f = entry.setupMasterEntry(new RedisURI(config.getMasterAddress()), configEndpointHostName); f.whenComplete((masterClient, ex3) -> { if (ex3 != null) { log.error("Can't add master: " + partition.getMasterAddress() + " for slot ranges: " + partition.getSlotRanges(), ex3); result.completeExceptionally(ex3); return; } //为创建的Redis的主节点添加slot值 for (Integer slot : partition.getSlots()) { addEntry(slot, entry); lastPartitions.put(slot, partition); } ... }); }); ... } //添加slot到对应节点的映射关系 private void addEntry(Integer slot, MasterSlaveEntry entry) { MasterSlaveEntry oldEntry = slot2entry.getAndSet(slot, entry); if (oldEntry != entry) { entry.incReference(); shutdownEntry(oldEntry); } client2entry.put(entry.getClient(), entry); } ... }
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/174376.html