C#基于Redis实现分布式锁
阅读原文时间:2023年07月08日阅读:2

  【本博客属于原创,如需转载,请注明出处:https://www.cnblogs.com/gdouzz/p/12097968.html

  最近研究库存的相关,在高峰期经常出现超卖等等情况,最后根据采用是基于Redis来实现了分布式锁,特此拿出来和大家分享。

  准备工作:centos7,Redis,Nginx,以及JMeter测试工具。

  在传统的程序中,我们写了如下最简单对库存操作的代码如下:

  下面是基于AspNetCore.WebAPI 创建的一个对库存进行操作(减少)的接口,我相信很多同志都能够写出这种加lock来保证高并发的时候,库存不会出现超卖,这种做法的性能问题,不属于我们这篇文章的讨论范围,我们要讨论的是,这种写法到底会不会造成超卖的情况出现呢?,如果这是传统的企业内部应用,单体架构,如下图所示,也能满足需求;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using StackExchange.Redis;

namespace RedisDistributedLockMvc.Controllers
{
[Route("api/inv")]
[ApiController]
public class InvController : ControllerBase
{

    private static readonly object LockObject = new object();  
    private static readonly ConfigurationOptions Options = new ConfigurationOptions()  
    {  
        EndPoints = { { "192.168.232.132", 6379 } },  
        Password = "123456"  
    };

    \[HttpGet\]  
    public ActionResult<string> Get()  
    {  
        string msg = null;  
        lock (LockObject)  
        {  
            int invQty = GetInvQty();  
            if (invQty > 0)  
            {  
                invQty = invQty - 1;  
                SetInvQty(invQty);  
                msg = $"扣减成功,当前库存:{invQty}";  
            }  
            else  
            {  
                msg = "扣减失败,库存不足";  
            }  
        }  
        Console.WriteLine(msg);  
        return msg;  
    }

    private int GetInvQty()  
    {  
        var qty = 0;  
        using (var conn = ConnectionMultiplexer.Connect(Options))  
        {  
            var db = conn.GetDatabase();  
            qty = Convert.ToInt32(db.StringGet("InvQty"));  
        }  
        return qty;  
    }

