分布式-消息中间件介绍
阅读原文时间:2021年04月21日阅读:1

一、消息队列概述

  • 消息队列中间件是分布式系统中重要的组件

  • 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等

二、消息队列作用

主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构(分布式事务)。是大型分布式系统不可缺少的中间件。

2.1 解藕

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
  • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
  • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

2.2 异步消息

用户注册后,需要发注册邮件和注册短信

2.3 流量削锋

秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

  • 1.可以控制活动的人数;
  • 2.可以缓解短时间内高流量压垮应用;

  • 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;
  • 秒杀业务根据消息队列中的请求信息,再做后续处理。

2.4 日志处理

日志处理是指将消息队列用在日志处理中

  • 日志采集客户端,负责日志数据采集,定时写受写入Kafka队列;
  • Kafka消息队列,负责日志数据的接收,存储和转发;
  • 日志处理应用:订阅并消费kafka队列中的日志数据;

(1)Kafka:接收用户日志的消息队列。

(2)Logstash:做日志解析,统一成JSON输出给Elasticsearch。

(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。

(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。

2.5 消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯;比如实现点对点消息队列,或者聊天室等。
点对点通讯

客户端A和客户端B使用同一队列,进行消息通讯。

聊天室通讯

客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
以上实际是消息队列的两种消息模式,点对点或发布订阅模式

2.6 消息中间件应用场景

2.6.1 电商系统


消息队列采用高可用,可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket Mq。
(1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)
(2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。
(3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。

2.6.2 日志收集系统


分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(OtherApp)四部分组成。

Zookeeper注册中心,提出负载均衡和地址查找服务

日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列

Kafka集群:接收,路由,存储,转发等消息处理

Storm集群:与OtherApp处于同一级别,采用拉的方式消费队列中的数据

三、消息模型

3.1 点对点:Queue,不可重复消费

P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P特点

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功

Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

3.2 发布/订阅:Topic,可以重复消费

包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

Pub/Sub的特点

  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。
    针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者必须保持运行的状态。

发布订阅模式

支持订阅组的发布订阅模式:

  • 发布订阅模式下,当发布者消息量很大时,显然单个订阅者的处理能力是不足的。实际上现实场景中是多个订阅者节点组成一个订阅组负载均衡消费topic消息即分组订阅,这样订阅者很容易实现消费能力线性扩展。
  • 可以看成是一个topic下有多个Queue,每个Queue是点对点的方式,Queue之间是发布订阅方式。

3.3 点对点、发布订阅区别

3.3.1 点对点模式

  • 生产者发送一条消息到queue,一个queue可以有很多消费者
  • 但是一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者,所以Queue实现了一个可靠的负载均衡。

3.3.2 发布订阅模式

  • 发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
  • topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。

3.4 消息消费

在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。

3.4.1 同步(PULL)

订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;

3.4.2 异步(PUSH)

订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

3.4.3 JNDI

Java命名和目录接口,是一种标准的Java命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。
JNDI在JMS中起到查找和访问发送目标或消息来源的作用。

四、消息顺序、消息重复

4.1 顺序消息–生产端解决

顺序消息 就像 串行化 隔离一样,并发度不行

  • 消息有序指的是一类消息消费时,能按照发送的顺序来消费。
  • 顺序消息,应该尽可能在生产者这边做

生产者投递消息是有序的,要保证 MQ 收到的消息有序,那么消息应该投递到同一个机器的同一个队列且队列中的消息由同一个消费者消费

要实现严格的顺序消息,简单且可行的办法就是:

  • 保证生产者 - MQServer - 消费者是一对一对一的关系,类似TCP 可靠传输一样

  • 发送时间的顺序性,只能独占消费,以确保有序投递。可以给消息加上全局时间戳(全局单调递增序列),来表示消息的前后顺序,而消费端,可以在一定时间窗口内对收到的消息排序

  • 因果关系的逻辑顺序性,可根据业务状态来判断

比如生产者依次发布: 1订单创建事件,2订单支付事件,3订单发货事件消费者收到的消息顺序是: 2,3,1;那么处理时,可以丢弃消息1,因为 2 必发生在 1 之后

消息的顺序问题

  • 1.不关注乱序的应用实际大量存在
  • 2.队列无序并不意味着消息无序

源码角度分析RocketMQ怎么实现发送顺序消息

一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。下面的示例中,OrderId相同的消息,会发送到同一个队列:

// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId

在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列。

private SendResult send()  {
    // 获取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根据我们的算法,选择一个发送队列
        // 这里的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}

4.2 重复消息 –消费端解决

造成消息的重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是不解决,转而绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

  • 1.消费端处理消息的业务逻辑保持幂等性
  • 2.保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

我们可以看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率不一定大,且由消息系统实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

为了确保消息可靠,很多时候,我们要保证消息多次生成,比如RocketMQ。这时我们要设置消息的阶乘投递(1,2,6,24…),并设置最大投递次数(避免消息死循环)

五、流行消息队列系统比较

  • 传统企业型消息队列ActiveMQ遵循了JMS规范,实现了点对点和发布订阅模型,但其他流行的消息队列RabbitMQ、Kafka并没有遵循JMS规范。

5.1 RabbitMQ

RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。AQMP协议定义了消息路由规则和方式。生产端通过路由规则发送消息到不同queue,消费端根据queue名称消费消息。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ既支持内存队列也支持持久化队列,消费端为推模型,消费状态和订阅关系由服务端负责维护,消息消费完后立即删除,不保留历史消息。

5.1.1 几个重要概念:

Broker简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

producer:消息生产者,就是投递消息的程序。

consumer:消息消费者,就是接受消息的程序。

channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

5.1.2 RabbitMQ消息队列的使用过程

(1)客户端连接到消息队列服务器,打开一个channel。

(2)客户端声明一个exchange,并设置相关属性。

(3)客户端声明一个queue,并设置相关属性。

(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。

(5)客户端投递消息到exchange。

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里

5.1.3点对点

生产端发送一条消息通过路由投递到Queue,只有一个消费者能消费到。

5.1.4 多订阅

当RabbitMQ需要支持多订阅时,发布者发送的消息通过路由同时写到多个Queue,不同订阅组消费不同的Queue。所以支持多订阅时,消息会多个拷贝

5.2 Kafka

  • Kafka只支持消息持久化,消费端为拉模型,消费状态和订阅关系由客户端端负责维护,消息消费完后不会立即删除,会保留历史消息。因此支持多订阅时,消息只会存储一份就可以了。但是可能产生重复消费的情况。

点对点&多订阅

发布者生产一条消息到topic中,不同订阅组消费此消息。

5.3 RocketMQ

顺序消费与重复消费
https://blog.csdn.net/lovesomnus/article/details/51776942

六、MQ性能对比及选型

6.1 MQ性能对比

6.2 从社区活跃度

按照目前网络上的资料,RabbitMQ 、activeM 、ZeroMQ 三者中,综合来看,RabbitMQ 是首选。

6.3 持久化消息比较

ZeroMq 不支持,ActiveMq 和RabbitMq 都支持。持久化消息主要是指我们机器在不可抗力因素等情况下宕机了,消息不会丢失的机制。

6.4 综合技术实现

可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。
RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。当然ZeroMq 也可以做到,不过自己必须手动写代码实现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实和高可用性。

6.5 高并发

毋庸置疑,RabbitMQ 最高,原因是它的实现语言是天生具备高并发高可用的erlang 语言。

6.6 比较关注的比较, RabbitMQ 和 Kafka

RabbitMq 比Kafka 成熟,在可用性上,稳定性上,可靠性上, RabbitMq 胜于 Kafka (理论上)。RabbitMQ使用ProtoBuf序列化消息。极大的方便了Consumer的数据高效处理,与XML相比,ProtoBuf有以下优势:

  • 1.简单
  • 2.size小了3-10倍
  • 3.速度快了20-100倍
  • 4.易于编程
  • 5.减少了语义的歧义.

ProtoBuf具有速度和空间的优势,使得它现在应用非常广泛。
另外,Kafka 的定位主要在日志等方面, 因为Kafka 设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强,所以 如果业务方面还是建议选择 RabbitMq 。
还有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出来很多。

6.7 选型最后总结:

如果我们系统中已经有选择 Kafka,或者 RabbitMq,并且完全可以满足现在的业务,建议就不用重复去增加和造轮子。
可以在 Kafka 和 RabbitMq 中选择一个适合自己团队和业务的,这个才是最重要的。但是毋庸置疑现阶段,综合考虑没有第三选择