开局一张图,内容全靠“编”。
昨天晚上在群友里看到有人在讨论库存并发的问题,看到这里我就决定写一篇关于redis秒杀的文章。
我们看看一般我们库存是怎么出问题的
其实redis提供了两种解决方案:加锁和原子操作。
加锁:其实非常常见,读取数据前,客户端先获取锁,再操作。
当客户端获得锁后,一直持有直到客户端完成操作,再释放。
怎么操作呢,客户端使用分布式锁来获取锁,(使用redis或者zookeeper来实现一个分布式锁)以商品的维度来加锁,在获取到锁的线程中,按顺序执行商品的库存查询和扣减,同时实现了顺序性和原子性。
但是,但是,有问题:
1、如果使用redis来实现分布式锁,那么锁的时效性是个问题。太短了,业务还没跑完锁就释放了。太长了,如果异常,其他业务就一直阻塞等着自动释放。
2、如果使用zookeeper,确实不用担心锁释放问题(临时节点),而且一致性好,但是性能不高。ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同不到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。(挖坑后续写一个redis和zookeeper实现分布式锁的文章)
所以。。继续往下看。。
原子操作:执行过程中保持原子性操作,而原子性操作是不需要加锁的,也就是无锁操作。所以既保证了并发也不会减少系统并发性能。
redis的原子操作其实也有两种方式:
1、单命令操作:多个操作在redis中一个操作完成
2、lua:多个操作写成lua脚本,以原子性方式执行单个lua脚本
Redis 是使用单线程来串行处理客户端的请求操作命令的,所以,当 Redis 执行某个命令操作时,其他命令是无法执行的,这相当于命令操作是互斥执行的。
Redis 的单个命令操作可以原子性地执行,但是在实际应用中,数据修改时可能包含多个操作,至少包括读数据、数据增减、写回数据三个操作,这显然就不是单个命令操作了,那该怎么办呢?
Redis提供INCR/DECR,将读数据、数据增减、写回数据三个操作合并为了一个,可以对数据进行增值 / 减值操作,而且它们本身就是单个命令操作,所以本身具有互斥性。可以直接帮助我们进行并发控制。
// 将商量id的库存减1
DECR id
是的,就是这么简单就搞定了扣减库存。
Redis 会把整个 Lua 脚本作为一个整体执行,在执行的过程中不会被其他命令打断,从而保证了 Lua 脚本中操作的原子性。
将要执行的操作编写到一个 Lua 脚本中,使用 Redis 的 EVAL 命令来执行脚本。
原生 EVAL 方法的使用语法如下:
EVAL script numkeys key [key ...] arg [arg ...]
script 是我们 Lua 脚本的字符串形式,numkeys 是我们要传入的参数数量,key 是我们的入参,可以传入多个,arg 是额外的入参。
但这种方式需要每次都传入 Lua 脚本字符串,不仅浪费网络开销,同时 Redis 需要每次重新编译 Lua 脚本,对于我们追求性能极限的系统来说,不是很完美。所以这里就要说到另一个命令 EVALSHA 了,原生语法如下:
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
可以看到其语法与 EVAL 类似,不同的是这里传入的不是脚本字符串,而是一个加密串 sha1。这个 sha1 是从哪来的呢?它是通过另一个命令 SCRIPT LOAD 返回的,该命令是预加载脚本用的,语法为:
SCRIPT LOAD script
将 Lua 脚本先存储在 Redis 中,并返回一个 sha1,下次要执行对应脚本时,只需要传入 sha1 即可执行对应的脚本。这完美地解决了 EVAL 命令存在的弊端,所以我们这里也是基于 EVALSHA 方式来实现的。
-- 调用Redis的get指令,查询活动库存,其中KEYS[1]为传入的参数1,即库存key
local c_s = redis.call('get', KEYS[1])
-- 判断活动库存是否充足,其中KEYS[2]为传入的参数2,即当前抢购数量
if not c_s or tonumber(c_s) < tonumber(KEYS[2]) then
return 0
end
-- 如果活动库存充足,则进行扣减操作。其中KEYS[2]为传入的参数2,即当前抢购数量
redis.call('decrby',KEYS[1], KEYS[2])
return 1
我们可以将脚本先卸载配置中心,代码执行的时候就去拉取最新的sha1。或者卸载代码里面写死。
当然这个脚本也可以扩展,比如加上IP限制等等。但是太多操作放在Lua里也会降低redis的并发性能,所以非并发控制就不写到lua了。
理论看完了,实操一下吧
跳过,不会安装的出门右拐。
我自己用podman。
podman run -p 6379:6379 --name my_redis --privileged=true -v D:\podman\redis\conf\redis.conf:/etc/redis/redis.conf -v D:\podman\redis\data:/data -d docker.io/library/redis redis-server /etc/redis/redis.conf --appendonly yes
在下.neter,就写C#代码了
[ApiController]
[Route("[controller]")]
public class HomeController : ControllerBase
{
private static string _redisConnection = "localhost:6379";
private static ConnectionMultiplexer _connMultiplexer;
private string _redisScript = @"local c_s = redis.call('get', KEYS[1])
if not c_s or tonumber(c_s) < tonumber(KEYS[2]) then
return 0
end
redis.call('decrby',KEYS[1], KEYS[2])
return 1";
private string _sha1 = string.Empty;
/// <summary>
/// 锁
/// </summary>
private static readonly object Locker = new object();
private static int _count = 0;
private static int _rushToPurchaseCount = 0;
/// <summary>
/// 获取 Redis 连接对象
/// </summary>
/// <returns></returns>
private IConnectionMultiplexer GetConnectionRedisMultiplexer()
{
if ((_connMultiplexer == null) || !_connMultiplexer.IsConnected)
{
lock (Locker)
{
if ((_connMultiplexer == null) || !_connMultiplexer.IsConnected)
{
_connMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
}
}
}
return _connMultiplexer;
}
[HttpPost("/Init")]
public IActionResult Init()
{
GetConnectionRedisMultiplexer();
return Ok();
}
[HttpPost]
public async Task<IActionResult> Post()
{
System.Diagnostics.Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
var db = _connMultiplexer.GetDatabase();
var cache = db.ScriptEvaluateAsync(_redisScript,
new RedisKey[] { "key999", "1" });
var results = (string[]?)await cache;
if (results[0] == "1")
{
Interlocked.Increment(ref _rushToPurchaseCount);
Console.WriteLine($"恭喜您抢到了,{_rushToPurchaseCount}");
}
else
{
Console.WriteLine("很遗憾,您没有抢到");
}
return Ok();
}
}
我们在redis中新增5个库存
配置一下Jmeter,100个线程3秒内跑完
家人们!准备开枪!3!2!1!上链接!
让我们恭喜这5位大冤种
Jmeter聚合报告
redis库存为0
好了,到这里就先结束了。拜拜
手机扫一扫
移动阅读更方便
你可能感兴趣的文章