    private void SetInvQty(int qty)  
    {  
        using (var conn = ConnectionMultiplexer.Connect(Options))  
        {  
            var db = conn.GetDatabase();  
            db.StringSet("InvQty", qty);  
        }  
    }  
}  

}

      随着业务的越来越复杂,这种单体架构的形式,已经满足不了我们的正常业务需求,很多公司演变成了下面这种架构模式;

  下面是我在测试环境搭建的过程(为了让自己有更深刻的体会,我建议大家按照上面的架构图搭建一个简单的环境);

  1、把上面的AspNetCore代码发布到Centos 7机器上,分别指向该机器的不同端口(5000,5001),等同于部署了两份;

  2、在Centos 7机器上安装Nginx,然后修改nginx配置文件,指向刚刚配置配置的地址;

  特别说明:如果觉得上面操作很难,在Centos中,可以通过yum源来安装相应的软件,通过使用xshell来编写命令,如果是对文件操作的,不熟悉命令可以用WinScp这种软件进行可视化修改完后保存。然后需要特别注意的是,防火墙以及相关的端口和服务是否启动。

  除此之外还要特别留意,Nginx是一个进程,刚刚两个不同的端口,分别对应不同的进程,这三个进程之间的安全是通过叫(seLinux)来管理的,如果nginx配置好之后,外网访问还是报502,可以尝试着把seLinux关闭(当然不推荐关闭,也有相关的解决方案)。

  当然,如果有同学想尝试这个过程,碰到问题的也可以联系我:QQ:3484677573,说明是在博客园看到的即可。

  搭建环境完毕之后,接下来开始我们的测试;

  测试之前,因为要模拟高并发的环境,我们采用的jmeter作为我们压测的工具,简单的使用教程如下:

  首先从官网下载和安装jmeter,安装完之后,找到安装下的bin目录,找到jmeter.bat,双击即可启动jmeter。

   打开jmeter之后,添加线程组,比较简单,可以指定线程的个数等等。

  接下来再添加一个HttpRequest,如下图所示,根据提示,输入相关的内容,然后点击上面的运行,即可开始测试。

  至此,测试工具也准备OK了,那我们开始测试,先假设我们Redis里面有50个库存;

  接下来在centos启动linux,以及运行我们的服务,如下图所示(5000和50001);

  使用jmeter进行压测,调用,看看是否会出现超卖的情况。

   1、先开启50个线程,进行压测,看看输出结果,两个服务输出的结果如下:

  把结果设置成50,确实出现了超卖的情况,只要两台机器输出的当前库存是一样,就说明出现了超卖。

    说明我们最开始的那段代码在分布式环境下,或者在我们最常见的负载均衡部署方式下面是不行的,所以就提出了我们分布式锁的解决方案。

  PS:我以前也觉得上面这种加锁的方法,好像是不能用在分布式环境中,经过上面这么一折腾,我印象更加深刻了,也有了更清晰的认识。

  分布式锁的解决方案,在业界内也有很多,也有很多成熟的框架,下面我们介绍一种基于Redis来实现的分布式锁解决方案;

  先解释一下,前面的做法为什么不行和我们为什么要采取redis来做分布式锁

  1、上面这种Lock属于进程内的锁,当只有一个进程的时候(只部署了一台服务器)是没有问题的,当存在多台服务器的时候,就会出问题;

    2、之所以采取Redis来做分布式锁,Redis是单线程的,当我们有N个请求同时到达的时候,它会通过队列的形式变成串行访问;

  话不多说,直接看代码

  这个版本的分布式锁,我们做了最简单的考虑

  1、锁超时的问题(通过对Redis官网给出的SetNx方法,对应的就是StackExchange.dll里面的 db.StringSet("InvQty222", "111", TimeSpan.FromSeconds(900), When.NotExists, CommandFlags.None);

  2、执行过程中,可能出异常的情况,在finally里面释放锁;

private static readonly object LockObject = new object();
private static readonly ConfigurationOptions Options = new ConfigurationOptions()
{
EndPoints = { { "192.168.232.132", 6379 } },
Password = "123456"
};

    \[HttpGet\]  
    public ActionResult<string> Get()  
    {  
        #region 用Lock方式实现锁  
        //string msg = null;  
        //lock (LockObject)  
        //{  
        //    int invQty = GetInvQty();  
        //    if (invQty > 0)  
        //    {  
        //        invQty = invQty - 1;  
        //        SetInvQty(invQty);  
        //        msg = $"扣减成功,当前库存:{invQty}";  
        //    }  
        //    else  
        //    {  
        //        msg = "扣减失败,库存不足";  
        //    }  
        //}  
        //Console.WriteLine(msg);  
        //return msg;  
        #endregion

        #region   Redis实现的第一版本分布式锁  
        string msg = null;  
        var isSuccess = SetLockVersion1("1");  
        //如果key存在返回的就是false  
        //如果key不存在返回,就set,返回true  
        //除此之外,我们还要考虑的是这把锁的超时时间,  
        //如果这把锁一直不释放(执行过程卡住了,那么要考虑把锁超时)  
        if (isSuccess)  
        {  
            try  
            {  
                int invQty = GetInvQty();  
                if (invQty > 0)  
                {  
                    invQty = invQty - 1;  
                    SetInvQty(invQty);  
                    msg = $"扣减成功,当前库存:{invQty}";  
                }  
                else  
                {  
                    msg = "扣减失败,库存不足";  
                }  
            }  
            finally  
            {  
                //还要考虑执行过程中,如果出现了异常,也要把锁给释放掉。  
                UnLockVersion1();  //释放锁;  
            }  
        }  
        else  
        {  
            msg = "资源正忙,请刷新后重试";  
        }  
        Console.WriteLine(msg);  
        return msg;  
        #endregion  
    }

    private int GetInvQty()  
    {  
        var qty = 0;  
        using (var conn = ConnectionMultiplexer.Connect(Options))  
        {  
            var db = conn.GetDatabase();  
            qty = Convert.ToInt32(db.StringGet("InvQty"));  
        }  
        return qty;  
    }

    private void SetInvQty(int qty)  
    {  
        using (var conn = ConnectionMultiplexer.Connect(Options))  
        {  
            var db = conn.GetDatabase();  
            db.StringSet("InvQty", qty);

        }  
    }

    private bool SetLockVersion1(string value)  
    {  
        using (var conn = ConnectionMultiplexer.Connect(Options))  
        {  
            var db = conn.GetDatabase();  
            var flag=&nbsp;db.StringSet("LockValue",&nbsp;value,&nbsp;TimeSpan.FromSeconds(900),&nbsp;When.NotExists,&nbsp;CommandFlags.None); //如果存在了返回false,不存在才返回true;  
            db.KeyExpire("LockValue", TimeSpan.FromSeconds(10));  
            return flag;  
        }  
    }

    private bool UnLockVersion1()  
    {  
        using (var conn = ConnectionMultiplexer.Connect(Options))  
        {  
            var db = conn.GetDatabase();  
            return db.KeyDelete("LockValue");  
        }  
    }

  按照上面的程序,我们再把代码部署到centos上,然后利用jmeter进行压测;

  经过我们这么一折腾,好像超卖的现象没有出现了;但是我们上面的做法还是比较的简单,很多情况都没有考虑在里面,就比如下面这几种情况;

  问题一、锁失效问题,问题根源,A线程加的锁,被B线程释放了。

  

    为了解决这种情况,我们可以在进来的时候,存一个clientId到Redis的锁里面,再失效key的时候,判断一下,当前clientId和redis锁里面的值是否一致,如果一致,就才释放。

string msg = null;
string clientId = Guid.NewGuid().ToString();
var isSuccess = SetLockVersion1(clientId);
//如果key存在返回的就是false
//如果key不存在返回,就set,返回true
//除此之外,我们还要考虑的是这把锁的超时时间,
//如果这把锁一直不释放(执行过程卡住了,那么要考虑把锁超时)
if (isSuccess)
{
try
{
int invQty = GetInvQty();
if (invQty > 0)
{
invQty = invQty - 1;
SetInvQty(invQty);
msg = $"扣减成功,当前库存:{invQty}";
}
else
{
msg = "扣减失败,库存不足";
}
}
finally
{

                if (clientId.Equals(GetLockValue()))  
                {  
                    //还要考虑执行过程中,如果出现了异常,也要把锁给释放掉。  
                    UnLockVersion1();  //释放锁;  
                }  
            }  
        }  
        else  
        {  
            msg = "资源正忙,请刷新后重试";  
        }  
        Console.WriteLine(msg);  
        return msg;

  问题二:锁超时的问题,如果客户端1需要执行这把锁的时间大于锁设定的超时时间,该怎么做呢

  1、开启一个守护线程(后台线程),假如你锁的设置30秒超时,那你每隔10秒去检查是不是还是client1持有锁,如果是那就延长10秒,类似于Watch dog思路。

  这个守护线程要注意的点就是,如果锁都没人使用了,这个守护线程要及时的关闭,不能一直开启着,如果不需要延长时间即不必要去延长。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章