SpringBoot RabbitMQ 实战解决项目中实践
阅读原文时间:2023年08月16日阅读:4
  • 1.1 环境准备

    Springboot 1.5.6.RELEAS
    Springcloud Dalston.SR2

  • 1.2 交换机类型

交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 0-9-1的代理提供了四种交换机

Name(交换机类型) Default pre-declared names(预声明的默认名称) 
Direct exchange(直连交换机) (Empty string) and amq.direct 
Fanout exchange(扇型交换机) amq.fanout 
Topic exchange(主题交换机) amq.topic 
Headers exchange(头交换机) amq.match (and amq.headers in RabbitMQ)

  • 1.3 绑定

绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。

打个比方:

队列(queue)是我们想要去的位于纽约的目的地 
交换机(exchange)是JFK机场 
绑定(binding)就是JFK机场到目的地的路线。能够到达目的地的路线可以是一条或者多条 
拥有了交换机这个中间层,很多由发布者直接到队列难以实现的路由方案能够得以实现,并且避免了应用开发者的许多重复劳动。

如果AMQP的消息无法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。

  • 1.4 ack机制

RabbitMq默认是ack机制:no-ack的方式

执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。

实际项目需要手动ack机制 - 见下问实战代码

  • 1.5 消息重复消费

用户在停止查询时,会导致消费者进程被杀死,因此ACK状态码未反馈至MQ,从而消息一直存留在MQ中,当新的消费者启动时会重新消费;

