Redis 学习笔记(六)Redis 如何实现消息队列
阅读原文时间:2022年02月12日阅读:1

消息队列(Messeage Queue,MQ)是在分布式系统架构中常用的一种中间件技术,从字面表述看,是一个存储消息的队列,所以它一般用于给 MQ 中间的两个组件提供通信服务。

1.1 消息队列介绍

我们引入一个削峰填谷实际场景来介绍 MQ ,削峰填谷是指处理短时间内爆发的请求任务,将巨量请求任务“削峰”,平摊在平常请求任务较低的时间段,也就是“填谷”。 比如组件1 发布请求任务,组件2接受请求任务并处理。如果没有 MQ , 组件2 就会在大量的请求任务下会出现假死的情况:

而如果使用 MQ 后可以将这些请求先暂存到队列中,排队执行,就不会出现组件2 假死的情况了。我们一般把发送消息的组件称为生产者,接受消息的组件称为消费者,如下图展示一个消息队列的模型:

![image-20220212105352795](E:\Typora\image!

消息队列需要满足消息有序性、能处理重复的消息以及消息可靠性,这样才能保证存取消息的一致性。

  • 消息有序性:虽然消费者异步读取消息,但是要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理掉。
  • 重复消息处理:在消息队列存取信息时,有可能因为网络阻塞而出现消息重传的情况。可能会造成业务逻辑被多次执行,所以要避免重复消息的处理。
  • 消息可靠性:在组件故障时,比如消费者宕机或者没有处理完信息时,消息队列需要能提供消息可靠性保证。所以需要在消费者故障时,可以重新读取消息再次进行处理,不影响业务服务。

1.2 消息队列应用场景

主要的应用有:异步处理、流量削峰、系统解耦

1.2.1 商品秒杀

秒杀活动中,会短时间出现爆发式的用户请求,如果没有消息队列,会导致服务器响应不过来。轻则会导致服务假死;重则会让服务器直接宕机。

这时可以加上消息队列,服务器接收到用户的请求后,先把这些请求全部写入消息队列中再排队处理,这样就不会导致同时处理多个请求的情况;若消息队列长度超过承载的最大数量,可以抛弃后续的消息,给用户返回“页面出错,请重新刷新”提示,这样降低服务器的负载,而且也能给用户很好的交互体验。

1.2.2 系统解耦

此外,我们可以利用消息队列来把系统的业务功能模块化,实现系统功能的解耦。如下图:

如果有两个功能服务,而且关系不是很紧密,比如订单系统和优惠券,虽然都和用户有关联,但是如果都放在用户模块,面临功能删减时会很麻烦。所以采用把两个服务独立出来,而将两个服务的消息发送以约定的方式通过消息队列发送过去,让其对应的消费者分别处理即可达到系统解耦的目的。

1.3 常见的消息队列中间件

1.3.1 RabbitMQ

1.3.1.1 RabbitMQ 介绍

RabbitMQ 是一个老牌的开源消息中间件,它实现了标准的 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)消息中间件,使用 Erlang 语言开发,支持集群部署。支持 java、python、Go、.NET 等等主流开发语言。

其主要的运行流程如下图:

我们发现在 Rabbit 服务器中,它在生产者和队列间加入了交换器(ExChange)模块,它的作用和交换机很相似,它会根据配置的路由规则将生产者发出的消息分发到不同的队列中。路由规则很灵活,可以自己来进行设计。

1.3.1.2 RabbitMQ 特点
  1. 支持持久化,RabbitMQ 支持磁盘持久化功能,保证了消息不会丢失;
  2. 高并发,RabbitMQ 使用了 Erlang 开发语言,Erlang 是为电话交换机开发的语言,天生自带高并发光环和高可用特性;
  3. 支持分布式集群,正是因为 Erlang 语言实现的,因此 RabbitMQ 集群部署也非常简单,只需要启动每个节点并使用 --link 把节点加入到集群中即可,并且 RabbitMQ 支持自动选主和自动容灾;
  4. 支持多种语言,比如 Java、.NET、PHP、Python、JavaScript、Ruby、Go 等;
  5. 支持消息确认,支持消息消费确认(ack)保证了每条消息可以被正常消费;
  6. 它支持很多插件,比如网页控制台消息管理插件、消息延迟插件等,RabbitMQ 的插件很多并且使用都很方便。

