消息队列(四)--- RocketMQ-消息发送2
阅读原文时间:2023年07月13日阅读:3

  接着我们上一篇继续分析消息发送,上节讲到消息发送前有可能遇到 broker 失效的情况,RocketMQ 主要是采用两种策略 :

  • 重试发送

  • broker 故障延迟机制
      后者指的是当发送给某一broker失败后,会将该broker暂时排除在消息队列的选择范围内,到达某个时间点后再继续重试发送,发送的时候消耗的时长越多,那么延迟的时长就越多(就像缓存算法一样,使用得越少,越容易给淘汰)。下面介绍broker 故障延时机制

    接着昨天选取某一个消息队列的代码

    /**
    * Broker 故障延迟机制
    * 获取,判断是否失效,失效则移存记录
    *
    */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
    try {
    //同一个线程,index 增加 1
    int index = tpInfo.getSendWhichQueue().getAndIncrement();
    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { //取余,使同个线程发送到不同的 messagequeue int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //判断该 broker 是否失效 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } // 获取的 mq 失效 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) {
    final MessageQueue mq = tpInfo.selectOneMessageQueue();
    if (notBestBroker != null) {
    mq.setBrokerName(notBestBroker);
    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
    }
    return mq;
    } else {
    latencyFaultTolerance.remove(notBestBroker);
    }
    } catch (Exception e) {
    log.error("Error occurred when selecting message queue", e);
    }

        return tpInfo.selectOneMessageQueue();  
    }
    
    return tpInfo.selectOneMessageQueue(lastBrokerName);  

    }

    broker 故障延迟机制相关类如下

其中LatencyFaultToleranceImpl 定义内部类 FaultItem ,从名字也可以看出这是放置失效broker用的,内部存放 currentLatency表示延时时间,startTimestamp 表示失效时间终点时间戳。


// LatencyFaultToleranceImpl#updateFaultItem 

@Override  
/\*\*  
 \*  
 \* @param name broker名字  
 \* @param currentLatency 当前延迟时长  
 \* @param notAvailableDuration 延迟时长  
 \*/  
@Override  
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {  
    FaultItem old = this.faultItemTable.get(name);  
    if (null == old) {  
        final FaultItem faultItem = new FaultItem(name);  
        faultItem.setCurrentLatency(currentLatency);  
        //可以看到 下次可用的时间 =  现在的时间 + 延迟的时间  
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        old = this.faultItemTable.putIfAbsent(name, faultItem);  
        if (old != null) {  
            old.setCurrentLatency(currentLatency);  
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);  
        }  
    } else {  
        old.setCurrentLatency(currentLatency);  
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);  
    }  
}

/\*\*  
 \* 尝试从规避的broker 中选择一个可用的较好的 broker ,如果没找到,将返回 null  
 \* 哪一个较好呢? 肯定是规避时间较短的  
 \*/  
@Override  
public String pickOneAtLeast() {  
    final Enumeration<FaultItem> elements = this.faultItemTable.elements();  
    List<FaultItem> tmpList = new LinkedList<FaultItem>();  
    while (elements.hasMoreElements()) {  
        final FaultItem faultItem = elements.nextElement();  
        tmpList.add(faultItem);  
    }

    if (!tmpList.isEmpty()) {  
        //洗牌后又排序,这里看不懂为啥要洗牌后排序,为什么不直接排序?  
        Collections.shuffle(tmpList);  
        //排序的根据看 FaultItem 的 compareTo 方法,根据时间排序  
        Collections.sort(tmpList);

        final int half = tmpList.size() / 2;  
        if (half <= 0) {  
            return tmpList.get(0).getName();  
        } else {  
            // whichItemWorst 记录本次选取的 broker ,加若选去后还是发送失败,那么同个线程下次再次调用 pickOneAtLeast 的时候  
            //就会调用选用另外一个broker  
            final int i = this.whichItemWorst.getAndIncrement() % half;  
            return tmpList.get(i).getName();  
        }  
    }

    return null;  
}

