rocketmq有序消息的(四)
阅读原文时间:2021年11月12日阅读:1

opic的有序消息已经成为mq的标配。而RocketMQ中是这样区分消息类型的, 普通消息也叫做无序消息,简单来说就是没有顺序的消息,而有序消息就是按照一定的先后顺序的消息类型。举个例子,producer 依次发送 order id 为 1、2、3 的消息到 broker,consumer 接到的消息顺序也就是 1、2、3 ,而不会出现普通消息那样的 2、1、3 等情况。

一、有序消息该如何实现

理论上:我们都知道消息首先由 producer 到 broker,再从 broker 到 consumer,分这两步走。那么要保证消息的有序,势必这两步都是要保证有序的,即要保证消息是按有序发送到 broker,broker 也是有序将消息投递给 consumer,两个条件必须同时满足,缺一不可。

由于一个 topic 只有一个 queue ,即使我们有多个 producer 实例和 consumer 实例也很难提高消息吞吐量。就好比过独木桥,大家只能一个挨着一个过去,效率低下。

常见做法就是将 order id 进行处理,将 order id 相同的消息发送到 topicB 的同一个 queue,假设我们 topicB 有 2 个 queue,那么我们可以简单的对 id 取余,奇数的发往 queue0,偶数的发往 queue1,消费者按照 queue 去消费时,就能保证 queue0 里面的消息有序消费,queue1 里面的消息有序消费。

二、RocketMQ的topic的补充

opic 只是消息的逻辑分类,内部实现其实是由 queue 组成。当 producer 把消息发送到某个 topic 时,默认是会消息发送到具体的 queue 上。由于一个 topic 可以有多个 queue,所以在性能比全局有序高得多。假设 queue 数是 n,理论上性能就是全局有序的 n 倍,当然 consumer 也要跟着增加才行。在实际情况中,这种局部有序消息是会比全局有序消息用的更多。

/**
* 有序消息
*/
public class OrderedProducer {
public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";

public static void main(String\[\] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException {  
    // 1:创建生产者对象,并指定组名  
    DefaultMQProducer producer = new DefaultMQProducer("GROUP\_TEST");

    // 2:指定NameServer地址  
    producer.setNamesrvAddr(NAME\_SERVER\_ADDR);

    // 3:启动生产者  
    producer.start();  
    producer.setRetryTimesWhenSendAsyncFailed(0); // 设置异步发送失败重试次数,默认为2

    // 4:定义消息队列选择器  
    MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {

        /\*\*  
         \* 消息队列选择器,保证同一条业务数据的消息在同一个队列  
         \* @param mqs topic中所有队列的集合  
         \* @param msg 发送的消息  
         \* @param arg 此参数是本示例中producer.send的第三个参数  
         \* @return  
         \*/  
        @Override  
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
            Integer id = (Integer) arg;  
            // id == 1001  
            int index = id % mqs.size();  
            // 分区顺序:同一个模值的消息在同一个队列中  
            return mqs.get(index);

            // 全局顺序:所有的消息都在同一个队列中  
            // return mqs.get(mqs.size() - 1);  
        }  
    };

    String\[\] tags = new String\[\]{"TagA", "TagB", "TagC"};

    List<Map> bizDatas = getBizDatas();

    // 5:循环发送消息  
    for (int i = 0; i < bizDatas.size(); i++) {  
        Map bizData = bizDatas.get(i);  
        // keys:业务数据的ID,比如用户ID、订单编号等等  
        Message msg = new Message("TopicTest", tags\[i % tags.length\], "" + bizData.get("msgType"), bizData.toString().getBytes(RemotingHelper.DEFAULT\_CHARSET));  
        // 发送有序消息  
        SendResult sendResult = producer.send(msg, messageQueueSelector, bizData.get("msgType"));

        System.out.printf("%s, body:%s%n", sendResult, bizData);  
    }

    // 6:关闭生产者  
    producer.shutdown();  
}

public static List<Map> getBizDatas() {  
    List<Map> orders = new ArrayList<Map>();

    HashMap orderData = new HashMap();  
    orderData.put("msgType", 1001);  
    orderData.put("userId", "张三");  
    orderData.put("desc", "存钱1000");  
    orders.add(orderData);

    orderData = new HashMap();  
    orderData.put("msgType", 2001);  
    orderData.put("userId", "张三");  
    orderData.put("desc", "取钱1000");  
    orders.add(orderData);

    orderData = new HashMap();  
    orderData.put("msgType", 3001);  
    orderData.put("userId", "张三");  
    orderData.put("desc", "存钱2000");  
    orders.add(orderData);

    orderData = new HashMap();  
    orderData.put("msgType", 4001);  
    orderData.put("userId", "张三");  
    orderData.put("desc", "存钱3000");  
    orders.add(orderData);

    orderData = new HashMap();  
    orderData.put("msgType", 5001);  
    orderData.put("userId", "张三");  
    orderData.put("desc", "存钱4000");  
    orders.add(orderData);

    orderData = new HashMap();  
    orderData.put("msgType", 6001);  
    orderData.put("userId", "张三");  
    orderData.put("desc", "取钱5000");  
    orders.add(orderData);

    orderData = new HashMap();  
    orderData.put("msgType", 7001);  
    orderData.put("userId", "张三");  
    orderData.put("desc", "取钱6000");  
    orders.add(orderData);

    orderData = new HashMap();  
    orderData.put("msgType", 8001);  
    orderData.put("userId", "张三");  
    orderData.put("desc", "取钱2000");  
    orders.add(orderData);

    orderData = new HashMap();  
    orderData.put("msgType", 9001);  
    orderData.put("userId", "张三");  
    orderData.put("desc", "存钱9000");  
    orders.add(orderData);

    return orders;  
}  

}

