ZooKeeper学习(二)ZooKeeper实现分布式锁
阅读原文时间:2023年07月08日阅读:1

  在日常开发过程中,大型的项目一般都会采用分布式架构,那么在分布式架构中若需要同时对一个变量进行操作时,可以采用分布式锁来解决变量访问冲突的问题,最典型的案例就是防止库存超卖,当然还有其他很多的控制方式,这篇文章我们讨论一下怎么使用ZooKeeper来实现分布式锁。

  前面提到的分布式锁,在ZooKeeper中可以通过Curator来实现。

  定义:Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。

开发思路

  在第一篇文章中,了解了ZooKeeper节点的概念,实现分布式锁的基本思路也是基于对节点的监听与操作从而实现的。

  • 1、创建一个父节点,并对父节点设置监听事件,实际加锁的对象为父节点下的子节点。
  • 2、若父节点下存在临时子节点,则获取锁失败,不存在子节点时,则各个线程可尝试争夺锁。
  • 3、业务逻辑执行完毕后会删除临时子节点,此时下一个进程进入时发现没有存在子节点,则创建子节点并获取锁

pom.xml


org.apache.zookeeper zookeeper 3.4.10
org.apache.curator curator-framework 2.8.0
org.apache.curator curator-recipes 2.8.0

application.yml

#zookeeper分布式锁curator服务配置
curator:
retryCount: 5 #重试次数
elapsedTimeMs: 5000 #重试间隔时间
connectString: 127.0.0.1:2181 # zookeeper 地址
sessionTimeoutMs: 60000 # session超时时间
connectionTimeoutMs: 5000 # 连接超时时间

配置类

/**
* ZK的属性
*/
@Data
@Component
@ConfigurationProperties(prefix = "curator")//获取application.yml配置的值
public class ZkProperties {

private int retryCount;//重试次数

private int elapsedTimeMs;//重试间隔时间

private String connectString;//zookeeper 地址

private int sessionTimeoutMs;//session超时时间

private int connectionTimeoutMs;//连接超时时间  

}

/**
* ZK的属性配置
*/
@Configuration//标识这是一个配置类
public class ZkConfiguration {

@Autowired  
ZkProperties zkProperties;

@Bean(initMethod = "start")  
public CuratorFramework curatorFramework() {  
    return CuratorFrameworkFactory.newClient(  
            zkProperties.getConnectString(),  
            zkProperties.getSessionTimeoutMs(),  
            zkProperties.getConnectionTimeoutMs(),  
            new RetryNTimes(zkProperties.getRetryCount(), zkProperties.getElapsedTimeMs()));  
}  

}

分布式锁工具

/**
* 分布式锁工具类
* 【类解析】
* 1、InitializingBean接口为bean提供了初始化方法的方式。
* 2、它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候都会执行该方法。
* 【锁原理】
* 1、创建一个父节点,并对父节点设置监听事件,实际加锁的对象为父节点下的子节点。
* 2、若父节点下存在临时子节点,则获取锁失败。不存在子节点时,则各个线程可尝试争夺锁。
* 3、业务逻辑执行完毕后会删除临时子节点,此时下一个进程进入时发现没有存在子节点,则创建子节点并获取锁
*/
@Slf4j
@Service
public class DistributedLockByZookeeperUtil implements InitializingBean {

private final static String ROOT\_PATH\_LOCK = "rootlock";//父节点路径  
private CountDownLatch countDownLatch = new CountDownLatch(1);//节点计数器

@Autowired  
private CuratorFramework curatorFramework;

/\*\*  
 \* 获取分布式锁  
 \*/  
public void acquireDistributedLock(String path) {  
    String keyPath = "/" + ROOT\_PATH\_LOCK + "/" + path;  
    //1、一直循环等待获取锁  
    while (true) {  
        try {  
            //2、尝试创建子节点,若子节点已经存在,则创建异常,并进入catch块代码  
            curatorFramework  
                    .create()//创建节点  
                    .creatingParentsIfNeeded()//如果父节点不存在,则在创建节点的同时创建父节点  
                    .withMode(CreateMode.EPHEMERAL)//【临时节点】创建后,会话结束节点会自动删除  
                    .withACL(ZooDefs.Ids.OPEN\_ACL\_UNSAFE)//【接入权限】任何链接都可以操作该节点  
                    .forPath(keyPath);//对应的操作路径  
            log.info("获取分布式锁成功!路径为:{}", keyPath);  
            break;  
        } catch (Exception e) {  
            //3、创建子节点失败时,即获取锁失败  
            log.info("获取分布式锁失败!路径为:{}", keyPath);  
            log.info("等待重新获取锁.......");  
            try {  
                if (countDownLatch.getCount() <= 0) {  
                    countDownLatch = new CountDownLatch(1);//重置计数器  
                }  
                //4、从新挂起当前线程,调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行  
                countDownLatch.await();  
            } catch (InterruptedException e1) {  
                e1.printStackTrace();  
            }  
        }  
    }  
}

/\*\*  
 \* 释放分布式锁  
 \*/  
public boolean releaseDistributedLock(String path) {  
    try {  
        String keyPath = "/" + ROOT\_PATH\_LOCK + "/" + path;  
        //1、查看当前节点是否已经存在  
        if (curatorFramework.checkExists().forPath(keyPath) != null) {  
            //2、若子节点存在,则删除子节点,即释放锁  
            curatorFramework.delete().forPath(keyPath);  
        }  
    } catch (Exception e) {  
        log.error("释放分布式锁错误!");  
        return false;  
    }  
    return true;  
}

/\*\*  
 \* 创建 watcher 事件  
 \*/  
private void addWatcher(String path) throws Exception {  
    String keyPath;  
    if (path.equals(ROOT\_PATH\_LOCK)) {  
        keyPath = "/" + path;  
    } else {  
        keyPath = "/" + ROOT\_PATH\_LOCK + "/" + path;  
    }  
    //1、创建子节点监听事件  
    final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);  
    //2、设置监听器初始化模式:异步初始化。初始化后会触发事件。  
    cache.start(PathChildrenCache.StartMode.POST\_INITIALIZED\_EVENT);  
    //3、创建监听事件  
    cache.getListenable().addListener((client, event) -> {  
        //4、当发生子节点移除事件时,进入if内逻辑  
        if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD\_REMOVED)) {  
            String oldPath = event.getData().getPath();  
            log.info("上一个节点 " + oldPath + " 已经被断开");  
            //5、移除的节点为监听节点的子节点时,即路径包含父节点时,进入if内逻辑  
            if (oldPath.contains(path)) {  
                //6、释放计数器,让当前的请求获取锁  
                countDownLatch.countDown();  
            }  
        }  
    });  
}

