大数跨境
0
0

如何优雅的实现分布式锁?(文末赠书)

如何优雅的实现分布式锁?(文末赠书) CppGuide
2020-06-10
2
导读:哇!涨知识了,原来分布式锁还可以这么玩。。。

今天来介绍下的是分布式锁的实现,文末再免费送 5 本重量级的书,不要错过!!!

        随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的JavaAPI并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

       在一个应用中,通常存在着不同的模式,这几种分布式锁需要结合使用,将一种分布式锁不加区别的应用到所有场景将导致较差的效果。因此,我们分别讨论这三种方式的具体实现,以及容易出现的误用。

Oracle乐观锁

      在oracle中实现乐观锁很简单,只要在相应的数据上增加一个版本控制,例如version,每次读出来的时候,把该字段也读出来,当写回去时,把该字段加1,提交之前跟数据库的该字段比较一次,如果比数据库的值大的话,就允许保存,否则不允许保存。如下所示:

       会话1操作如下:

select id, balance, version from account where id="1";查询结果:id=1, balance=1000, version=1update accountset balance=balance+100, version=version+1where id="1" and version=1;select id, balance, version from account where id="1";查询结果:id=1, balance=1100, version=2

         会话2操作如下:

select id, balance, version from account where id="1";查询结果:id=1, balance=1000, version=1update accountset balance=balance-50, version=version+1where id="1" and version=1;select id, balance, version from account where id="1";查询结果:id=1, balance=1100, version=2

      会话1已修改成功,实际account.balance=1100、account.version=2,会话2也将版本号加1(version=2)试图向数据库提交数据(balance=950),但此时比对数据库记录版本时发现,操作员B提交的数据版本号为2,数据库记录当前版本也为2,不满足提交版本必须大于记录当前版本才能执行更新的乐观锁策略,因此会话2的提交不会生效。                             


 Redis分布式锁

      使用Redis使用分布锁时,主要有两种方式,一种是基于原生的Redis客户端,使用setnx、getset以及expire等实现;另一种方式使用Redisson,它们就像Zookeeper的原生API与ApacheCurator。早期开发人员一般使用Redis客户端自己实现,这两年很多用户采用了Redission库。

1.基于Jedis的分布式锁

        首先通过Maven引入Jedis开源组件,在pom.xml文件加入如下依赖:

<dependency>   <groupId>redis.clients</groupId>   <artifactId>jedis</artifactId>    <version>2.9.0</version></dependency>

实现如下

public class RedisUtil {    private static finalString LOCK_SUCCESS = "OK";    private static finalString SET_IF_NOT_EXIST = "NX";    private static finalString SET_WITH_EXPIRE_TIME = "PX";    /**     * 尝试获取分布式锁。加锁代码满足我们可靠性里描述的三个条件。首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。     * @param jedis Redis客户端     * @param lockKey 锁     * @param requestId 请求标识     * @param expireTime 超期时间     * @return 是否获取成功     */    public booleantryGetDistributedLock(Jedis jedis, String lockKey, String requestId, intexpireTime) {        try (Jedis jedis = jedisPool.getResource()){                        String result = jedis.set(namespace +lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);              if (LOCK_SUCCESS.equals(result)) {                  return true;              }              return false;        }}private staticfinal Long RELEASE_SUCCESS = 1L;    /**     * 释放分布式锁     * @param jedis Redis客户端     * @param lockKey 锁     * @param requestId 请求标识     * @return 是否释放成功     */    public static booleanreleaseDistributedLock(Jedis jedis, String lockKey, String requestId) {        String script ="if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";        try (Jedis jedis =jedisPool.getResource()) {              Object result = jedis.eval(script,Collections.singletonList(namespace + lockKey),Collections.singletonList(requestId));              if (RELEASE_SUCCESS.equals(result)) {                  return true;              }              return false;        }    }}