/**
* 顺序消息消费者
*/
public class OrderedConsumer {
public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
public static void main(String[] args) throws Exception {
// 1. 创建消费者(Push)对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST");

    // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV\_ADDR,可以省略此步  
    consumer.setNamesrvAddr(NAME\_SERVER\_ADDR);

    /\*\*  
     \* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>  
     \* 如果非第一次启动,那么按照上次消费的位置继续消费  
     \* 这里设置的是一个consumer的消费策略  
     \* CONSUME\_FROM\_LAST\_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息  
     \* CONSUME\_FROM\_FIRST\_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍  
     \* CONSUME\_FROM\_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前  
     \*/  
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME\_FROM\_FIRST\_OFFSET);

    // 3. 订阅对应的主题和Tag   String\[\] tags = new String\[\]{"TagA", "TagB", "TagC"};  
    consumer.subscribe("TopicTest", "TagA || TagB || TagC");

    // 4. 注册消息接收到Broker消息后的处理接口  
    // 注1:普通消息消费 \[\[  

// consumer.registerMessageListener(new MessageListenerConcurrently() {
// AtomicInteger count = new AtomicInteger(0);
//
// public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// doBiz(list.get(0));
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// }
// });
// ]] 注1:普通消息消费

    // consumer  
    consumer.setMaxReconsumeTimes(-1);  
    // 延时  level  3

    // 注2:顺序消息消费 \[\[  
    consumer.registerMessageListener(new MessageListenerOrderly() {  
        @Override  
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,  
                                                   ConsumeOrderlyContext context) {  
            context.setAutoCommit(true);  
            doBiz(msgs.get(0));

            return ConsumeOrderlyStatus.SUCCESS;  
        }  
    });

    // 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错)  
    consumer.start();

    System.out.println("已启动消费者");  
}

/\*\*  
 \* 模拟处理业务  
 \*  
 \* @param message  
 \*/  
public static void doBiz(Message message) {  
    try {  
        System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), message.getTags(), new String(message.getBody(), RemotingHelper.DEFAULT\_CHARSET));  
    } catch (UnsupportedEncodingException e) {  
        e.printStackTrace();  
    }  
}  

}

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章