接受消息后-消费消息前,db或者redis nosql检查消息消费状态 - 见下问实战代码

  • 2.1 pom

    org.springframework.boot spring-boot-starter-amqp

  • 2.2 rabbitMQ config

    package com.ttd.trustProduct.mq.rabbit;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**

    • rabbit 配置
      *

    • @author wolf
      */
      @Configuration
      @EnableRabbit
      public class RabbitConfiguration {

      //===============以下是验证topic Exchange的队列==========
      @Bean
      public Queue productMessage() {
      return new Queue("ttd.trust.product");
      }

      @Bean
      public Queue allMessages() {
      return new Queue("ttd.all");
      }

      @Bean
      TopicExchange exchange() {
      return new TopicExchange("exchange");
      }

      /**

      • 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配
        *
      • @param queueMessage
      • @param exchange
      • @return
        */
        @Bean
        Binding bindingExchangeMessage(Queue productMessage, TopicExchange exchange) {
        return BindingBuilder.bind(productMessage).to(exchange).with("ttd.trust.product");
        }

      /**

      • 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配
        *
      • @param queueMessage
      • @param exchange
      • @return
        */
        @Bean
        Binding bindingExchangeMessages(Queue allMessages, TopicExchange exchange) {
        return BindingBuilder.bind(allMessages).to(exchange).with("ttd.#");
        }
        //===============以上是验证topic Exchange的队列==========

      //===============以下是验证Fanout Exchange的队列==========
      @Bean
      public Queue AMessage() {
      return new Queue("fanout.A");
      }

      @Bean
      public Queue BMessage() {
      return new Queue("fanout.B");
      }

      @Bean
      public Queue CMessage() {
      return new Queue("fanout.C");
      }

      @Bean
      FanoutExchange fanoutExchange() {
      return new FanoutExchange("fanoutExchange");
      }

      @Bean
      Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
      return BindingBuilder.bind(AMessage).to(fanoutExchange);
      }

      @Bean
      Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
      return BindingBuilder.bind(BMessage).to(fanoutExchange);
      }

      @Bean
      Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
      return BindingBuilder.bind(CMessage).to(fanoutExchange);
      }
      //===============以上是验证Fanout Exchange的队列==========

      @Bean
      public Queue helloQueue() {
      return new Queue("newhelloQueue");
      }

    }

  • 2.4 数据交换协议

    package com.ttd.sdk.common;

    import java.io.Serializable;
    import java.util.Date;
    import java.util.Map;

    import com.alibaba.fastjson.JSON;
    import com.ttd.sdk.util.DateUtil;
    import com.ttd.sdk.util.RandomUtils;

    /**

    • 消息体
      *

    • @author wolf
      *
      */
      public class MQMessage implements Serializable {
      private static final long serialVersionUID = 1L;

      private Integer productCode; // 生产者代码
      private Integer consumerCode; // 消费者代码
      private String messageId; // 消息唯一标识
      private Integer event; // 消息监听事件
      private Integer action; //操作:1加,2减
      private Date created; // 消息发送时间
      private Map bussinessBody; // 消息体,封装业务数据

      private MQMessage() {
      super();
      }

      private MQMessage(Integer productCode, Integer consumerCode, String messageId, Integer event, Date created,
      Map bussinessBody, Integer action) {
      super();
      this.productCode = productCode;
      this.consumerCode = consumerCode;
      this.messageId = messageId;
      this.event = event;
      this.created = created;
      this.bussinessBody = bussinessBody;
      this.action = action;
      }

      private MQMessage(Integer productCode, Integer consumerCode, Integer event, Map bussinessBody, Integer action) {
      super();
      this.productCode = productCode;
      this.consumerCode = consumerCode;
      this.event = event;
      this.bussinessBody = bussinessBody;
      this.action = action;
      }

      public static String productMQMessage(Integer productCode, Integer consumerCode, Integer event, Map bussinessBody, Integer action) {
      MQMessage mqObj = new MQMessage(productCode, consumerCode, event, bussinessBody, action);
      mqObj.setCreated(new Date());
      mqObj.setMessageId(generatSeriaeNo());

      return JSON.toJSONString(mqObj);

      }

      //生成消息唯一标识
      private static String generatSeriaeNo() {
      return DateUtil.dateFormat("yyyyMMddHHmmss") + RandomUtils.randomCode(2);
      }

      public Integer getProductCode() {
      return productCode;
      }

      public void setProductCode(Integer productCode) {
      this.productCode = productCode;
      }

      public Integer getConsumerCode() {
      return consumerCode;
      }

      public void setConsumerCode(Integer consumerCode) {
      this.consumerCode = consumerCode;
      }

      public String getMessageId() {
      return messageId;
      }

      public void setMessageId(String messageId) {
      this.messageId = messageId;
      }

      public Integer getEvent() {
      return event;
      }

      public void setEvent(Integer event) {
      this.event = event;
      }

      public Date getCreated() {
      return created;
      }

      public void setCreated(Date created) {
      this.created = created;
      }

      public Map getBussinessBody() {
      return bussinessBody;
      }

      public void setBussinessBody(Map bussinessBody) {
      this.bussinessBody = bussinessBody;
      }

      public Integer getAction() {
      return action;
      }

      public void setAction(Integer action) {
      this.action = action;
      }

      @Override
      public String toString() {
      return "MQMessage [productCode=" + productCode + ", consumerCode="
      + consumerCode + ", messageId=" + messageId + ", event="
      + event + ", action=" + action + ", created=" + created
      + ", bussinessBody=" + bussinessBody + "]";
      }
      }

  • 3.1 生产者

    生产者与消费者数据协议为json,定义统一数据传输实体。
    package com.ttd.trustProduct.mq.rabbit;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class ProductTopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    private static final Logger logger = LoggerFactory.getLogger(ProductTopicSender.class);
    
    public void send(String msg) {
        System.out.println("ProductTopicSender : " + msg);
        this.rabbitTemplate.convertAndSend("exchange", "ttd.trust.product", msg);
    }

    }

  • 3.2 消费者

    package com.ttd.trustProduct.mq.rabbit;

    import javax.annotation.Resource;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

    import com.alibaba.fastjson.JSON;
    import com.rabbitmq.client.Channel;
    import com.ttd.sdk.common.MQMessage;
    import com.ttd.sdk.common.enumerate.ActionCodeEnum;
    import com.ttd.sdk.common.enumerate.EventCodeEnum;
    import com.ttd.trustProduct.domain.LogMqMessage;
    import com.ttd.trustProduct.domain.SaleInfo;
    import com.ttd.trustProduct.service.LogMqMessageService;
    import com.ttd.trustProduct.service.SaleInfoService;
    import com.ttd.trustProduct.utils.IntegerUtils;

    @Component
    @RabbitListener(queues = "ttd.trust.product")
    public class ProductMessageReceiver {
    @Resource private SaleInfoService saleInfoService;
    @Resource private LogMqMessageService logMqMessageService;
    private final Logger logger = LoggerFactory.getLogger(ProductMessageReceiver.class);

    @Autowired  private ConnectionFactory connectionff;
    
    /*
     * Map<String, Object> bussiness = Maps.newHashMap();
     **公共必填:**
     bussiness.put("productId", 11);
     bussiness.put("companyId", 100); //B公司id
     bussiness.put("isSmallPerson", 1); //1 or 0
     bussiness.put("assignType", 1) //'1指定派发,2抢购派发',
     **预约事件必填**
     bussiness.put("bookNum", 1);
     bussiness.put("bookAmount", 100);
     **报单事件必填**
     bussiness.put("formNum", 1);
     bussiness.put("formAmount", 100);
     **募集事件必填**
     bussiness.put("raiseNum", 1);
     bussiness.put("raiseAmount", 100);
     **签章事件必填**
     bussiness.put("signedNum", 1); //电子签章数
     bussiness.put("paperSignedNum", 1);//纸质签章数
     **双录事件必填**
     bussiness.put("signingNum", 100);
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionff);
        container.setQueueNames("ttd.trust.product");
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
        container.setMessageListener(new ChannelAwareMessageListener() {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            byte[] body = message.getBody();
            String jsonString =  new String(body);
            logger.info("|============ProductMessageReceiver  : " + jsonString);
    
            MQMessage msg = JSON.parseObject(jsonString, MQMessage.class);
    
            boolean preRet = preEventHandler(msg);
            if (preRet == false) {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                return ;
            }
    
            boolean postRet = postEventHandler(msg);
            if (postRet == false) {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                return ;
            }
    
            afterEventHandler(msg);
    
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    });
    return container;
    }

    /* @RabbitHandler
    public void process(String msg) {
    System.out.println("|============ProductMessageReceiver : " +msg);
    MQMessage message = JSON.parseObject(msg, MQMessage.class);

        boolean preRet = preEventHandler(message);
        if (preRet == false)
            return ;
    boolean postRet = postEventHandler(message);
    if (postRet == false)
        return ;
    
    afterEventHandler(message);
    }*/ private void recordLogMQ(MQMessage message, Integer state) { LogMqMessage log = new LogMqMessage(); log.setMessageId(message.getMessageId()); log.setProductCode(message.getProductCode()); log.setConsumerCode(message.getConsumerCode()); log.setEvent(message.getEvent()); log.setBussinessBody(JSON.toJSONString(message)); log.setState(state);
    logMqMessageService.insertEntry(log);
    } /** * 消息体检查 * @param message * @return */ private boolean preEventHandler(MQMessage message) { //不能重复消费 LogMqMessage logMQ = new LogMqMessage(); logMQ.setMessageId(message.getMessageId()); int count = logMqMessageService.selectEntryListCount(logMQ); System.out.println(count); if (count > 0) { return false; }
    //消息体格式错误
    if (message.getEvent() == null ||  message.getAction() == null || message.getBussinessBody() == null ||
        !IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("productId")) ||
        !IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("companyId")) ||
        (Integer) message.getBussinessBody().get("isSmallPerson") == null ) {
        recordLogMQ(message, -1);
        return false;
    }
    
    //业务类型检查
    //预约事件
    if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.BOOK_EVENT.getValue())) {
        if (!IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("bookNum")) ||
            !IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("bookAmount"))) {
            recordLogMQ(message, -1);
            return false;
        }
    
    //报单事件
    } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.FORM_EVENT.getValue())) {
        if (!IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("formNum")) ||
            !IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("formAmount"))) {
                recordLogMQ(message, -1);
                return false;
        }
    
    //双录事件
    } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) {
        if (!IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("signingNum"))) {
                recordLogMQ(message, -1);
                return false;
        }
    
    //签章事件
    } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) {
        if (!IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("signedNum"))
            || !IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("paperSignedNum"))) {
            recordLogMQ(message, -1);
            return false;
        }
    
    //募集事件
    } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) {
        if (!IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("raiseNum")) ||
                IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("raiseAmount"))) {
                recordLogMQ(message, -1);
                return false;
        }
    }
    
    return true;
    } /** * 业务处理 * @param message * @return */ private boolean postEventHandler(MQMessage message) { Integer productId = (Integer) message.getBussinessBody().get("productId"); Integer companyId = (Integer) message.getBussinessBody().get("companyId"); Integer isSmallPerson = (Integer) message.getBussinessBody().get("isSmallPerson"); Integer assignType = (Integer) message.getBussinessBody().get("assignType");
    //查询
    SaleInfo saleInfo = new SaleInfo();
    saleInfo.setCompanyId(IntegerUtils.parseLong(companyId));
    saleInfo.setProductId(IntegerUtils.parseLong(productId));
    saleInfo.setAssignType(assignType);
    SaleInfo cond = saleInfoService.selectEntryOne(saleInfo);
    
    //预约事件
    if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.BOOK_EVENT.getValue())) {
        Integer bookNum = (Integer) message.getBussinessBody().get("bookNum");
        Integer bookAmount = (Integer) message.getBussinessBody().get("bookAmount");
    
        //insert and plus
        if (cond == null &amp;&amp; IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) {
            saleInfo.setBookTotalNum(bookNum);
            saleInfo.setBookTotalAmount(bookAmount);
            if (IntegerUtils.greatThanZero(isSmallPerson)) {
                saleInfo.setSmallPersonBookNum(bookNum);
                saleInfo.setSmallPersonBookAmount(bookAmount);
            }
            saleInfoService.insertEntry(saleInfo);
    
        //update
        } else {
            if (IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) {
                cond.setBookTotalNum(cond.getBookTotalNum() + bookNum);
                cond.setBookTotalAmount(cond.getBookTotalAmount() + bookAmount);
                if (IntegerUtils.greatThanZero(isSmallPerson)) {
                    cond.setSmallPersonBookNum(cond.getSmallPersonBookNum() + bookNum);
                    cond.setSmallPersonBookAmount(cond.getSmallPersonBookAmount() + bookAmount);
                }
            } else {
                cond.setBookTotalNum(cond.getBookTotalNum() - bookNum);
                cond.setBookTotalAmount(cond.getBookTotalAmount() - bookAmount);
                if (IntegerUtils.greatThanZero(isSmallPerson)) {
                    cond.setSmallPersonBookNum(cond.getSmallPersonBookNum() - bookNum);
                    cond.setSmallPersonBookAmount(cond.getSmallPersonBookAmount() - bookAmount);
                }
            }
            saleInfoService.updateByKey(cond);
        }   
    
    //报单事件
    } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.FORM_EVENT.getValue())) {
        Integer formNum = (Integer) message.getBussinessBody().get("formNum");
        Integer formAmount = (Integer) message.getBussinessBody().get("formAmount");
    
        //insert and plus
        if (cond == null &amp;&amp; IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) {
            saleInfo.setFormTotalNum(formNum);
            saleInfo.setFormTotalAmount(formAmount);
            if (IntegerUtils.greatThanZero(isSmallPerson)) {
                saleInfo.setSmallPersonFormNum(formNum);
                saleInfo.setSmallPersonFormAmount(formAmount);
            }
            saleInfoService.insertEntry(saleInfo);
    
        //update
        } else {
            if (IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) {
                cond.setFormTotalNum(cond.getFormTotalNum() + formNum);
                cond.setFormTotalAmount(cond.getFormTotalAmount() + formAmount);
                if (IntegerUtils.greatThanZero(isSmallPerson)) {
                    cond.setSmallPersonFormNum(cond.getSmallPersonFormNum() + formNum);
                    cond.setSmallPersonFormAmount(saleInfo.getSmallPersonFormAmount() + formAmount);
                }
            } else {
                cond.setFormTotalNum(cond.getFormTotalNum() - formNum);
                cond.setFormTotalAmount(cond.getFormTotalAmount() - formAmount);
                if (IntegerUtils.greatThanZero(isSmallPerson)) {
                    cond.setSmallPersonFormNum(cond.getSmallPersonFormNum() - formNum);
                    cond.setSmallPersonFormAmount(cond.getSmallPersonFormAmount() - formAmount);
                }
            }
            saleInfoService.updateByKey(cond);
        }   
    
    //双录事件
    } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) {
        Integer signingNum = (Integer) message.getBussinessBody().get("signingNum");
    
        //insert and plus
        if (cond == null &amp;&amp; IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) {
            saleInfo.setSigningContractNum(signingNum);
            saleInfoService.insertEntry(saleInfo);
    
        //update
        } else {
            if (IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) {
                cond.setSigningContractNum(cond.getSigningContractNum() + signingNum);
            } else {
                cond.setSigningContractNum(cond.getSigningContractNum() - signingNum);
            }
            saleInfoService.updateByKey(cond);
        }
    
    //签章事件
    } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) {
        Integer signedNum = (Integer) message.getBussinessBody().get("signedNum");
        Integer paperSignedNum = (Integer) message.getBussinessBody().get("paperSignedNum");
    
        //insert and plus
        if (cond == null &amp;&amp; IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) {
            saleInfo.setSignedContractNum(signedNum);
            saleInfo.setPaperSignedContractNum(paperSignedNum);
            saleInfoService.insertEntry(saleInfo);
    
        //update
        } else {
            if (IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) {
                cond.setSignedContractNum(cond.getSignedContractNum() + signedNum);
                cond.setPaperSignedContractNum(cond.getPaperSignedContractNum() + paperSignedNum);
            } else {
                cond.setSignedContractNum(cond.getSignedContractNum() - signedNum);
                cond.setPaperSignedContractNum(cond.getPaperSignedContractNum() - paperSignedNum);
            }
            saleInfoService.updateByKey(cond);
        }
    
    //募集事件
    } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) {
        Integer raiseNum = (Integer) message.getBussinessBody().get("raiseNum");
        Integer raiseAmount = (Integer) message.getBussinessBody().get("raiseAmount");
    
        //insert and plus
        if (cond == null &amp;&amp; IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) {
            saleInfo.setRaiseTotalNum(raiseNum);
            saleInfo.setRaiseTotalAmount(raiseAmount);
            saleInfoService.insertEntry(saleInfo);
    
        //update
        } else {
            if (IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) {
                cond.setRaiseTotalNum(cond.getRaiseTotalNum() + raiseNum);
                cond.setRaiseTotalAmount(cond.getRaiseTotalAmount() + raiseAmount);
            } else {
                cond.setRaiseTotalNum(cond.getRaiseTotalNum() - raiseNum);
                cond.setRaiseTotalAmount(cond.getRaiseTotalAmount() - raiseAmount);
            }
            saleInfoService.updateByKey(cond);
        }
    
    }
    
    return true;
    } /** * 消息日志处理 * @param message */ private void afterEventHandler(MQMessage message) { recordLogMQ(message, 1); }

    }

  • 3.3 单元测试

    package com.ttd.test;

    import java.util.Map;

    import javax.annotation.Resource;

    import org.junit.Test;

    import com.google.common.collect.Maps;
    import com.ttd.sdk.common.MQMessage;
    import com.ttd.sdk.common.enumerate.ActionCodeEnum;
    import com.ttd.sdk.common.enumerate.EventCodeEnum;
    import com.ttd.sdk.common.enumerate.ServiceCodeEnum;
    import com.ttd.trustProduct.mq.kafka.MsgProducer;
    import com.ttd.trustProduct.mq.rabbit.ProductTopicSender;

    public class TestMsg extends BaseTestService{

    @Resource private MsgProducer producer;
    @Resource private ProductTopicSender topicSender;

    // @Resource private FanoutSender fanoutSender;
    // @Resource private CallBackSender callBackSender;
    // @Resource private HelloSender1 hellSender1;

    //kafka

    // @Test
    public void testSendMsg() {
    String seriNo = System.currentTimeMillis()/1000 + "";
    producer.send("edwintest", "{\"seriNo\":" + seriNo + ",\"code\":200, \"msg\":\"发送一条消息,1-topic-8partion-1-replication\"}");
    }

    // topic exchange
    @Test
    public void testTopicRabbit() throws InterruptedException {
        Map<String, Object> bussiness = Maps.newHashMap();
        bussiness.put("productId", 15);
        bussiness.put("companyId", 100); //B公司id
        bussiness.put("isSmallPerson", 1); //1 or 0
        bussiness.put("assignType", 1);
    bussiness.put("bookNum", 1);
    bussiness.put("bookAmount", 100);
    /*bussiness.put("formNum", 1);
    bussiness.put("formAmount", 100);
    bussiness.put("raiseNum", 1);
    bussiness.put("raiseAmount", 100);
    bussiness.put("signedNum", 1);
    bussiness.put("signingNum", 100); */
    
    String messageBody = MQMessage.productMQMessage(
            ServiceCodeEnum.TTD_TRUST_ORDER.getValue(),
            ServiceCodeEnum.TTD_TRUST_PRODUCT.getValue(),
            EventCodeEnum.BOOK_EVENT.getValue(),
            bussiness,
            ActionCodeEnum.PLUS_ACTION.getValue());
    topicSender.send(messageBody);
    }

    }

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章