// MQFaultStrategy#updateFaultItem  
// 其中 currentLatency 是发送时长(发送花了多少时间)  
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {  
    if (this.sendLatencyFaultEnable) {  
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);  
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);  
    }  
}

//计算出需要延迟的时长  
private long computeNotAvailableDuration(final long currentLatency) {  
    for (int i = latencyMax.length - 1; i >= 0; i--) {  
        if (currentLatency >= latencyMax\[i\])  
            return this.notAvailableDuration\[i\];  
    }

    return 0;  
}

private long\[\] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};  
private long\[\] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

  假如是一个异步发送方法

public static void main(  
    String\[\] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {

    DefaultMQProducer producer = new DefaultMQProducer("Jodie\_Daily\_test");  
    producer.start();  
    producer.setRetryTimesWhenSendAsyncFailed(0);

    for (int i = 0; i < 10000000; i++) {  
        try {  
            final int index = i;  
            Message msg = new Message("Jodie\_topic\_1023",  
                "TagA",  
                "OrderID188",  
                "Hello world".getBytes(RemotingHelper.DEFAULT\_CHARSET));  
            producer.send(msg, new SendCallback() {  
                @Override  
                public void onSuccess(SendResult sendResult) {  
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());  
                }

                @Override  
                public void onException(Throwable e) {  
                    System.out.printf("%-10d Exception %s %n", index, e);  
                    e.printStackTrace();  
                }  
            });  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
    producer.shutdown();  
}

DefaultMQProducerimpl#sendKernelImpl是消息发送的核心方法,我们看一下


/\*\*  
 \*  
 \* @param msg 发送的消息  
 \* @param mq  选定的消息队列  
 \* @param communicationMode 交互模式  
 \* @param sendCallback  异步消息 callback 回调  
 \* @param topicPublishInfo topic消息  
 \* @param timeout 超时时长  
 \* @return  
 \* @throws MQClientException  
 \* @throws RemotingException  
 \* @throws MQBrokerException  
 \* @throws InterruptedException  
 \*/  
private SendResult sendKernelImpl(final Message msg,  
    final MessageQueue mq,  
    final CommunicationMode communicationMode,  
    final SendCallback sendCallback,  
    final TopicPublishInfo topicPublishInfo,  
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  
    //No.1 获取地址  
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());  
    if (null == brokerAddr) {  
        tryToFindTopicPublishInfo(mq.getTopic());  
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());  
    }

    SendMessageContext context = null;  
    if (brokerAddr != null) {  
        //  
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte\[\] prevBody = msg.getBody();  
        try {  
            //for MessageBatch,ID has been set in the generating process  
            // 为消息分配全局唯一 ID ,如果消息体默认超过 4K ,会对消息采用 zip 压缩,设置标识位  
            if (!(msg instanceof MessageBatch)) {  
                MessageClientIDSetter.setUniqID(msg);  
            }

            int sysFlag = 0;  
            if (this.tryToCompressMessage(msg)) {  
                sysFlag |= MessageSysFlag.COMPRESSED\_FLAG;  
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY\_TRANSACTION\_PREPARED);  
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {  
                sysFlag |= MessageSysFlag.TRANSACTION\_PREPARED\_TYPE;  
            }

            // 如果有钩子函数,执行钩子函数  
            if (hasCheckForbiddenHook()) {  
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();  
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());  
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());  
                checkForbiddenContext.setCommunicationMode(communicationMode);  
                checkForbiddenContext.setBrokerAddr(brokerAddr);  
                checkForbiddenContext.setMessage(msg);  
                checkForbiddenContext.setMq(mq);  
                checkForbiddenContext.setUnitMode(this.isUnitMode());  
                this.executeCheckForbiddenHook(checkForbiddenContext);  
            }

            if (this.hasSendMessageHook()) {  
                context = new SendMessageContext();  
                context.setProducer(this);  
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());  
                context.setCommunicationMode(communicationMode);  
                context.setBornHost(this.defaultMQProducer.getClientIP());  
                context.setBrokerAddr(brokerAddr);  
                context.setMessage(msg);  
                context.setMq(mq);  
                String isTrans = msg.getProperty(MessageConst.PROPERTY\_TRANSACTION\_PREPARED);  
                if (isTrans != null && isTrans.equals("true")) {  
                    context.setMsgType(MessageType.Trans\_Msg\_Half);  
                }

                if (msg.getProperty("\_\_STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY\_DELAY\_TIME\_LEVEL) != null) {  
                    context.setMsgType(MessageType.Delay\_Msg);  
                }  
                this.executeSendMessageHookBefore(context);  
            }

            //==================== 构造request请求体 ==================

            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();  
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());  
            requestHeader.setTopic(msg.getTopic());  
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());  
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());  
            requestHeader.setQueueId(mq.getQueueId());  
            requestHeader.setSysFlag(sysFlag);  
            requestHeader.setBornTimestamp(System.currentTimeMillis());  
            requestHeader.setFlag(msg.getFlag());  
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));  
            requestHeader.setReconsumeTimes(0);  
            requestHeader.setUnitMode(this.isUnitMode());  
            requestHeader.setBatch(msg instanceof MessageBatch);  
            if (requestHeader.getTopic().startsWith(MixAll.RETRY\_GROUP\_TOPIC\_PREFIX)) {  
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);  
                if (reconsumeTimes != null) {  
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));  
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY\_RECONSUME\_TIME);  
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);  
                if (maxReconsumeTimes != null) {  
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));  
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY\_MAX\_RECONSUME\_TIMES);  
                }  
            }

            SendResult sendResult = null;

            //==================== 构造request请求体 ==================

            //(重要)根据不同的交互模式,发送请求,核心发送逻辑  
            switch (communicationMode) {  
                case ASYNC:  
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(  
                        brokerAddr,  
                        mq.getBrokerName(),  
                        msg,  
                        requestHeader,  
                        timeout,  
                        communicationMode,  
                        sendCallback,  
                        topicPublishInfo,  
                        this.mQClientFactory,  
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),  
                        context,  
                        this);  
                    break;  
                case ONEWAY:  
                case SYNC:  
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(  
                        brokerAddr,  
                        mq.getBrokerName(),  
                        msg,  
                        requestHeader,  
                        timeout,  
                        communicationMode,  
                        context,  
                        this);  
                    break;  
                default:  
                    assert false;  
                    break;  
            }

            if (this.hasSendMessageHook()) {  
                context.setSendResult(sendResult);  
                this.executeSendMessageHookAfter(context);  
            }

            return sendResult;  
            ....  
            ....  
            (异常处理)

}           

  MQ 客户端发送消息的入口是 MQClientAPIImpl#sendMessage,MQClientAPIImpl 持有 RemotingClient 字段,它是个接口,实现类是 NettyRemotingClient ,即是它就是真正执行发送的对象,请求命令是 RequestCode.SEND_MESSAGE,我们可以找到该命令的处理类: org . apache.rocketmq. broker.processor.SendMessageProcessor。人口方法在 SendMessageProcessor#sendMessage。

