SpringBoot - 集成RocketMQ实现延迟消息队列
阅读原文时间:2021年08月26日阅读:1

目录

前言

RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,记录下SpringBoot整合RocketMQ的方式,RocketMQ的安装可以查看:Windows下安装RocketMQ

SpringBoot2.5.3 + RocketMQ4.7.0

具体实现

  • pom.xml

    org.apache.rocketmq rocketmq-client 4.7.0

  • application.yml

    rocketmq:
    producer:
    producer-group: CoisiniProducerGroup
    consumer:
    consumer-group: CoisiniConsumerGroup
    namesrv-addr: 127.0.0.1:9876

  • MQ生产者

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;
    import java.util.Objects;

    /**

    • @Description MQ生产者

    • @author coisini

    • @date Aug 25, 2021

    • @Version 1.0
      */
      @Component
      public class ProducerSchedule {

      private DefaultMQProducer producer;

      @Value("${rocketmq.producer.producer-group}")
      private String producerGroup;

      @Value("${rocketmq.namesrv-addr}")
      private String nameSrvAddr;

      public ProducerSchedule() {

      }

      /**

      • 生产者构造

      • @PostConstruct该注解被用来修饰一个非静态的void()方法

      • Bean初始化的执行顺序:

      • Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
        */
        @PostConstruct
        public void defaultMQProducer() {
        if (Objects.isNull(this.producer)) {
        this.producer = new DefaultMQProducer(this.producerGroup);
        this.producer.setNamesrvAddr(this.nameSrvAddr);
        }

        try {
        this.producer.start();
        System.out.println("Producer start");
        } catch (MQClientException e) {
        e.printStackTrace();
        }
        }

      /**

      • 消息发布

      • @param topic

      • @param tag

      • @param messageText

      • @return
        */
        public String send(String topic, String messageText){
        Message message = new Message(topic, messageText.getBytes());

        /**

        • 延迟消息级别设置
        • messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
          */
          message.setDelayTimeLevel(4);

        SendResult result = null;
        try {
        result = this.producer.send(message);
        System.out.println("MessageQueue: " + result.getMessageQueue());
        System.out.println("MsgId: " + result.getMsgId());
        System.out.println("SendStatus: " + result.getSendStatus());
        } catch (Exception e) {
        e.printStackTrace();
        }

        return result.getMsgId();
        }
        }

  • MQ消费者

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.Message;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;

    /**

    • @Description MQ消费者

    • CommandLineRunner 初始化预加载数据

    • @author coisini

    • @date Aug 25, 2021

    • @Version 1.0
      */
      @Component
      public class ConsumerSchedule implements CommandLineRunner {

      @Value("${rocketmq.consumer.consumer-group}")
      private String consumerGroup;

      @Value("${rocketmq.namesrv-addr}")
      private String nameSrvAddr;

      public void messageListener() throws MQClientException {
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.consumerGroup);
      consumer.setNamesrvAddr(this.nameSrvAddr);

      /**
       * 订阅主题
       */
      consumer.subscribe("Topic", "*");
      
      /**
       * 设置消费消息数
       */
      consumer.setConsumeMessageBatchMaxSize(1);
      
      /**
       * 注册消息监听
       */
      consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
          for (Message message : messages) {
              System.out.println("监听到消息:" + new String(message.getBody()));
          }
          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      });
      
      consumer.start();

      }

      @Override
      public void run(String… args) throws Exception {
      this.messageListener();
      }
      }

  • 测试接口

    @RestController
    @RequestMapping("/test")
    public class TestController {

    @Autowired
    private ProducerSchedule producerSchedule;
    
    @GetMapping("/push")
    public void pushMessageToMQ() throws Exception {
        producerSchedule.send("Topic", "Coisini");
    }

    }

  • 接口调用:

  • 30s后延迟消息触发:

- End -



梦想是咸鱼
关注一下吧

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章