因为中间中的交换器模块,所以RabbitMQ 有不同的消息类型,主要分为以下几种:

  • direct(默认类型)模式,此模式为一对一的发送方式,也就是一条消息只会发送给一个消费者;
  • headers 模式,允许你匹配消息的 header 而非路由键(RoutingKey),除此之外 headers 和 direct 的使用完全一致,但因为 headers 匹配的性能很差,几乎不会被用到;
  • fanout 模式,为多播的方式,会把一个消息分发给所有的订阅者;
  • topic 模式,为主题订阅模式,允许使用通配符(#、*)匹配一个或者多个消息,我可以使用“cn.mq.#”匹配到多个前缀是“cn.mq.xxx”的消息,比如可以匹配到“cn.mq.rabbit”、“cn.mq.kafka”等消息。

但是 Rabbit 也存在以下的问题:

  • RabbitMQ 对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。
  • RabbitMQ 的性能是这几个消息队列中最差的,大概每秒钟可以处理几万到十几万条消息。如果应用对消息队列的性能要求非常高,那不要选择 RabbitMQ。
  • RabbitMQ 使用的编程语言 Erlang,扩展和二次开发成本高

1.3.2 Kafka

1.3.2.1 Kafka 介绍

Kafka 是 LinkedIn 公司开发的基于 ZooKeeper 的多分区、多副本的分布式消息系统,它于 2010 年贡献给了 Apache 基金会,并且成为了 Apache 的顶级开源项目。其中 ZooKeeper 的作用是用来为 Kafka 提供集群元数据管理以及节点的选举和发现等功能。

与 RabbitMQ 不同中间的 Kafka 集群部分是由 Broker 代理和 ZooKeeper 集群组成:

1.3.2.2 Kafka 特点
  • Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域,几乎所有的相关开源软件系统都会优先支持 Kafka。
  • Kafka 性能高效、可扩展良好并且可持久化。它的分区特性,可复制和可容错都是不错的特性。
  • Kafka 使用 Scala 和 Java 语言开发,设计上大量使用了批量和异步的思想,使得 Kafka 能做到超高的性能。Kafka 的性能,尤其是异步收发的性能,是三者中最好的,但与 RocketMQ 并没有量级上的差异,大约每秒钟可以处理几十万条消息。
  • 在有足够的客户端并发进行异步批量发送,并且开启压缩的情况下,Kafka 的极限处理能力可以超过每秒 2000 万条消息。

同时 Kafka 也有缺点:

  • Kafka 同步收发消息的响应时延较高。因为其异步批量的设计带来的问题,在它的 Broker 中,很多地方都会使用这种先攒一波再一起处理的设计。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。

1.3.3 RocketMQ

1.3.3.1 RocketMQ 介绍

RocketMQ 是阿里巴巴开源的分布式消息中间件,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,后来捐赠给 Apache 软件基金会。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等

RocketMQ 要求生产者和消费者必须是一个集群。集群级别的高可用,是RocketMQ 和其他 MQ 的区别。

  • Name Server(名称服务提供者) :是一个几乎无状态节点,可集群部署,节点之间没有任何信息同步。提供命令、更新和发现 Broker 服务
  • Broker (消息中转提供者):负责存储转发消息
    • broker分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。
1.3.3.2 RocketMQ 特点
  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
  • Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
  • RocketMQ 的性能比 RabbitMQ 要高一个数量级,每秒钟大概能处理几十万条消息
  • RocketMQ 的劣势是与周边生态系统的集成和兼容程度不够。

2.1 基于List 实现消息队列

List 的先进先出其实就符合消息队列对消息有序性的需求。具体实现如下图:

但是,在生产者往 List 中写入数据时,List 消息集合并不会主动地通知消费者有新消息写入。所以 Redis 提供了 brpop 命令, brpop 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。此外,消息队列通过给每一个消息提供全局唯一的 ID 号来解决分辨重复消息的需求。而消息的最后一个需求,消息可靠性如何解决呢?为了留存消息,List 类型提供brpoplpush 命令来让消费者从一个 List 中读取消息,同时, Redis 会把这个消息再插入到另一个 List 中留存。这样如果消费者处理时发生宕机,再次重启时,也可以从备份 List 中重新读取消息并进行处理。如下图:

2.2 基于发布订阅实现消息队列

Redis 主要有两种发布/订阅模式:基于频道(channel)和基于模式(pattern)的发布/订阅。

2.2.1 基于频道的发布/订阅

在 Redis 2.0 之后 Redis 就新增了专门的发布和订阅的类型,Publisher(发布者)和 Subscriber(订阅者)来实现消息队列了,它们对应的执行命令如下:

# 发布消息
publish channel "message"
# 订阅消息
subscribe channel
# 取消订阅
unsubscribe channel

2.2.2 基于模式的发布/订阅

除了订阅频道外,客户端还可以通过 psubscribe 命令订阅一个或者多个模式,从而成为这些模式的订阅者,它还会被发送给所有与这个频道相匹配的模式的订阅者,命令如下:

# 订阅模式
psubscribe pattern
# 退订模式
punsubscribe pattern

那么我们如何用发布/订阅来实现消息队列?我们可以使用模式订阅的功能,利用一个消费者"queue_"来订阅所有以"queue__"开头的消息队列。如下图:

但是发布订阅模式也存在以下缺点:

  • 无法持久化保存消息
  • 发布订阅模式是“先发后忘”的工作模式,若有订阅者离线,重连后不能消费之前的历史消息
  • 不支持消费者确认机制,稳定性无法得到保证

2.3 基于Stream 实现消息队列

然而在 Redis 5.0 之后新增了 Stream 类型,它提供了丰富的消息队列操作命令:

  • XADD:插入消息,保证 MQ 有序,可以自动生成全局唯一 ID

    # mqstream 为消息队列,消息的键是 repo 值为5
    # * 表示自动生成一个全局唯一ID
    XADD mqstream * repo 5
  • XREAD:用于读取消息,可以按 ID 读取数据,保证MQ对重复消息的处理

    # 从 1599203861727-0 起读取后续的所有消息
    XREAD BLOCK 100 STREAMS mqstream 1599203861727-0

    XREAD 后的block 配置项,类似于 brpop 命令的阻塞读取操作,后面的 100 的单位是毫秒,表示如果没有消息到来,XREAD 将阻塞 100 毫秒。

  • XREADGROUP:按消费组形式读取消息;

    # 创建名为 group1 的消费组,其消费队列是 mqstream
    XGROUP create mqstream group1 0
    # 让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息
    # 命令最后的参数 ">" 表示从第一条尚未被消费的消息开始读取
    XREADGROUP group group1 consumer1 streams mqstream >

    使用消费组的目的是让组内的多个消费者共同分担读取消息,通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

  • XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息(保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息),而 XACK 命令用于向消息队列确认消息处理已完成。

2.4 总结

List 和 Streams 实现消息队列的特点和区别:

关于 Redis 是否适合做消息队列,引用一下蒋德钧老师的看法:

Redis 是一个非常轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署 ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。所以,关于是否用 Redis 做消息队列的问题,不能一概而论,我们需要考虑业务层面的数据体量,以及对性能、可靠性、可扩展性的需求。如果分布式系统中的组件消息通信量不大,那么,Redis 只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis 的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。

https://zhuanlan.zhihu.com/p/86812691

https://kaiwu.lagou.com/course/courseInfo.htm?courseId=59#/detail/pc?id=1775

https://time.geekbang.org/column/article/284291

https://www.cnblogs.com/weifeng1463/p/12889300.html

https://pdai.tech/md/db/nosql-redis/db-redis-x-pub-sub.html

《Redis 设计与实现》

《Redis 开发与运维》