【mq读书笔记】消费进度管理
阅读原文时间:2022年05月12日阅读:1

从前2节可以看到,一次消费后消息会从ProcessQueue处理队列中移除该批消息,返回ProcessQueue最小偏移量,并存入消息进度表中。那消息进度文件存储在哪合适呢?

广播模式:同一个消费组的所有消息消费者都需要消费主题下的所有消息,也就是同组内的消费者的消息消费行为是对立的,互相不影响,故消息进度需要独立存储,最理想的存储地方应该是与消费者绑定

集群模式:同一个消费组内的所有消费者共享消息主题下的所有消息,同一个消息消费队列在同一时间只会被消费组内的一个消费者消费,并且随着消费队列的动态变化重新负载,所以消费进度需要保存在一个每个消费者都能访问到的地方。

public interface OffsetStore {
/**
* Load
*/
void load() throws MQClientException;//从消息进度存储文件加载消息进度到内存。

/\*\*  
 \* Update the offset,store it in memory  
 \*/  
void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);//更新内存中的消息消费进度

/\*\*  
 \* Get offset from local storage  
 \*  
 \* @return The fetched offset  
 \*/  
long readOffset(final MessageQueue mq, final ReadOffsetType type);//读取消息消费进度

/\*\*  
 \* Persist all offsets,may be in local storage or remote name server  
 \*/  
void persistAll(final Set<MessageQueue> mqs);//持久化指定消息队列进度到磁盘

/\*\*  
 \* Persist the offset,may be in local storage or remote name server  
 \*/  
void persist(final MessageQueue mq);

/\*\*  
 \* Remove offset  
 \*/  
void removeOffset(MessageQueue mq);//将消息队列的消息消费进度从内存溢出

/\*\*  
 \* @return The cloned offset table of given topic  
 \*/  
Map<MessageQueue, Long> cloneOffsetTable(String topic);//克隆该主题下所有消息队列的消息消费进度。

/\*\*  
 \* @param mq  
 \* @param offset  
 \* @param isOneway  
 \*/  
void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,  
    MQBrokerException, InterruptedException, MQClientException;//更新存储在Broker端的消息消费进度,使用集群模式  

}

public class LocalFileOffsetStore implements OffsetStore {
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");//消息进度存储目录,通过rocketmq.client.localOffsetStoreDir,如果未指定,默认为用户主目录/.rocketmq_offsets.
private final static InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;//消息客户端
private final String groupName;//消费消费组
private final String storePath;//消息进度存储文件
private ConcurrentMap offsetTable =
new ConcurrentHashMap();消息消费进度(内存)

加载顺序:storePath->storePath+".bak"

集群模式消费进度存储:

消费进度读取与广播模式相似,如果从内存读取,直接读缓存,如果从磁盘读取则需要请求Broker,命令为QUERY_CONSUMER_OFFSET。持久化消息进度命令为UPDATE_CONSUMER_OFFSET,更新ConsumerOffsetManager的ConcurrentMap>offsetTable,Broker端默认10s持久化一次消费进度。存储文件名:{RocketMQ_HOME}/store/config/consumerOffset.json