//SendMessageProcessor#sendMessage 

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,  
    final RemotingCommand request,  
    final SendMessageContext sendMessageContext,  
    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);  
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY\_MSG\_REGION, this.brokerController.getBrokerConfig().getRegionId());  
    response.addExtField(MessageConst.PROPERTY\_TRACE\_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();  
    if (this.brokerController.getMessageStore().now() < startTimstamp) {  
        response.setCode(ResponseCode.SYSTEM\_ERROR);  
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));  
        return response;  
    }

    response.setCode(-1);  
    // No.1 先检查  
    super.msgCheck(ctx, requestHeader, response);  
    if (response.getCode() != -1) {  
        return response;  
    }

    final byte\[\] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();  
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {  
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();  
    }

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();  
    msgInner.setTopic(requestHeader.getTopic());  
    msgInner.setQueueId(queueIdInt);  
    // 如果消息重试次数超过允许的最大重试次数,消息将进入DLD延迟队列  
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {  
        return response;  
    }

    msgInner.setBody(body);  
    msgInner.setFlag(requestHeader.getFlag());  
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));  
    msgInner.setPropertiesString(requestHeader.getProperties());  
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());  
    msgInner.setBornHost(ctx.channel().remoteAddress());  
    msgInner.setStoreHost(this.getStoreHost());  
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {  
        String traFlag = msgInner.getProperty(MessageConst.PROPERTY\_TRANSACTION\_PREPARED);  
        if (traFlag != null) {  
            response.setCode(ResponseCode.NO\_PERMISSION);  
            response.setRemark(  
                "the broker\[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "\] sending transaction message is forbidden");  
            return response;  
        }  
    }

    //(重要)进行消息存储,后续分析  
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}

