消息队列的一些场景及源码分析,RocketMQ使用相关问题及性能优化 https://www.cnblogs.com/yizhiamumu/p/16694126.html
消息队列的对比测试与RocketMQ使用扩展 https://www.cnblogs.com/yizhiamumu/p/16677881.html
消息队列为什么选用redis?聊聊如何做技术方案选型?https://www.cnblogs.com/yizhiamumu/p/16573472.html
分布式事务原理及解决方案案例https://www.cnblogs.com/yizhiamumu/p/16662412.html
分布式事务实战方案汇总 https://www.cnblogs.com/yizhiamumu/p/16625677.html
消息队列初见:一起聊聊引入系统mq 之后的问题https://www.cnblogs.com/yizhiamumu/p/16573472.html
参考:消息队列为什么选用redis?聊聊如何做技术方案选型?https://www.cnblogs.com/yizhiamumu/p/16573472.html
近几年,确实出现了很多消息队列解决方案,但其实去分析每种消息队列,会发现他们诞生的背景和要针对性解决的问题是不一样的。
回到 RocketMQ,大家能从近两年 RocketMQ 在社区的一系列动作中发现,RocketMQ 同时在消息、事件、流三个领域都有发力,逐渐演进至一个超融合处理平台。作为一个融合的数据处理平台,RocketMQ 当前在开源的布局看起来是与业界多个 MQ 趋同,在 RocketMQ 开源的背后其实是商业上真实的需求驱动。
一般讲性能,其实就是吞吐量和延迟两个指标。
对于吞吐量来讲,RocketMQ 在 2017 年就能优化到单机 50W 的 TPS。如果是在批量的场景,实际上从生产环境的稳定性,以及业务消息的重要性来讲,各个消息队列都能轻易地打满网络带宽或者磁盘资源。
也就是说,性能一般情况下差异都不大,是很难作为一个产品的核心竞争力的,除非是架构层面有限制。
延迟就是一个非常重要的指标了,在线业务对于是 2ms 延时和 5ms 延时基本上都能接受,但非常难以接受的是经常性有秒级的毛刺(在延迟这个指标后面长尾延迟)。
除了上述两点,弹性和可扩展能力也是非常重要的。
消息我们可以直接在内存中使用数组或者队列来存储数据即可。性能非常高。
但是有几方面的缺点
既然要存储数据,就需要解决数据存哪里?从存储方式来看,主要有几个方面:
性能,吞吐量,本质上就是数据结构的设计决定的。我们看看上面数据存储方式对应的数据结构
存储
数据结构
写放大
读放大
mysql
B+ tree
写一条数据需要两次写入1、数据写入是按页为单位进行写的,假设页的大小为B 字节,那么写放大为Θ(B)(最坏的结果)2、为了避免在写页的过程中出现故障,需要写入redo log(WAL)
既支持随机读取又支持范围查找的系统。读放大为O(logBN/B),数据量大的适合性能会急剧下降,常规是b+ tree 超过4层,大约2000万记录是临界点
rocketdb
LSM tree
Memtable/SSTable实现,写的话也变成顺序写了(这一点是极大的优化点),但是后台会出现多路归并算法来合并,这个过程占用磁盘IO 会到当前消息的读写有扰动,写放大Θ(klogkN/B)
读的顺序是MemTable->分层的sst ,性能会比B+ tree 略差,读放大Θ((log2N/B)/logk)
文件系统
append only log
直接在文件末尾追加,所有的的写都是顺序的,因此性能极高
不支持根据内容进行检索,只能根据文件偏移量执行查询
mysql 在大数据量的情况,性能会急剧下降,并且扩展性非常不友好。
分布式KV 存储 天然的分布式系统,对大数据量和未来的扩展都问题不大,LSM tree 对写性能和吞吐都比mysql 要好。查询其实是可以通过缓存等手段去优化,可以考虑。
但是,满足性能和吞吐量最优的毫无疑问是使用文件系统,因为消息不需要修改,读和写都是顺序读写,性能极高。
但是现实中的需求我们可能需要使用多个队列来完成不同的业务。比如一个队列来处理订单相关的业务,一个队列来处理商品相关的业务等等。那么我们该如何调整呢?
我们都知道文件 append only log 的方式是不支持根据消息的内容来搜索的,如果所有的队列的数据存在一个文件中,是没办法满足需求的。
换个思路,一个队列一个文件我们就可以绕开根据内容检索的需求,kafka 就是这么玩的。
这个时候,每个队列一个文件,读写还是顺序的吗?
我们现在面临的问题是,作为一款面向业务的高性能消息中间件,随着业务的复杂度变高,队列数量是急剧变大的。
如果要保证写入的吞吐量和性能,还需要得所有的队列都写在同一个文件。
但是,按照队列消费的场景就意味着要根据消息内容(队列名字)来进行消费,append only log 是不支持检索的,如何解决这个问题。
我们会增加一个索引来处理慢sql 。我们是否也可以建立一个队列的索引,每一个队列就是一个索引文件。
读取数据的时候,先从索引队列找到消息在文件的偏移量后,在到数据文件去读取。
那么,索引的文件的数量变大的之后,那么对索引文件的读写不就是又变成随机读写了吗?性能又会急剧下降?
一个一个来解决:
(rocketmq 中数据文件称为:commitlog, topic索引文件称为 consumeQueue)
方案
优点
缺点
每一个queue 都单独一个文件
消费的时候不需要独立建立一个索引,系统复杂度降低,并且性能高
当queue 很多的时候,并且每个queue 的数据量都不是很大情况,就会存在很多小文件,写和读都讲变成随机读,性能会受到影响
所有queue 共享一个文件
所有的写都是顺序写的,性能比较高,可以支撑大量queue 性能也不至于下降的厉害
1、需要建立独立的索引文件,查询数据的链路变长,需要先从索引查到数据再到数据文件查询2、索引队列本身也是小文件,好在因为数据量少,基本可以常驻内存3、读变成随时读,不过整体还是顺序读
我们得出结论:选择文件系统,append only log.根据消息队列即时消费和顺序读写的特点,刚写入的内容还在page cache ,就被读走了,甚至都不需要回到磁盘,性能会非常高。
如果所有的数据都存在一个commitlog 文件的话,随着数据量变大,文件必然会非常大。
解决方案是,我们大文件切换成小文件,每个文件固定大小1G,写满了就切换到一个新的文件
消息队列的第一个特点就是数据量大,一台机器容易面临瓶颈,因此我们需要把数据均衡的分发到各个机器上。
解决方案是,一段很长的队列平均切成N份,把这N份分别放到不同的机器上
虽然消息已经分成切分成为多份放到不同的机器了,但是每一份都是都只有一个副本,也就意味着,任何一台机器的硬盘坏掉的话,该机器上的消息就会丢失掉了,这是不可接受的。
行业通常的做法一份数据存多个副本,并且确保所有的副本不能全都在同一台机器。
问题来了,那么这多份数据是同步双写还是异步双写呢?
方案
优点
缺点
同步双写
数据不会丢失
性能会降低,单个RT变长
异步双写
单个RT 更加小,性能更高,吞吐量更大
数据可能会丢失
其实每个业务场景需求是不一样的,RocketMq 是支持可配置的
RocketMQ主要的存储文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件
RocketMQ
采用的是混合型的存储结构,即为Broker
单个实例下所有的队列共用一个日志数据文件(即为CommitLog
)来存储。
RocketMQ
的混合型存储结构(多个Topic
的消息实体内容都存储于一个CommitLog
中)针对Producer
和Consumer
分别采用了数据
和索引
部分相分离的存储结构,Producer
发送消息至Broker
端,然后Broker
端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog
中。
只要消息被刷盘持久化至磁盘文件CommitLog
中,那么Producer
发送的消息就不会丢失。正因为如此,Consumer
也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker
允许等待30s
的时间,只要这段时间内有新消息到达,将直接返回给消费端。
这里,RocketMQ
的具体做法是,使用Broker
端的后台服务线程—ReputMessageService
不停地分发请求并异步构建ConsumeQueue
(逻辑消费队列)和IndexFile
(索引文件)数据
所以,Broker是怎么保存数据的呢?Broker在收到消息之后,会把消息保存到commitlog的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据,同时,每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中会保存key和offset的对应关系。
Broker
CommitLog文件保存于${Rocket_Home}/store/commitlog目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认1G,写满后自动生成一个新的文件。
log
由于同一个topic的消息并不是连续的存储在commitlog中,消费者如果直接从commitlog获取消息效率非常低,所以通过consumequeue保存commitlog中消息的偏移量的物理地址,这样消费者在消费的时候先从consumequeue中根据偏移量定位到具体的commitlog物理文件,然后根据一定的规则(offset和文件大小取模)在commitlog中快速定位。
log
RocketMQ
对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache
、顺序读写
、零拷贝
在RocketMQ
中,ConsumeQueue
逻辑消费队列存储的数据较少,并且是顺序读取,在page cache
机制的预读取作用下,Consume Queue
文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog
消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO
调度算法,比如设置调度算法为Deadline
(此时块存储采用SSD的话),随机读的性能也会有所提升。
页缓存(PageCache
)是OS
对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS
使用PageCache
机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache
。对于数据的写入,OS
会先写入至Cache
内,随后通过异步的方式由pdflush
内核线程将Cache
内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache
的情况,OS
从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取
RocketMQ
主要通过MappedByteBuffer
对文件进行读写操作。其中,利用了NIO
中的FileChannel
模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap
的方式减少了传统IO
,将磁盘文件数据在操作系统内核地址空间的缓冲区,和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ
的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。
什么是零拷贝
在操作系统中,使用传统的方式,数据需要经历几次拷贝,还要经历用户态/内核态
切换
传统文件传输示意图
所以,可以通过零拷贝的方式,减少用户态与内核态的上下文切换和内存拷贝的次数,用来提升I/O
的性能。零拷贝比较常见的实现方式是mmap
,这种机制在Java
中是通过MappedByteBuffer
实现的。
mmap示意图
RocketMQ
提供了两种刷盘策略:同步刷盘
和异步刷盘
同步刷盘
:在消息达到Broker
的内存之后,必须刷到commitLog
日志文件中才算成功,然后返回Producer
数据已经发送成功。异步刷盘
:异步刷盘是指消息达到Broker
内存后就返回Producer
数据已经发送成功,会唤醒一个线程去将数据持久化到CommitLog
日志文件中Broker
在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中
刷盘的最终实现都是使用NIO
中的 MappedByteBuffer.force()
将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker
把消息写到CommitLog
映射区后,就会等待写入完成
异步而言,只是唤醒对应的线程,不保证执行的时机,
RocketMQ
中的负载均衡都在Client
端完成,具体来说的话,主要可以分为Producer
端发送消息时候的负载均衡和Consumer
端订阅消息的负载均衡。
Producer
端在发送消息的时候,会先根据Topic
找到指定的TopicPublishInfo
,在获取了TopicPublishInfo
路由信息后,RocketMQ
的客户端在默认方式下selectOneMessageQueue()
方法会从TopicPublishInfo
中的messageQueueList
中选择一个队列(MessageQueue
)进行发送消息。具这里有一个sendLatencyFaultEnable
开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available
的Broker
代理。
Producer负载均衡:索引递增随机取模
public MessageQueue selectOneMessageQueue(){
//索引递增
int index = this.sendWhichQueue.incrementAndGet();
//利用索引取随机数,取余数
int pos = Math.abs(index) % this.messageQueueList.size();
if(pos<0){
pos=0;
}
return this.messageQueueList.get(pos);
}
所谓的latencyFaultTolerance
,是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency
超过550Lms
,就退避3000Lms
;超过1000L
,就退避60000L
;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue
)来发送消息,latencyFaultTolerance
机制是实现消息发送高可用的核心关键所在。
在RocketMQ
中,Consumer
端的两种消费模式(Push/Pull
)都是基于拉模式来获取消息的,而在Push
模式只是对pull
模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull
)中,均需要Consumer
端知道从Broker
端的哪一个消息队列中去获取消息。因此,有必要在Consumer
端来做负载均衡,即Broker
端中多个MessageQueue
分配给同一个ConsumerGroup
中的哪些Consumer
消费。
所谓的长轮询,就是Consumer
拉取消息,如果对应的Queue
如果没有数据,Broker
不会立即返回,而是把 PullReuqest
hold起来,等待 queue
消息后,或者长轮询阻塞时间到了,再重新处理该 queue
上的所有 PullRequest
//如果没有拉到数据
case ResponseCode.PULL_NOT_FOUND:
// broker 和 consumer 都允许 suspend,默认开启
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
//封装一个PullRequest
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//把PullRequest挂起来
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
挂起的请求,有一个服务线程会不停地检查,看queue
中是否有数据,或者超时。
PullRequestHoldService#run()
@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
//检查hold住的请求
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 \* 1000) {
log.info("\[NOTIFYME\] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
是因为使用了顺序存储、Page Cache和异步刷盘。
1、我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多。
2、写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache。
3、最后由操作系统异步将缓存中的数据刷到磁盘。
二:RocketMQ
的基本架构RocketMQ
一共有四个部分组成:NameServer
,Broker
,Producer 生产者
,Consumer 消费者
,它们对应了:发现
、发
、存
、收
,为了保证高可用,一般每一部分都是集群部署的
NameServer
是一个无状态的服务器,角色类似于 Kafka
使用的 Zookeeper
,但比 Zookeeper
更轻量。
特点:
每个 NameServer
结点之间是相互独立,彼此没有任何信息交互。
Nameserver
被设计成几乎是无状态的,通过部署多个结点来标识自己是一个伪集群,Producer
在发送消息前从 NameServer
中获取 Topic
的路由信息也就是发往哪个 Broker
,Consumer
也会定时从 NameServer
获取 Topic
的路由信息,Broker
在启动时会向 NameServer
注册,并定时进行心跳连接,且定时同步维护的 Topic
到 NameServer
功能主要有两个:
Broker
结点保持长连接。Topic
的路由信息。消息存储和中转角色,负责存储和转发消息
Broker
内部维护着一个个 Consumer Queue
,用来存储消息的索引,真正存储消息的地方是 CommitLog
(日志文件)
单个 Broker
与所有的 Nameserver
保持着长连接和心跳,并会定时将 Topic
信息同步到 NameServer
,和 NameServer
的通信底层是通过 Netty
实现的。
消息生产者,业务端负责发送消息,由用户自行实现和分布式部署。
Producer
由用户进行分布式部署,消息由Producer
通过多种负载均衡模式发送到Broker
集群,发送低延时,支持快速失败。
RocketMQ
提供了三种方式发送消息:同步
、异步
和单向
同步发送
:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。异步发送
:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。单向发送
:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集消息消费者,负责消费消息,一般是后台系统负责异步消费。
Consumer
也由用户部署,支持PUSH
和PULL
两种消费模式,支持集群消费和广播消费,提供实时的消息订阅机制。
Pull
:拉取型消费者(Pull Consumer
)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull
称为主动消费型
Push
:推送型消费者(Push Consumer
)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push
称为被动消费类型
,但其实从实现上看还是从消息服务器中拉取消息,不同于 Pull
的是 Push
首先要注册消费监听器,当监听器处触发后才开始消费消息RocketMQ
是一个分布式消息队列,也就是消息队列
+分布式系统
作为消息队列,它是发-存-收
的一个模型,对应的就是Producer、Broker、Cosumer
;作为分布式系统,它要有服务端、客户端、注册中心,对应的就是Broker、Producer/Consumer、NameServer
主要的工作流程:RocketMQ
由NameServer
注册中心集群、Producer
生产者集群、Consumer
消费者集群和若干Broker
(RocketMQ
进程)组成:
NameServer因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。
RocketMQ的高可用主要是在体现在Broker的读和写的高可用,Broker的高可用是通过集群
和主从
实现的。
Broker可以配置两种角色:Master和Slave,Master角色的Broker支持读和写,Slave角色的Broker只支持读,Master会向Slave同步消息。
也就是说Producer只能向Master角色的Broker写入消息,Cosumer可以从Master和Slave角色的Broker读取消息。
Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。
如何达到发送端写的高可用性呢?
在创建 Topic 的时候,把 Topic 的多个Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组Master 仍然可用, Producer 仍然可以发送消息 RocketMQ 目前还不支持把Slave自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。
而消息在master和slave之间的同步是根据raft协议来进行的:
1、在broker收到消息后,会被标记为uncommitted状态
2、然后会把消息发送给所有的slave
3、slave在收到消息之后返回ack响应给master
4、master在收到超过半数的ack之后,把消息标记为committed
5、发送committed消息给所有slave,slave也修改状态为committed
Kafka采用Zookeeper作为注册中心(也开始逐渐去Zookeeper),
RocketMQ不使用Zookeeper其实主要可能从这几方面来考虑:
消息丢失可能发生在生产者发送消息、MQ本身丢失消息、消费者丢失消息3个方面。
生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消息就丢失了。
由于同步发送的一般不会出现这样使用方式。
异步发送的场景下,一般分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。
所以在生产阶段,主要通过请求确认机制,来保证消息的可靠传递。
以下单的场景举例。
1、下单后先保存本地数据和MQ消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚(订单数据、MQ消息记录都不会保存)。
2、下单成功,直接返回客户端成功,异步发送MQ消息。
3、MQ回调通知消息发送结果,对应更新数据库MQ发送状态。
4、JOB轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试
在监控平台配置或者JOB程序处理超过一定次数一直发送不成功的消息,告警,人工介入。
MQ
异步回调的形式是适合大部分场景下的一种解决方案。
如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。
比如RocketMQ:
RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。
比如Kafka也可以通过配置做到:
acks=all 只有参与复制的所有节点全部收到消息,才返回生产者成功。这样的话除非所有的节点都挂了,消息才会丢失。
replication.factor=N,设置大于1的数,这会要求每个partion至少有2个副本
min.insync.replicas=N,设置大于1的数,这会要求leader至少感知到一个follower还保持着连接
retries=N,设置一个非常大的值,让生产者发送失败一直重试
虽然我们可以通过配置的方式来达到MQ本身高可用的目的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。
所以存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步。
1、消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费。
2、Broker的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache中(内存中),但是同步刷盘更可靠,它是Producer发送消息后等数据持久化到磁盘之后再返回响应给Producer。
3、Broker通过主从模式来保证高可用,Broker支持Master和Slave同步复制、Master和Slave异步复制模式,生产者的消息都是发送给Master,但是消费既可以从Master消费,也可以从Slave消费。同步复制模式可以保证即使Master宕机,消息肯定在Slave中有备份,保证了消息不会丢失。
图:同步刷盘和异步刷盘
消费者丢失消息的场景1:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。
RocketMQ默认是需要消费者回复ack确认,而kafka需要手动开启配置关闭自动offset。
消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。(Kafka没有这些)
MQ
消费者丢失消息的场景2:消费者收到消息,但消费业务逻辑出错,消费失败。
解决:利用前面提到的MQ本地表,消费者收到消息且业务逻辑执行完毕后再更新MQ消息的状态(更新为已消费)
所以从Consumer角度分析,如何保证消息被成功消费?
因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。
对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的“有且仅有一次” 。RocketMQ择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
处理消息重复问题,主要有业务端自己保证,主要的方式有两种:业务幂等和消息去重。
业务幂等:第一种是保证消费逻辑的幂等性,也就是多次调用和一次调用的效果是一样的。这样一来,不管消息消费多少次,对业务都没有影响。
消息去重:第二种是业务端,对重复的消息就不再消费了。这种方法,需要保证每条消息都有一个惟一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性。
具体做法是可以建立一个消费记录表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。
发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:
我们可以从以下几个角度来考虑:
1、消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费。
2、如果时间来不及处理很麻烦,做转发处理,写一个临时的consumer消费方案,先把消息消费,然后再转发到一个新的topic和MQ资源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息。
3、处理完积压数据后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状。
MQ
最初,我们发送的消息记录是落库保存了的,而转发发送的数据也保存了,那么我们就可以通过这部分数据来找到丢失的那部分数据,再单独跑个脚本重发就可以了。
如果转发的程序没有落库,那就和消费方的记录去做对比,只是过程会更艰难一点。
顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行。
顺序消息分为全局顺序消息和部分顺序消息,全局顺序消息指某个 Topic 下的所有消息都要保证顺序;
部分顺序消息只要保证每一组消息被顺序消费即可,比如订单消息,只要保证同一个订单 ID 个消息能按顺序消费即可。
部分顺序消息相对比较好实现,生产端需要做到把同 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从同一个Message Queue读取的消息顺序处理——消费端不能并发处理顺序消息,这样才能达到部分有序。
发送端使用 MessageQueueSelector 类来控制 把消息发往哪个 Message Queue 。
消费端通过使用 MessageListenerOrderly 来解决单 Message Queue 的消息被并发处理的问题。
RocketMQ 默认情况下不保证顺序,比如创建一个 Topic ,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer ,每个 Consumer 也可能启动多个线程并行处理,所以消息被哪个 Consumer 消费,被消费的顺序和写人的顺序是否一致是不确定的。
要保证全局顺序消息, 需要先把 Topic 的读写队列数设置为 一,然后Producer Consumer 的并发设置,也要是一。简单来说,为了保证整个 Topic全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲RocketMQ的高并发、高吞吐的特性了。
有两种方案:
一般采用Cosumer端过滤,如果希望提高吞吐量,可以采用Broker过滤。
对消息的过滤有三种方式:
根据Tag过滤:这是最常见的一种,用起来高效简单
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
SQL 表达式过滤:SQL表达式过滤更加灵活
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤
电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
RocketMQ是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
但是目前RocketMQ支持的延时级别是有限的:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
RocketMQ怎么实现延时消息的:临时存储
+定时任务
。
Broker收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的Message Queue中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标Topic的队列中,然后消费者就可以正常消费这些消息。
事务消息就是MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。
半事务消息:是指暂时还不能被 Consumer 消费的消息,Producer 成功发送到 Broker 端的消息,但是此消息被标记为 “暂不可投递” 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后,Consumer 才能消费此条消息。就是MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。
实现原理如下:
事务
依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查。
死信队列用于处理无法被正常消费的消息,即死信消息。
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;
达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列。
死信消息的特点:
死信队列的特点:
RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。
rocketmq-console 这个是官方提供了一个 WEB 项目,可以查看 rocketmq数据和执行一些操作。但是这个监控界面又没有权限控制,并且还有一些消 耗性能的查询操作,如果要提高性能,建议这个可以暂停
-XX:-UseBiasedLocking: 禁用偏向锁
RocketMQ 推荐使用 G1 垃圾回收器
-Xms8g -Xmx8g -Xmn4g:这个就是很关键的一块参数了,也是重点需要调整的,就是默认的堆大小是 8g 内存,新生代是 4g 内存。
如果是内存比较大,比如有 48g 的内存,所以这里完全可以给他们翻几倍,比如给堆内存 20g,其中新生代给 10g,甚至可以更多些,当然要留一些内存给操作系统来用
-XX:+UseG1GC -XX:G1HeapRegionSize=16m:这几个参数也是至关重要的,这是选用了G1垃圾回收器来做分代回收,对新生代和老年代都是用G1来回收。这里把G1的region大小设置为了16m,这个因为机器内存比较多,所以region 大小可以调大一些给到16m,不然用2m的region, 会导致region数量过多。
-XX:G1ReservePercent=25:这个参数是说,在 G1 管理的老年代里预留 25%的空闲内存,保证新生代对象晋升到老年代的时候有足够空间,避免老年代内存都满了,新生代有对象要进入老年代没有充足内存了。默认值是 10%,略微偏少,这里 RocketMQ 给调大了一些。
-XX:initiatingHeapOccupancyPercent= :30:这个参数是说,当堆内存的使用率达到 30%之后就会自动启动 G1 的并发垃圾回收,开始尝试回收一些垃圾对象。默认值是 45%,这里调低了一些,也就是提高了 GC 的频率,但是避免了垃圾对象过多,一次垃圾回收耗时过长的问题。
-XX:-OmitStackTraceInFastThrow:这个参数是说,有时候 JVM 会抛弃-些异常堆栈信息,因此这个参数设置之后,就是禁用这个特性,要把完整的异常堆栈信息打印出来。
-XX:+AIwaysPreTouch:这个参数的意思是我们刚开始指定 JVM 用多少内存,不会真正分配给他,会在实际需要使用的时候再分配给他。所以使用这个参数之后,就是强制让 JVM 启动的时候直接分配我们指定的内存,不要等到使用内存的时候再分配。
-XX:-UseLargePages:这个参数的意思是禁用大内存页,某些情况下会导致内存浪费或实例无法启动。默认启动。
# vim /etc/sysctl.conf
一个请求到 RocketMQ 的应用,一般会经过网卡、内核空间、用户空间
在操作系统级别,是可以做软中断聚合的优化。
网卡队列 CPU 绑定
缓冲区调整
队列大小调整等
文: 一只阿木木
手机扫一扫
移动阅读更方便
你可能感兴趣的文章