【mq读书笔记】mq读写分离机制
阅读原文时间:2022年05月12日阅读:1

mq根据MessageQueue查找Broker地址的唯一依据是brokerName,同一组Broker(M-S)他们的bokerName相同但brokerId不同,主服务器的brokerId为0,从服务器的brokerId大于0,rokckertMq提供mQClientFactory.findBrokerAddressInSubscribe来实现根据brokerName,brokerId查找Broker地址

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl:

public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);

FindBrokerResult:

public class FindBrokerResult {
private final String brokerAddr;
private final boolean slave;
private final int brokerVersion;

org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe:

public FindBrokerResult findBrokerAddressInSubscribe(
final String brokerName,
final long brokerId,
final boolean onlyThisBroker
) {
String brokerAddr = null;
boolean slave = false;
boolean found = false;

      //获取brokerName下的所有broker
HashMap map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
       //根据brokerId获取broker地址
brokerAddr = map.get(brokerId);
slave = brokerId != MixAll.MASTER_ID;
found = brokerAddr != null;

        if (!found && !onlyThisBroker) {  
            Entry<Long, String> entry = map.entrySet().iterator().next();  
            brokerAddr = entry.getValue();  
            slave = entry.getKey() != MixAll.MASTER\_ID;  
            found = true;  
        }  
    }

    if (found) {  
        return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));  
    }

    return null;  
}

从ConcurrentMap> brokerAddrTable地址缓存表中根据brokerName获取所有的Broker信息。

根据brokerId从Broker主从缓存表中获取指定Broker名称,日过根据brokerId未找到相关条目,此时onlyThisBroker未false,则随机返回Broker中任意一个Broker,否则返回null。

根据消息消费队列获取brokerId的实现:

public long recalculatePullFromWhichNode(final MessageQueue mq) {
if (this.isConnectBrokerByUser()) {
return this.defaultBrokerId;
}
     //pullFromWhichNodeTable是在哪初始化的?
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (suggest != null) {
return suggest.get();
}

    return MixAll.MASTER\_ID;  
}

从ConcurrentMap pullFromWhichNodeTable缓存表中获取该消息消费队列的brokerId,如果找到则返回,否则返回brokerName的主节点。

消息消费拉取线程PullMessageService根据PullRequest请求从主服务器拉取消息后会返回下一次建议拉取的brokerId,消息消费者线程在收到消息后,会根据主服务器的建议拉取brokerId来更新

pullFromWhichNodeTable。

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#updatePullFromWhichNode:

public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (null == suggest) {
this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
} else {
suggest.set(brokerId);
}
}

消息服务端是根据何种规则来建议哪个消息消费队列该从哪台Broker服务器上拉取消息呢?

org.apache.rocketmq.store.DefaultMessageStore#getMessage:

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);

maxoffsetPy:主服务器消息存储文件最大偏移量

maxPhyPffsetPulling:此次拉取消息最大偏移量

diff:对于PullMessageService线程来说,当前未被拉取到消息消费端的消息长度。

TOTAL_PHYSICAL_MEMORY_SIZE:

mq所在服务器总内存大小。accessMessageInMemoryMaxRatio表示RocketMQ所能使用的最大内存比例,超过该内存,消息江北置换出内存;

memory表示RocketMQ消息常驻内存的大小,超过该大小,rocketMq将会将旧的消息置换回磁盘。

如果diff大于memory,表示当前需要拉去的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时才建议从从服务器拉取。
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean):

if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

        if (getMessageResult.isSuggestPullingFromSlave()) {  
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());  
        } else {  
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER\_ID);  
        }

如果一个Master拥有多台Slave服务器,参与消息卡去负载的从服务器只会是其中一个。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章