2.基于Redisson的分布式锁

       Redisson(https://redisson.org/)是Redis官网推荐的Java语言实现分布式锁的项目,支持各种部署架构的redis,包括单机、集群(Cluster)、哨兵(Sentinel)以及主从(Master/Slave),广泛被各大厂使用。当然Redisson的功能远不止分布式锁,还包括其他一些分布式结构,但是本节主要讨论其分布式锁实现。Redisson实现了JDK中的Lock接口,用法和JDK的锁很类似,只不过Redssion的锁是分布式实现。其伪代码如下:

RLock lock = redisson.getLock("MyLockName");lock.lock();try {    // do sth.} finally {    lock.unlock();}

       首先引入Redisson库依赖:

<dependency>      <groupId>org.redisson</groupId>      <artifactId>redisson</artifactId>      <version>2.2.3</version></dependency>
连接管理器实现如下:
// Redission连接管理器public class RedissonManager {    private static finalString RAtomicName = "genId_";    private static Configconfig = new Config();    private static Redissonredisson = null;    public static void init(){        try {           config.useClusterServers() //这是用的集群server                   .setScanInterval(2000) //设置集群状态扫描时间                   .setMasterConnectionPoolSize(10000) //设置连接数                    .setSlaveConnectionPoolSize(10000)                   .addNodeAddress("127.0.0.1:6379","127.0.0.1:6380");            redisson =Redisson.create(config);        }catch (Exception e){           e.printStackTrace();        }    }    public static RedissongetRedisson(){        return redisson;    }}

基于Redisson的分布式锁工具类实现:

public class DistributedRedisLock {    private static Redissonredisson = RedissonManager.getRedisson();    private static finalString LOCK_TITLE = "redisLock_";    public static voidacquire(String lockName){        String key =LOCK_TITLE + lockName;        RLock mylock =redisson.getLock(key);        mylock.lock(2,TimeUnit.MINUTES); //lock提供带timeout参数,timeout结束强制解锁,防止死锁       System.err.println("======lock======"+Thread.currentThread().getName());    }    public static voidrelease(String lockName){        String key =LOCK_TITLE + lockName;        RLock mylock =redisson.getLock(key);        mylock.unlock();       System.err.println("======unlock======"+Thread.currentThread().getName());    }// 测试private static void redisLock(){       RedissonManager.init(); //初始化        for (int i = 0; i <100; i++) {            Thread t = newThread(new Runnable() {                @Override                public voidrun() {                    try {                        Stringkey = "test123";                       DistributedRedisLock.acquire(key);                       Thread.sleep(1000); //获得锁之后可以进行相应的处理                        System.err.println("======获得锁后进行相应的操作======");                       DistributedRedisLock.release(key);                       System.err.println("=============================");                    } catch(Exception e) {                       e.printStackTrace();                    }                }            });            t.start();        }    }}


 Zookeeper分布式锁

有序性是Zookeeper中非常重要的一个特性,所有的更新都是全局有序,每个更新都有唯一的时间戳,该时间戳称为zxid(Zookeeper Transaction Id)。读请求只会相对于更新有序,也就是读请求的返回结果中会带有这个最新的zxid。在正式讨论如何使用Zookeeper实现分布式锁前,我们需要先了解下Zookeeper中关于节点的几个特性:

l 有序节点:假如当前有一个父节点为/hs/lock,我们可以在这个父节点下面创建子节点;Zookeeper提供了一个可选的有序特性,例如我们可以创建子节点/hs/lock/node-并且指明有序,那么Zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,也就是说如果是第一个创建的子节点,那么生成的子节点为/hs/lock/node-0000000000,下一个节点则为/hs/lock/node-0000000001,依次类推。

l 临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,Zookeeper会自动删除该节点。

l 事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,Zookeeper会通知客户端。当前Zookeeper有如下四种事件:节点创建、节点删除、节点数据修改、子节点变更。

            这几个特性是Zookeeper的关键特性,也是实现分布式锁的关键,读者应确保好好领会。下面描述使用Zookeeper实现分布式锁的算法流程,假设锁空间的根节点为/hs/lock

1)客户端连接Zookeeper,并在/hs/lock下创建临时的且有序的子节点,第一个客户端对应的子节点为/hs/lock-0000000000,第二个为/hs/lock-0000000001,以此类推。

2)客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁。

3)执行业务代码。

4)完成业务流程后,删除对应的子节点释放锁。

虽然Zookeeper原生客户端暴露的API已经比较简洁,但是友好性并不好,实现各种实际功能比较繁琐,分布式锁也不例外,所以实际中一般直接使用curator开源项目提供的Zookeeper分布式锁实现。如下所示,首先引入依赖:

    <dependency>        <groupId>org.apache.zookeeper</groupId>        <artifactId>zookeeper</artifactId>        <version>3.4.12</version>    </dependency>        <dependency>        <groupId>org.apache.curator</groupId>            <artifactId>curator-recipes</artifactId>            <version>4.0.0</version>             <!--4.0.0原生不兼容zk 3.4, 必须进行兼容性处理,否则会报KeeperErrorCode = Unimplemented异常-->                <exclusions>                    <exclusion>                         <groupId>org.apache.Zookeeper</groupId>                         <artifactId>Zookeeper</artifactId>                    </exclusion>                </exclusions>      </dependency>                 

        下面实现基于Curator的分布式锁工具类:   

package chapter6.sec63;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorDLock {      public static voidmain(String[] args) throws Exception {                     // 创建Zookeeper的客户端                     RetryPolicyretryPolicy = new ExponentialBackoffRetry(1000, 3);                     CuratorFrameworkclient = CuratorFrameworkFactory.newClient("9.20.38.223:2181",retryPolicy);                     client.start();                     // 创建分布式锁, 锁空间的根节点路径为/hs/lock                     InterProcessMutexmutex = new InterProcessMutex(client, "/hs/lock");                     mutex.acquire();                     // 获得了锁, 进行业务流程                     System.out.println("Entermutex");                     // 完成业务流程, 释放锁                     mutex.release();                     // 关闭客户端                     client.close();      }

注:本文选自北京大学出版社出版的《Oracle高性能系统架构实战大全》一书,略有改动,经出版社授权刊登于此。


赠书福利


     《Oracle高性能系统架构实战大全》介绍与 Oracle 数据库性能相关的方方面面,涵盖了 Oracle 的体系架构,其背后的运行机制包括事务、锁与闩、多版本并发,各种 Oracle 高级特性如分区、并行执行、直接路径操作,以及大数据导入 / 导出性能优化,并用具体的例子来解释每个特性,它是如何工作的、其限制是什么。
本书用通俗易懂的方式介绍如何理解并掌握 Oracle SQL 执行计划,如何高效地掌握 Oracle SQL 性能分析与优化,这是摆在众多开发人员面前的两座大山。



如何免费获得该书呢?


请在下面留言区留言你关于“分布式锁”方面的见解或经验分享,截止 2020 年 6 月 15 日 20:00 点留言点赞排名前 5 名我每人送一本感谢粉丝的持续支持。(与内容不相关的内容不会被公布哦)


注意,不符合要求的留言不会放出,为了公平起见,禁止恶意刷赞,后台已设置监控,刷赞的、不是真实点赞的一律无效。


如果没有中奖,喜欢本书的粉丝也可以在京东小程序直接购买。


【声明】内容源于网络
0
0
CppGuide
专注于高质量高性能C++开发,站点:cppguide.cn
内容 1260
粉丝 0
CppGuide 专注于高质量高性能C++开发,站点:cppguide.cn
总阅读289
粉丝0
内容1.3k