/\*\*  
 \* 创建父节点,并创建永久节点  
 \* PS:在所有的属性被初始化后调用此方法,创建父节点  
 \*/  
@Override  
public void afterPropertiesSet() {  
    //1、指定命名空间  
    curatorFramework = curatorFramework.usingNamespace("lock-namespace");  
    //2、下面代码逻辑的父节点路径  
    String path = "/" + ROOT\_PATH\_LOCK;  
    try {  
        //3、父节点不存在时,创建父节点  
        if (curatorFramework.checkExists().forPath(path) == null) {  
            curatorFramework  
                    .create()//创建节点  
                    .creatingParentsIfNeeded()//如果父节点不存在,则在创建节点的同时创建父节点  
                    .withMode(CreateMode.PERSISTENT)//【持久化节点】客户端与zookeeper断开连接后,该节点依旧存在  
                    .withACL(ZooDefs.Ids.OPEN\_ACL\_UNSAFE)//【接入权限】任何链接都可以操作该节点  
                    .forPath(path);//对应的操作路径  
        }  
        //4、添加对父节点的监听事件  
        addWatcher(ROOT\_PATH\_LOCK);  
        log.info("root path 的 watcher 事件创建成功");  
    } catch (Exception e) {  
        log.error("连接zookeeper失败,请查看日志 >> {}", e.getMessage(), e);  
    }  
}  

}

测试类

/**
* 测试类
*/
@RestController
@RequestMapping("/test")
public class TestController {

@Autowired  
private DistributedLockByZookeeperUtil distributedLockByZookeeper;//分布式锁工具类  
@Autowired  
private IUserInfoService iUserInfoService;//业务类

private final static String PATH = "test";//子节点对应路径(PS:在锁工具里面会拼接完整路径)

@GetMapping("/doSomeThings")  
public boolean doSomeThings() {  
    /\*1、获取锁\*/  
    Boolean flag;//是否已经释放锁 释放成功:true , 释放失败:false  
    distributedLockByZookeeper.acquireDistributedLock(PATH);//获取锁  
    /\*2、业务代码块\*/  
    try {  
        iUserInfoService.update();  
        UserInfoVO vo = iUserInfoService.querySingleVO(1);  
        System.out.println("剩余库存为:" + vo.getCreateStaff());  
    } catch (Exception e) {  
        e.printStackTrace();  
        //业务代码报错时及时释放锁  
        flag = distributedLockByZookeeper.releaseDistributedLock(PATH);  
    }  
    /\*3、释放锁\*/  
    flag = distributedLockByZookeeper.releaseDistributedLock(PATH);//执行成功释放锁  
    return flag;  
}

}

  压测的方法有很多,我使用的是Jmeter来进行并发调用测试类代码,测试结果分布式锁有效,这里不再写压测过程,感兴趣的亲可以看下文末的文章推荐。

参考文章:

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器