分布式锁选型和源码分析

以云看科技 2024-05-05 01:59:28

1.背景

在分布式场景中,当我们同时部署多个服务时,如果两个请求分别被发送到两个服务上,这时候编程语言及相应的类库提供的锁便不能保证共享资源的安全访问。为了实现不同进程的线程对代码和资源的同步访问,保证在多线程下处理共享数据的安全性,我们就需要用到分布式锁技术。

目前主流的实现分布式锁的组件有zookeeper、redis、etcd这三种。下面会介绍这些组件能实现分布式锁的特性,并根据源码来分析这三种分布式锁的实现方式,最后对比一下这三种方式的优劣以及我们在选型时需要考虑的问题。

2.ETCD

2.1 etcd简介

Etcd 是一个分布式,可靠的Key-Value存储系统,主要用于存储分布式系统中的关键数据。使用Go语言编写,并通过Raft一致性算法处理和确保分布式一致性,提供HTTP API,支持ssl证书,常用于键值对存储、消息发布与订阅、分布式锁等场景。最为人熟知的就是在Kubernetes中的应用,Etcd是Kubernetes集群中的一个十分重要的组件,用于保存集群所有的网络配置和对象的状态信息。

2.2 etcd能够实现分布式锁的特性

2.2.1 Revision机制

每个Key的返回结果RangeResponse中有一个KeyValue结构存储着Revision,如下代码所示:

type KeyValue struct { Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"` ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"` Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"` Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"`}

Revision是全局唯一的,它的初始为0,每进行一次put操作都会加一,我们通过Revision的大小就可以知道写操作的顺序。在实现分布式锁时,多个客户端会同时抢锁,根据Revision号的大小依次获得锁,以此来实现公平锁。

2.2.2 Watch机制

etcd中的监听机制,会对指定的键、前缀目录进行更改,并对更改时间进行通知。

HTTP/1.1到HTTP/2,轮询到推送

V2版本的client通过HTTP/1.1协议,长连接定时轮询server,最新V3版本的client采用了HTTP/2的gRPC协议,双向stream的API,实现了服务端推送功能、多路复用等机制。同时Watch也从client轮询优化成server流式推送,极大降低server端 socket、内存等资源。

MVCC保证watch的可靠性

MVCC不仅仅在etcd中解决了并发问题,而且多版本的数据提供了watch机制的可靠性。当client因网络等异常出现连接闪断后,通过版本号,它就可从server端的 boltdb中获取错过的历史事件,而无需全量同步,它是etcd Watch机制数据增量同步的核心。

2.2.3 Lease机制

即租约机制,能够对key进行过期和续期操作,通过该机制实现了key的自动删除。即TTL处理,采用小顶堆来维护key到期的时间先后问题。可以在set key的同时携带leaseID,当lease过期后所有关联的key都将被自动删除,防止不能没有手动unlock后出现的阻塞。

2.2.4 Prefix机制

类似linux中的目录结构/a/b/,在获得key的name之后,etcd会进行目录化处理,etcd中的key可以按照目录的前缀pfx来获取所有该前缀后面的key。

2.3 源码分析

etcd分布式锁在官方提供的etcdctl中本身就集成了,有赖于http api的支持,各语言的client能够非常容易的进行接入与扩展,下面我们分析一下etcd v3版本client的Mutex实现,我们的源码来自最新(2022.10.20)的main分支上的代码。(https://github.com/etcd-io/etcd)

数据结构

Mutex是一个锁的数据结构,etcd放弃了go中原本的Mutex结构,不仅保存了锁本身的信息,也保存了server信息和session对象。

type Mutex struct { s *Session // Session对象 pfx string // 锁的前缀,如 "business/lock/" myKey string // 当前持有锁的客户端的 leaseid 值(完整 Key 的组成为 pfx+"/"+leaseid) myRev int64 // 自增revision hdr *pb.ResponseHeader // etcd Server中的信息}

client/v3/concurrency/mutex.go#tryAcquire

tryAcquire是试图获取锁并返回函数,拼接生成全局唯一的key,以及可以进行重入判断,设置租约,放入到etcd中。

func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) { //申明并赋值session和client s := m.s client := m.s.Client() //赋值myKey为pfx+"/"+leaseid,leaseid是一个64位的整数值,每个客户端唯一 m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) //比较 Key的revision是否为0,为0表示不存在该key cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) //put key,设置租约 put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) //再获取锁,判断是否存在相同租约,租约一致,那么是可以重入的. get := v3.OpGet(m.myKey) //通过前缀获取最先创建的key,获取当前锁的真正持有者 getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) //Txn进行事务处理,判断前面的cmp条件,成立(不存在该key)执行Then中的存入key, //不成立(不存在该key)执行Else中的获取 resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() if err != nil { return nil, err } m.myRev = resp.Header.Revision if !resp.Succeeded { m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision } return resp, nil}

client/v3/concurrency/mutex.go#lock

Lock函数是分布式锁的主逻辑,其中包括了获取锁成功与否的判断,以及没有获取锁后的阻塞操作。

func (m *Mutex) Lock(ctx context.Context) error { //上方代码块的逻辑,获取key resp, err := m.tryAcquire(ctx) if err != nil { return err } //操作失败,则获取else返回的值,即已有的revision ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { //获取锁成功 m.hdr = resp.Header return nil } client := m.s.Client() // 代码走到这里说明没有获得锁,需要等待之前的锁被释放,即revision 小于当前revision 的 kv 被删除 // 阻塞等待其他程序释放以为pfx前缀锁,并删除其他revisions _, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if wait failed if werr != nil { m.Unlock(client.Ctx()) return werr } //..异常处理省略}

流程图

下图就是etcdctl中的重入锁Mutex的Lock函数的流程图,可以看出etcd中的revision、watch、lease、prefix机制共同作用从而能够实现分布式锁。

图1 etcd分布式锁流程图

3.Zookeeper

3.1 zookeeper简介

Apache ZooKeeper是Apache软件基金会的一个软件项目,它为大型分布式计算提供了开源的分布式配置服务、同步服务和命名注册。ZooKeeper曾经是Hadoop的一个子项目,但现在是一个独立的顶级项目。

3.2 zookeeper能够实现分布式锁的特性

3.2.1 临时顺序节点

临时节点的生命周期与客户端的会话绑定,一旦客户端会话失效(非TCP连接断开),那么这个节点就会被自动清理掉,zookeeper规定临时节点只能作为叶子节点。顺序节点的父节点会维护一个自增整性数字,用于子节点创建的先后顺序。结合临时节点和顺序节点的特点就是临时顺序节点。

3.2.2 Watch 机制提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变等),会实时、主动通知所有订阅者。zookeeper的Watch 机制是典型的观察者模式的实现,在客户端和服务端分别维护了ZKWatchManager和WatchManager,client在注册上事件之后,在server中,节点的增删改动会触发回调。

3.3 源码分析

zookeeper本身的命令并没有提供分布式锁的api,而是各语言client通过其特性进行封装实现的。因此找到zookeeper里面最有代表性的客户端curator,分析其源码中InterProcessMutex实现分布式锁的方式。下面我们的源码来自最新(2022.10.20)的master分支上的代码(https://github.com/apache/curator)。

org.apache.curator.framework.recipes.locks.InterProcessMutex#internalLock

curator重入锁的入口可以通过获取当前线程中的lockData来判断是否进行重入,获取锁成功后保存到LockData中。

private boolean internalLock(long time, TimeUnit unit) throws Exception{ //获取当前线程 Thread currentThread = Thread.currentThread(); //通过能否在map中取到该线程的LockData信息,来判断该线程是否已经持有锁 LockData lockData = threadData.get(currentThread); if ( lockData != null ) { //因为当前线程的锁存在,lockcount自增后返回,变成重入锁 lockData.lockCount.incrementAndGet(); return true; } //当前线程的锁不存在,进行加锁 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { //加锁成功,保存到map中 LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }

org.apache.curator.framework.recipes.locks.LockInternals#attemptLock

这个方法主要是创建临时顺序节点,并且在未获取到锁时,会进行阻塞。

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception{ //开始时间 final long startMillis = System.currentTimeMillis(); //将超时时间统一转化为毫秒单位 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; //节点数据,这里为null final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; //重试次数 int retryCount = 0; //锁路径 String ourPath = null; //是否获取到锁 boolean hasTheLock = false; //是否完成 boolean isDone = false; while ( !isDone ) { isDone = true; try { //创建临时顺序节点,并返回节点路径 ourPath = driver.createsTheLock(client, path, localLockNodeBytes); //依据返回的节点路径,判断是否抢到了锁;阻塞,直到获取锁成功 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { //在会话过期时,可能导致driver找不到临时有序节点,进行重试,失败抛出NoNodeException,进行重试 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } //获取到锁,则返回节点路径,供调用方记录到map中 if ( hasTheLock ) { return ourPath; } return null; }org.apache.curator.framework.recipes.locks.LockInternals#internalLockLoop

internalLockLoop方法包含了核心获取锁的逻辑,会设置监听,排列key目录下所有的临时有序节点进行判断并获取锁;如果获取不到锁,就会监听上一个节点的变动,直到获取锁为止。

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { //是否获取到锁 boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { //给前一个节点设置了监听器,当该节点被删除时,将会触发watcher中的回调 client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } //一直尝试获取锁 while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { //返回basePath下所有的临时有序节点,并且按照后缀从小到大排列 List<String> children = getSortedChildren(); //取出当前线程创建出来的临时有序节点的名称 String sequenceNodeName = ourPath.substring(basePath.length() + 1); //判断当前节点是否处于排序后的首位,如果处于首位,则代表获取到了锁 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { //获取到锁之后,则终止循环 haveTheLock = true; } else { //这里代表没有获取到锁 //获取比当前节点索引小的前一个节点 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { //如果前一个节点不存在,则直接抛出NoNodeException,catch中不进行处理,在下一轮中继续获取锁 //如果前一个节点存在,则给它设置一个监听器,监听它的释放事件 client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); //判断是否超时 if ( millisToWait <= 0 ) { //获取锁超时,删除刚才创建的临时有序节点 doDelete = true; break; } //没超时的话,在millisToWait内进行等待 wait(millisToWait); } else { //无限期阻塞等待,监听到前一个节点被删除时,才会触发唤醒操作 wait(); } } catch ( KeeperException.NoNodeException e ) { //如果前一个节点不存在,则直接抛出NoNodeException,catch中不进行处理,在下一轮中继续获取锁 } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if ( doDelete ) { //删除刚才创建出来的临时有序节点 deleteOurPath(ourPath); } } return haveTheLock; }

流程图

下图就是curator的重入锁internalLock方法的流程图,可以看出zookeeper中的监听和临时顺序节点机制共同作用从而能够实现分布式锁。

图2 curator分布式锁流程图

4.Redis

4.1 redis简介

Redis是一个开源、用C语言编写、遵守BSD协议、高性能的(key/value)分布式内存数据库。基于内存运行,它可以用作数据库、缓存和消息中间件,支持多种类型的数据结构,是当前最热门的NoSQL数据库之一。

4.2 redis能够实现分布式锁的特性

4.2.1 Redis的基本数据类型HASH

hash结构能够非常方便的存储分布式锁的key所对应的一些数据,例如在redisson分布式锁中存储着这些数据(分布式锁的key、租约时间、获取锁时的唯一值UUID+threadId、channelName)。

4.2.2 Redis的订阅发布redis的发布订阅,类似前面zk和etcd的watcher功能。通过这个功能,分布式锁中所有等待线程可以订阅锁的同一个channel,当有线程释放锁的时候,会往这个通道发送消息,此时所有等待线程都可以订阅消费这条消息,从而从等待状态中释放出来,重新尝试获取锁。

4.2.3 Lua脚本的原子性redis6.0版本前后,网络数据读写和协议解析由单线程转化为多线程,但是命令执行依旧是单线程。当某个lua脚本正在运行的时候,不会有其他脚本或Redis命令被执行。在分布式锁中,能够保证大量redis原生命令整合成一个原子脚本。4.2.4 key自带ttl

redisdb的struct中存着dict *expires,字典的键为key,字典的value为过期事件 UNIX时间戳。检查给定的键是否在过期字典中,如果存在就获取键的过期时间,检查当前UNIX时间戳是否大于键的过期时间,是就过期,否则未过期,在分布式锁中可以控制锁的时间。

4.3 源码分析

redis和zookeeper同样,本身的命令并没有提供分布式锁的能力,我们通过redis官方推荐的java客户端redisson,分析其源码Reentrant Lock中实现分布式锁的方式。下面我们的源码来自最新(2022.10.20)的master分支上的代码(https://github.com/redisson/redisson)。

org.redisson.RedissonLock#lock(long,java.util.concurrent.TimeUnit,boolean)

lock方法入口,会调用获取锁方法,创建这个key的订阅消息并发布。在获取锁失败后(锁被占用)会阻塞,并等待其他获得锁的线程的消息队列触发,再进行锁的获取。

/** * 获取锁,注意这里没有等待时间,只有指定锁的最大持有时间 * 通过interruptibly参数配置支持中断 */ private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); // 尝试获取锁,返回的ttl为空代表获取锁成功,返回的ttl代表已经存在的KEY的剩余存活时间 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 订阅redisson_lock__channel:{$KEY},redisson client通过Redis的订阅发布,获取到解锁的事件通知 // 这个方法会在LockPubSub中注册一个entryName -> RedissonLockEntry的哈希映射 CompletableFuture<RedissonLockEntry> future = subscribe(threadId); //实际上内部是netty的时间轮算法HashedWheelTimer-newTimeout,触发的延迟任务,目的是确保redis命令执行完成后再执行后续操作 //时间是getTimeout() + getRetryInterval() * getRetryAttempts();注意这个时间是redis的baseconfig中的时间 pubSub.timeout(future); RedissonLockEntry entry; // 同步订阅执行,获取注册订阅Channel的响应,区分是否支持中断 if (interruptibly) { //内部就是CompletableFuture<V>的get方法,阻塞线程,直到计算完成(订阅成功)返回结果 entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } // 走到下面的循环说明Redis已经存在对应的KEY // 有其他客户端已经获取到锁,此客户端线程的调用需要阻塞等待获取锁 try { while (true) { // 死循环中获取锁,这个死循环或者抛出中断异常,或者获取到锁成功break跳出 ttl = tryAcquire(-1, leaseTime, unit, threadId); // 返回的ttl为空,说明获取到锁,跳出死循环 if (ttl == null) { break; } // 这个ttl来源于等待存在的锁的KEY的存活时间,直接使用许可为0的信号量进行阻塞等待 // 直到锁释放事件订阅LockPubSub的onMessage()方法回调激活getLatch().release()进行解锁才会往下走 if (ttl >= 0) { try { //tryAcquire尝试在特定时间内获取1个许可,可中断 entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { //获取一个许可,在提供一个许可前一直将线程阻塞,除非线程被中断。 entry.getLatch().acquire(); } else { //获取许可,不可以被打断 entry.getLatch().acquireUninterruptibly(); } } } } finally { // 获取到锁或者抛出中断异常,退订redisson_lock__channel:{$KEY},不再关注解锁事件 unsubscribe(entry, threadId); }// get(lockAsync(leaseTime, unit)); }

org.redisson.RedissonLock#tryAcquireAsync

获得锁方法,主要进行了lua脚本的调用,以及过期时间的修改。

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; // leaseTime>0 说明没过期 if (leaseTime > 0) { // 实质是异步执行加锁Lua脚本 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // 否则,已经过期了,传参变为新的时间,internalLockLeaseTime是读取的配置文件里的时间 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // lock acquired,如果ttlRemainingFuture是null的话,就是获得锁了 if (ttlRemaining == null) { if (leaseTime > 0) { //有剩余时间的话,内部锁剩余时间就是leaseTime internalLockLeaseTime = unit.toMillis(leaseTime); } else { //看门狗续期,本文章由于篇幅所限,不介绍这个机制 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); }org.redisson.RedissonLock#tryLockInnerAsync

加锁的lua脚本,主要是用了redis中的hash结构进行存储锁的数据,并且加入了重入的判断逻辑。

/* KEYS[1] 就是 Collections.singletonList(getName()),表示分布式锁的key; ARGV[1] 就是internalLockLeaseTime,即锁的租约时间(持有锁的有效时间),默认30s; ARGV[2] 就是getLockName(threadId),是获取锁时set的唯一值 value,即UUID+threadId */ <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, // 1.如果缓存中的key不存在,则执行 hincrby 命令(hincrby key UUID+threadId 1,创建并+1), 设值重入次数1 // 然后通过 pexpire 命令设置锁的过期时间(即锁的租约时间) // 返回空值 nil ,表示获取锁成功 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 如果key已经存在,并且value也匹配,表示是当前线程持有的锁,则执行 hincrby 命令,重入次数加1,并且设置失效时间 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果key已经存在,但是value不匹配,说明锁已经被其他线程持有,通过 pttl 命令获取锁的剩余存活时间并返回,至此获取锁失败 "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }

流程图

下图就是reidsson的重入锁Lock方法的流程图,可以看出redis中的hash结构,lua脚本原子性,ttl,发布订阅共同作用从而能够实现分布式锁。

图3 redis分布式锁流程图

5.etcd、zookeeper与redis的对比

5.1 CAP的选择

选择AP模型放弃一致性实现分布式锁,就是redis的实现。放弃一致性具体体现在Redis的主从同步是异步进行的,如果向master发送请求修改了数据后,master突然出现异常,发生高可用切换,缓冲区的数据可能无法同步到新的master(原replica)上,导致数据不一致。针对这个问题Martin Kleppmann又提出了红锁算法(https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html),然而这个实现至少需要3个独立的redis实例,且依赖系统时钟为人诟病,后续又有在redis基础上提供付费方案Tair。尤其是基础组件需要特别考虑保A弃C的场景,因为这把复杂性暴露给了调用方。

选择CP模型放弃可用性实现分布式锁,一般用于对数据质量要求很高的场合中,而分布式锁的大多数场景恰恰符合要求数据强一致。etcd的Raft和zookeeper的ZAB来保证强一致性,主节点由于某些原因宕机,系统会在从节点中选取出来数据比较新的一个从节点作为新的主节点,从而避免数据丢失等问题。但是放弃可用性一旦网络发生分区,节点之间的信息同步时间可以无限制地延长,如果是热点业务不能及时修复,那也会导致不可预期的损失。

5.2 运维方面的考虑

在如今大多数业务系统或者稍微有点规模的系统中,redis被使用的概率非常高。在不想引入新组件情况下,从运维成本方面考虑redis应该算是优选方案。若是有足够的运维能力能处理好etcd或者zookeeper集群或项目中原本就有这些组件的团队,则可以从其他方面来考虑方案选型。

而若是考虑使用云产品的话,那么主流云厂商zookeeper的价格比redis贵了很多,redis价格有很大优势,而etcd常常作为Kubernetes的核心组件,目前没有进行单独售卖。

5.3 性能考虑

性能方面redis无疑是最快的,完全基于内存以及6.0最新的io多线程优化,使得性能方面没有敌手,而etcd和zookeeper由于要保证一致性,Raft和ZAB都会使得其性能远低于redis。

5.4 分布式锁本身API的丰富性

etcdctlMutex重入锁RWMutex读写锁

ookeeper的curatorInterProcessMutex:重入锁InterProcessSemaphoreMutex:不可重入锁InterProcessReadWriteLock:读写锁InterProcessMultiLock:联锁

redis的redissonReentrant Lock::重入锁Fair Lock:公平锁MultiLock:联锁RedLock:红锁ReadWriteLock:读写锁Semaphore:信号量PermitExpirableSemaphore:可过期性信号量CountDownLatch:闭锁

5.5 对比图

etcd

zookeeper

redis

命令支持

原生自带分布式锁命令

需要各语言client

需要各语言client

CAP

CP

CP

AP

一致性算法

Raft

ZAB

监听机制

基于HTTP/2 -gRPC api的推送watcher

自己实现的观察者模式watcher

发布订阅模式

编写语言

GO

JAVA

C

并发性能

运维成本

6.缦图中台团队中分布式锁的选型

6.1 使用原因

在业务中台的项目中,服务都是多节点部署的,如果两个请求分别被发送到两个节点上,这时候Java类库提供的锁lock和synchronized便不能保证共享资源的安全访问。为了实现不同进程的线程对代码和资源的同步访问,保证在多线程下处理共享数据的安全性,就需要用到分布式锁技术。

比如下面这段代码,业务逻辑就是更新字典分组表的同时,更新缓存中的数据。那么这段代码如果使用lock和synchronized,只能保证在单个jvm中,这代码块是被锁住的,如果在其他节点上同样有更新请求,那么这段代码就会导致数据库与缓存中的数据不一致。所以在实现上,需要在这段代码块加上分布式锁。

SysDictGroupInfo sysDictGroupInfo = new SysDictGroupInfo();BeanUtils.copyProperties(sysDictGroupInfoVo, sysDictGroupInfo);//数据库更新super.updateById(sysDictGroupInfo);//redis同步更新redissonUtil.set(SYS_DICT_GROUP_INFO + sysDictGroupInfo.getId(), JSON.toJSONString(sysDictGroupInfo));

6.2 选型

在我们团队中,我们选择了redisson来实现分布式锁。原因如下:

由于项目之中本身就包含了redis组件,不包含etcd和zookeeper,不需要额外引入组件。在Java项目中,redisson客户端已经实现了非常丰富的分布式锁API,不需要额外封装。在中台项目中,后期需要支撑大量QPS,redis的分布式锁性能无疑是最好的。

6.3 redissonClient原生使用模版

//上写锁(这边是示例,各种重入锁都行) RReadWriteLock XXXLock = redissonClient.getReadWriteLock(LOCK); try { try { //最多尝试20秒获取锁,2秒钟以后自动解锁 if (dicItemLock.writeLock().tryLock(20,2, TimeUnit.SECONDS)){ //业务 }else{ //在waittime中还没有获取到锁 throw new BadRequestException("请稍后再试"); } }catch (Exception e) { //看情况需不需要捕获,不需要就不用catch } finally { XXXLock.writeLock().unlock(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("tryLock fail lockname:{}, reason:{}", LOCK ,e.getMessage()); throw new BadRequestException("请稍后再试"); }tryLock中的参数看具体业务。InterruptedException异常需要特殊处理。尝试获取锁失败抛出异常。finally中必须要释放锁。外层的try-catch可以封装在tryLock中,不需要每个方法都写。

7.总结

分布式锁的选型需要从多方面考虑,包括使用场景,cap的选择,运维层面,性能方面,api的支持能力等等。

这些方面中,首先需要考虑的就是cap的选择,在需要强一致的场景中,那么需要在etcd和zookeeper中做选择,redis只能做到最终一致性。运维方面,如果运维能力不强,那么可以选择redis和etcd做分布式锁;如果用的是云上资源,那么redis做分布式锁最好;如果需要高性能,那么请选择redis;如果不想进行二次开发,那么curator和redisson的api支撑丰富程度足够使用了。同时项目中本身是否需要额外引入组件也需要考虑。

相信阅读完本文,读者会对这些方面有所了解,能够根据团队本身的情况进行选型,同时各组件分布式锁实现方式的源码也会对有能力自己二次封装的团队起到借鉴作用。

本文作者

杰拉尔,来自缦图互联网中心中台团队。

来源-微信公众号:缦图coder

出处:https://mp.weixin.qq.com/s/DApZNdJK66nKx-16UwVL8A

1 阅读:30

以云看科技

简介:感谢大家的关注