Redis专题-秒杀
阅读原文时间:2023年08月18日阅读:4

Redis专题-并发/秒杀

开局一张图,内容全靠“编”。

昨天晚上在群友里看到有人在讨论库存并发的问题,看到这里我就决定写一篇关于redis秒杀的文章。

我们看看一般我们库存是怎么出问题的

其实redis提供了两种解决方案:加锁和原子操作

1.1、加锁

加锁:其实非常常见,读取数据前,客户端先获取锁,再操作。

当客户端获得锁后,一直持有直到客户端完成操作,再释放。

怎么操作呢,客户端使用分布式锁来获取锁,(使用redis或者zookeeper来实现一个分布式锁)以商品的维度来加锁,在获取到锁的线程中,按顺序执行商品的库存查询和扣减,同时实现了顺序性和原子性。

但是,但是,有问题:

1、如果使用redis来实现分布式锁,那么锁的时效性是个问题。太短了,业务还没跑完锁就释放了。太长了,如果异常,其他业务就一直阻塞等着自动释放。

2、如果使用zookeeper,确实不用担心锁释放问题(临时节点),而且一致性好,但是性能不高。ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同不到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。(挖坑后续写一个redis和zookeeper实现分布式锁的文章)

所以。。继续往下看。。

1.2、原子操作

原子操作:执行过程中保持原子性操作,而原子性操作是不需要加锁的,也就是无锁操作。所以既保证了并发也不会减少系统并发性能。

redis的原子操作其实也有两种方式:

1、单命令操作:多个操作在redis中一个操作完成

2、lua:多个操作写成lua脚本,以原子性方式执行单个lua脚本

1.2.1、INCR/DECR

Redis 是使用单线程来串行处理客户端的请求操作命令的,所以,当 Redis 执行某个命令操作时,其他命令是无法执行的,这相当于命令操作是互斥执行的。

Redis 的单个命令操作可以原子性地执行,但是在实际应用中,数据修改时可能包含多个操作,至少包括读数据、数据增减、写回数据三个操作,这显然就不是单个命令操作了,那该怎么办呢?

Redis提供INCR/DECR,将读数据、数据增减、写回数据三个操作合并为了一个,可以对数据进行增值 / 减值操作,而且它们本身就是单个命令操作,所以本身具有互斥性。可以直接帮助我们进行并发控制。

// 将商量id的库存减1
DECR id

是的,就是这么简单就搞定了扣减库存。

1.2.2、Lua脚本

Redis 会把整个 Lua 脚本作为一个整体执行,在执行的过程中不会被其他命令打断,从而保证了 Lua 脚本中操作的原子性。

将要执行的操作编写到一个 Lua 脚本中,使用 Redis 的 EVAL 命令来执行脚本。

原生 EVAL 方法的使用语法如下:

EVAL script numkeys key [key ...] arg [arg ...]

script 是我们 Lua 脚本的字符串形式,numkeys 是我们要传入的参数数量,key 是我们的入参,可以传入多个,arg 是额外的入参。

但这种方式需要每次都传入 Lua 脚本字符串,不仅浪费网络开销,同时 Redis 需要每次重新编译 Lua 脚本,对于我们追求性能极限的系统来说,不是很完美。所以这里就要说到另一个命令 EVALSHA 了,原生语法如下:

EVALSHA sha1 numkeys key [key ...] arg [arg ...]

可以看到其语法与 EVAL 类似,不同的是这里传入的不是脚本字符串,而是一个加密串 sha1。这个 sha1 是从哪来的呢?它是通过另一个命令 SCRIPT LOAD 返回的,该命令是预加载脚本用的,语法为:

SCRIPT LOAD script

将 Lua 脚本先存储在 Redis 中,并返回一个 sha1,下次要执行对应脚本时,只需要传入 sha1 即可执行对应的脚本。这完美地解决了 EVAL 命令存在的弊端,所以我们这里也是基于 EVALSHA 方式来实现的。

-- 调用Redis的get指令,查询活动库存,其中KEYS[1]为传入的参数1,即库存key
local c_s = redis.call('get', KEYS[1])
-- 判断活动库存是否充足,其中KEYS[2]为传入的参数2,即当前抢购数量
if not c_s or tonumber(c_s) < tonumber(KEYS[2]) then
   return 0
end
-- 如果活动库存充足,则进行扣减操作。其中KEYS[2]为传入的参数2,即当前抢购数量
redis.call('decrby',KEYS[1], KEYS[2])
return 1

我们可以将脚本先卸载配置中心,代码执行的时候就去拉取最新的sha1。或者卸载代码里面写死。

当然这个脚本也可以扩展,比如加上IP限制等等。但是太多操作放在Lua里也会降低redis的并发性能,所以非并发控制就不写到lua了。

理论看完了,实操一下吧

2.1、安装redis

跳过,不会安装的出门右拐。

我自己用podman。

podman run -p 6379:6379 --name my_redis --privileged=true -v D:\podman\redis\conf\redis.conf:/etc/redis/redis.conf  -v D:\podman\redis\data:/data -d docker.io/library/redis redis-server /etc/redis/redis.conf --appendonly yes

2.2、代码

在下.neter,就写C#代码了

   [ApiController]
    [Route("[controller]")]
    public class HomeController : ControllerBase
    {
        private static string _redisConnection = "localhost:6379";
        private static ConnectionMultiplexer _connMultiplexer;

        private string _redisScript = @"local c_s = redis.call('get', KEYS[1])
                                        if not c_s or tonumber(c_s) < tonumber(KEYS[2]) then
                                           return 0
                                        end
                                        redis.call('decrby',KEYS[1], KEYS[2])
                                          return 1";
        private string _sha1 = string.Empty;
        /// <summary>
        /// 锁
        /// </summary>
        private static readonly object Locker = new object();

        private static int _count = 0;
        private static int _rushToPurchaseCount = 0;

        /// <summary>
        /// 获取 Redis 连接对象
        /// </summary>
        /// <returns></returns>
        private IConnectionMultiplexer GetConnectionRedisMultiplexer()
        {
            if ((_connMultiplexer == null) || !_connMultiplexer.IsConnected)
            {
                lock (Locker)
                {
                    if ((_connMultiplexer == null) || !_connMultiplexer.IsConnected)
                    {
                        _connMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
                    }
                }
            }
            return _connMultiplexer;
        }

        [HttpPost("/Init")]
        public IActionResult Init()
        {
            GetConnectionRedisMultiplexer();
            return Ok();
        }
        [HttpPost]
        public async Task<IActionResult> Post()
        {
            System.Diagnostics.Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            var db = _connMultiplexer.GetDatabase();
            var cache = db.ScriptEvaluateAsync(_redisScript,
                new RedisKey[] { "key999", "1" });

            var results = (string[]?)await cache;

            if (results[0] == "1")
            {
                Interlocked.Increment(ref _rushToPurchaseCount);
                Console.WriteLine($"恭喜您抢到了,{_rushToPurchaseCount}");
            }
            else
            {
                Console.WriteLine("很遗憾,您没有抢到");
            }
            return Ok();
        }
    }

我们在redis中新增5个库存

配置一下Jmeter,100个线程3秒内跑完

家人们!准备开枪!3!2!1!上链接!

让我们恭喜这5位大冤种

Jmeter聚合报告

redis库存为0

好了,到这里就先结束了。拜拜

github StackExchange

手把手带你搭建秒杀系统-不差毫厘:秒杀的库存与限购

Redis 核心技术与实战-无锁的原子操作:Redis如何应对并发访问?

.Net Core使用分布式缓存Redis:Lua脚本