【本博客属于原创,如需转载,请注明出处:https://www.cnblogs.com/gdouzz/p/12097968.html】
最近研究库存的相关,在高峰期经常出现超卖等等情况,最后根据采用是基于Redis来实现了分布式锁,特此拿出来和大家分享。
准备工作:centos7,Redis,Nginx,以及JMeter测试工具。
在传统的程序中,我们写了如下最简单对库存操作的代码如下:
下面是基于AspNetCore.WebAPI 创建的一个对库存进行操作(减少)的接口,我相信很多同志都能够写出这种加lock来保证高并发的时候,库存不会出现超卖,这种做法的性能问题,不属于我们这篇文章的讨论范围,我们要讨论的是,这种写法到底会不会造成超卖的情况出现呢?,如果这是传统的企业内部应用,单体架构,如下图所示,也能满足需求;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using StackExchange.Redis;
namespace RedisDistributedLockMvc.Controllers
{
[Route("api/inv")]
[ApiController]
public class InvController : ControllerBase
{
private static readonly object LockObject = new object();
private static readonly ConfigurationOptions Options = new ConfigurationOptions()
{
EndPoints = { { "192.168.232.132", 6379 } },
Password = "123456"
};
\[HttpGet\]
public ActionResult<string> Get()
{
string msg = null;
lock (LockObject)
{
int invQty = GetInvQty();
if (invQty > 0)
{
invQty = invQty - 1;
SetInvQty(invQty);
msg = $"扣减成功,当前库存:{invQty}";
}
else
{
msg = "扣减失败,库存不足";
}
}
Console.WriteLine(msg);
return msg;
}
private int GetInvQty()
{
var qty = 0;
using (var conn = ConnectionMultiplexer.Connect(Options))
{
var db = conn.GetDatabase();
qty = Convert.ToInt32(db.StringGet("InvQty"));
}
return qty;
}
private void SetInvQty(int qty)
{
using (var conn = ConnectionMultiplexer.Connect(Options))
{
var db = conn.GetDatabase();
db.StringSet("InvQty", qty);
}
}
}
}
随着业务的越来越复杂,这种单体架构的形式,已经满足不了我们的正常业务需求,很多公司演变成了下面这种架构模式;
下面是我在测试环境搭建的过程(为了让自己有更深刻的体会,我建议大家按照上面的架构图搭建一个简单的环境);
1、把上面的AspNetCore代码发布到Centos 7机器上,分别指向该机器的不同端口(5000,5001),等同于部署了两份;
2、在Centos 7机器上安装Nginx,然后修改nginx配置文件,指向刚刚配置配置的地址;
特别说明:如果觉得上面操作很难,在Centos中,可以通过yum源来安装相应的软件,通过使用xshell来编写命令,如果是对文件操作的,不熟悉命令可以用WinScp这种软件进行可视化修改完后保存。然后需要特别注意的是,防火墙以及相关的端口和服务是否启动。
除此之外还要特别留意,Nginx是一个进程,刚刚两个不同的端口,分别对应不同的进程,这三个进程之间的安全是通过叫(seLinux)来管理的,如果nginx配置好之后,外网访问还是报502,可以尝试着把seLinux关闭(当然不推荐关闭,也有相关的解决方案)。
当然,如果有同学想尝试这个过程,碰到问题的也可以联系我:QQ:3484677573,说明是在博客园看到的即可。
搭建环境完毕之后,接下来开始我们的测试;
测试之前,因为要模拟高并发的环境,我们采用的jmeter作为我们压测的工具,简单的使用教程如下:
首先从官网下载和安装jmeter,安装完之后,找到安装下的bin目录,找到jmeter.bat,双击即可启动jmeter。
打开jmeter之后,添加线程组,比较简单,可以指定线程的个数等等。
接下来再添加一个HttpRequest,如下图所示,根据提示,输入相关的内容,然后点击上面的运行,即可开始测试。
至此,测试工具也准备OK了,那我们开始测试,先假设我们Redis里面有50个库存;
接下来在centos启动linux,以及运行我们的服务,如下图所示(5000和50001);
使用jmeter进行压测,调用,看看是否会出现超卖的情况。
1、先开启50个线程,进行压测,看看输出结果,两个服务输出的结果如下:
把结果设置成50,确实出现了超卖的情况,只要两台机器输出的当前库存是一样,就说明出现了超卖。
说明我们最开始的那段代码在分布式环境下,或者在我们最常见的负载均衡部署方式下面是不行的,所以就提出了我们分布式锁的解决方案。
PS:我以前也觉得上面这种加锁的方法,好像是不能用在分布式环境中,经过上面这么一折腾,我印象更加深刻了,也有了更清晰的认识。
分布式锁的解决方案,在业界内也有很多,也有很多成熟的框架,下面我们介绍一种基于Redis来实现的分布式锁解决方案;
先解释一下,前面的做法为什么不行和我们为什么要采取redis来做分布式锁
1、上面这种Lock属于进程内的锁,当只有一个进程的时候(只部署了一台服务器)是没有问题的,当存在多台服务器的时候,就会出问题;
2、之所以采取Redis来做分布式锁,Redis是单线程的,当我们有N个请求同时到达的时候,它会通过队列的形式变成串行访问;
话不多说,直接看代码
这个版本的分布式锁,我们做了最简单的考虑
1、锁超时的问题(通过对Redis官网给出的SetNx方法,对应的就是StackExchange.dll里面的 db.StringSet("InvQty222", "111", TimeSpan.FromSeconds(900), When.NotExists, CommandFlags.None);
2、执行过程中,可能出异常的情况,在finally里面释放锁;
private static readonly object LockObject = new object();
private static readonly ConfigurationOptions Options = new ConfigurationOptions()
{
EndPoints = { { "192.168.232.132", 6379 } },
Password = "123456"
};
\[HttpGet\]
public ActionResult<string> Get()
{
#region 用Lock方式实现锁
//string msg = null;
//lock (LockObject)
//{
// int invQty = GetInvQty();
// if (invQty > 0)
// {
// invQty = invQty - 1;
// SetInvQty(invQty);
// msg = $"扣减成功,当前库存:{invQty}";
// }
// else
// {
// msg = "扣减失败,库存不足";
// }
//}
//Console.WriteLine(msg);
//return msg;
#endregion
#region Redis实现的第一版本分布式锁
string msg = null;
var isSuccess = SetLockVersion1("1");
//如果key存在返回的就是false
//如果key不存在返回,就set,返回true
//除此之外,我们还要考虑的是这把锁的超时时间,
//如果这把锁一直不释放(执行过程卡住了,那么要考虑把锁超时)
if (isSuccess)
{
try
{
int invQty = GetInvQty();
if (invQty > 0)
{
invQty = invQty - 1;
SetInvQty(invQty);
msg = $"扣减成功,当前库存:{invQty}";
}
else
{
msg = "扣减失败,库存不足";
}
}
finally
{
//还要考虑执行过程中,如果出现了异常,也要把锁给释放掉。
UnLockVersion1(); //释放锁;
}
}
else
{
msg = "资源正忙,请刷新后重试";
}
Console.WriteLine(msg);
return msg;
#endregion
}
private int GetInvQty()
{
var qty = 0;
using (var conn = ConnectionMultiplexer.Connect(Options))
{
var db = conn.GetDatabase();
qty = Convert.ToInt32(db.StringGet("InvQty"));
}
return qty;
}
private void SetInvQty(int qty)
{
using (var conn = ConnectionMultiplexer.Connect(Options))
{
var db = conn.GetDatabase();
db.StringSet("InvQty", qty);
}
}
private bool SetLockVersion1(string value)
{
using (var conn = ConnectionMultiplexer.Connect(Options))
{
var db = conn.GetDatabase();
var flag= db.StringSet("LockValue", value, TimeSpan.FromSeconds(900), When.NotExists, CommandFlags.None); //如果存在了返回false,不存在才返回true;
db.KeyExpire("LockValue", TimeSpan.FromSeconds(10));
return flag;
}
}
private bool UnLockVersion1()
{
using (var conn = ConnectionMultiplexer.Connect(Options))
{
var db = conn.GetDatabase();
return db.KeyDelete("LockValue");
}
}
按照上面的程序,我们再把代码部署到centos上,然后利用jmeter进行压测;
经过我们这么一折腾,好像超卖的现象没有出现了;但是我们上面的做法还是比较的简单,很多情况都没有考虑在里面,就比如下面这几种情况;
问题一、锁失效问题,问题根源,A线程加的锁,被B线程释放了。
为了解决这种情况,我们可以在进来的时候,存一个clientId到Redis的锁里面,再失效key的时候,判断一下,当前clientId和redis锁里面的值是否一致,如果一致,就才释放。
string msg = null;
string clientId = Guid.NewGuid().ToString();
var isSuccess = SetLockVersion1(clientId);
//如果key存在返回的就是false
//如果key不存在返回,就set,返回true
//除此之外,我们还要考虑的是这把锁的超时时间,
//如果这把锁一直不释放(执行过程卡住了,那么要考虑把锁超时)
if (isSuccess)
{
try
{
int invQty = GetInvQty();
if (invQty > 0)
{
invQty = invQty - 1;
SetInvQty(invQty);
msg = $"扣减成功,当前库存:{invQty}";
}
else
{
msg = "扣减失败,库存不足";
}
}
finally
{
if (clientId.Equals(GetLockValue()))
{
//还要考虑执行过程中,如果出现了异常,也要把锁给释放掉。
UnLockVersion1(); //释放锁;
}
}
}
else
{
msg = "资源正忙,请刷新后重试";
}
Console.WriteLine(msg);
return msg;
问题二:锁超时的问题,如果客户端1需要执行这把锁的时间大于锁设定的超时时间,该怎么做呢
1、开启一个守护线程(后台线程),假如你锁的设置30秒超时,那你每隔10秒去检查是不是还是client1持有锁,如果是那就延长10秒,类似于Watch dog思路。
这个守护线程要注意的点就是,如果锁都没人使用了,这个守护线程要及时的关闭,不能一直开启着,如果不需要延长时间即不必要去延长。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章