//

protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,  
    final SendMessageRequestHeader requestHeader, final RemotingCommand response) {  
    // 检查是否可写  
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())  
        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {  
        response.setCode(ResponseCode.NO\_PERMISSION);  
        response.setRemark("the broker\[" + this.brokerController.getBrokerConfig().getBrokerIP1()  
            + "\] sending message is forbidden");  
        return response;  
    }  
    // 检查该Topic是否可以进行消息发送  
    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {  
        String errorMsg = "the topic\[" + requestHeader.getTopic() + "\] is conflict with system reserved words.";  
        log.warn(errorMsg);  
        response.setCode(ResponseCode.SYSTEM\_ERROR);  
        response.setRemark(errorMsg);  
        return response;  
    }

    //在 NameServer端存储主题的配置信息  
    TopicConfig topicConfig =  
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());  
    if (null == topicConfig) {  
        int topicSysFlag = 0;  
        if (requestHeader.isUnitMode()) {  
            if (requestHeader.getTopic().startsWith(MixAll.RETRY\_GROUP\_TOPIC\_PREFIX)) {  
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);  
            } else {  
                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);  
            }  
        }

        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());  
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(  
            requestHeader.getTopic(),  
            requestHeader.getDefaultTopic(),  
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),  
            requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

        if (null == topicConfig) {  
            if (requestHeader.getTopic().startsWith(MixAll.RETRY\_GROUP\_TOPIC\_PREFIX)) {  
                topicConfig =  
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(  
                        requestHeader.getTopic(), 1, PermName.PERM\_WRITE | PermName.PERM\_READ,  
                        topicSysFlag);  
            }  
        }

        if (null == topicConfig) {  
            response.setCode(ResponseCode.TOPIC\_NOT\_EXIST);  
            response.setRemark("topic\[" + requestHeader.getTopic() + "\] not exist, apply first please!"  
                + FAQUrl.suggestTodo(FAQUrl.APPLY\_TOPIC\_URL));  
            return response;  
        }  
    }  
    // 检查队列,如果队列不合法,返回错误码  
    int queueIdInt = requestHeader.getQueueId();  
    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());  
    if (queueIdInt >= idValid) {  
        String errorInfo = String.format("request queueId\[%d\] is illegal, %s Producer: %s",  
            queueIdInt,  
            topicConfig.toString(),  
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        log.warn(errorInfo);  
        response.setCode(ResponseCode.SYSTEM\_ERROR);  
        response.setRemark(errorInfo);

        return response;  
    }  
    return response;  
}

  发送的逻辑我们可以归纳一下

  1. 创建 DefaultProducer,启动其 start 方法
  2. send 方法调用会,利用 DefaultProducer 内部的 DefaultProducerImpl 执行发送操作,DefaultProducerImpl 使用 MQClient 执行发送,这中间有个broker 故障延迟发送机制,最后到了MQClientAPIImpl 的发送方法,MQClientAPIImpl 使用 RemotingClient 发送command 的,消息检查,最后到达DefaultMessageStore#putMessage 进行消息存储 。
      再有一个,我们知道了brokerController 和 NamerServerController 这两个类存放这各种重要的信息,使得消息在发送模块等其他模块(只要该模块持有controller)就可以方法得到各种消息。
      同时类在设计的时候通过设计接口,功能扩展封装在抽象类中,而实际使用不要持有最终实现,而是持有接口类,方便以后扩展。
  • 《RocketMQ技术内幕》