目录
需求说明:两个系统间需要数据互通,订单系统需要把一些订单信息、订单明细、回款、其他发送给B系统,但这些数据不是同时生成,还会有修改。直到订单的的状态改变为"审核通过",订单信息(所有的)才不会再继续推送。
两个系统是双向的,订单系统也会发送一些信息告诉B系统订单已完成/已取消,B系统也可以发送一些信息告诉订单系统订单已完成/已取消。从而促使对方的业务逻辑发生相应的变化。该篇文章假定为单向请求即订单系统向B系统发送数据
消息队列(rabbitMQ)
定时任务(xxl-job)
socket长连接或短连接
生产者
消息是否发送到交换机
消息是否由交换机转发到队列
消费者
消费者是否接收到消息
ack/nack
如果未接收到消息,是否重试?重试几次?时间间隔多久?如果重试失败该如何处理
如果保证消息的幂等性(即针对消息重复推送如何只消费一条消息)
如果消费失败,如果把消息转入死信队列
首先,创建两个demo,分别叫做rabbit-producer和rabbit-consumer。两个demo的项目架构如下:
pom.xml内容如下:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-rest</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--重点-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--redis用于处理消息的幂等性-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--如果是消息是否有问题,可以发邮件给开发人员进行通知-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
</dependencies>
consumer.yml内容如下:
spring:
application:
# 应用名称
name: rabbit-consumer
redis:
host: 127.0.0.1
port: 6379
password:
rabbitmq:
# 连接地址
host: 127.0.0.1
# 端口
port: 5672
# 登录账号
username: guest
# 登录密码
password: guest
# 虚拟主机
virtual-host: /
listener:
simple:
#手动签收消息
acknowledge-mode: manual
# 投递失败时是否重新排队 默认值:true
default-requeue-rejected: false
retry:
enabled: true # 开启消费者进行重试
max-attempts: 5 # 最大重试次数
initial-interval: 3000 # 重试时间间隔
producer.yml内容如下:
spring:
application:
# 应用名称
name: rabbit-producer
redis:
host: 127.0.0.1
port: 6379
password:
rabbitmq:
# 连接地址
host: 127.0.0.1
# 端口
port: 5672
# 登录账号
username: guest
# 登录密码
password: guest
# 虚拟主机
virtual-host: /
#开启生产者确认机制,是否到达交换机,也可以填sample
publisher-confirm-type: correlated
#交换机是否到达队列
publisher-returns: true
#消息是否到达交换机
publisher-confirms: true
listener:
simple:
acknowledge-mode: manual
# 投递失败时是否重新排队 默认值:true
default-requeue-rejected: false
生产者主要由一个配置类RabbitConfig和一个Controller组成,配置类用于创建交换机、队列和配置绑定关系等。生产者用于发送消息,确认消息是否到达
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
//业务交换机
public static final String ORDER_EXCHANGE = "order_exchange";
//死信交换机
public static final String DEAD_LETTER_EXCHANGE = "order_exchange_dead_letter";
//业务队列
public static final String ORDER_QUEUE = "order_queue";
//死信队列
public static final String DEAD_LETTER_ORDER_QUEUE = "order_queue_dead_letter";
//路由
public static final String ROUTING_KEY_QUEUE_ORDER = "key_order";
public static final String DEAD_LETTER_ROUTING_KEY_QUEUE_ORDER = "key_order_dead_letter";
@Bean
public DirectExchange orderExchange(){
return new DirectExchange(ORDER_EXCHANGE,true,false);
}
@Bean
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE,true,false);
}
@Bean
public Queue orderQueue(){
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_QUEUE_ORDER);
return new Queue(ORDER_QUEUE,true,false,false,args);
}
@Bean
public Queue deadLetterOrderqueue(){
return new Queue(DEAD_LETTER_ORDER_QUEUE,true);
}
@Bean
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_QUEUE_ORDER);
return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
}
@Bean
public Binding orderBinding(){
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ROUTING_KEY_QUEUE_ORDER);
}
@Bean
public Binding orderDeadLetterBinding(){
return BindingBuilder.bind(deadLetterOrderqueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY_QUEUE_ORDER);
}
// java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
template.setMessageConverter(new SerializerMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
package com.example.demo.controller;
import lombok.extern.slf4j.Slf4j;
import net.minidev.json.JSONObject;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@Slf4j
@RestController
@RequestMapping("/rabbitProducer")
public class Producer {
//业务交换机
public static final String ORDER_EXCHANGE = "order_exchange";
public static final String ROUTING_KEY_QUEUE_ORDER = "key_order";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void sendMessage(){
JSONObject jsonObject = new JSONObject();
jsonObject.put("email","11111111111");
jsonObject.put("timestamp",System.currentTimeMillis());
String json = jsonObject.toJSONString();
Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();
System.out.println(json);
/**
* 消息是否到达交换机
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
log.info("发送消息到交换器成功");
}else{
log.info("发送消息到交换器失败");
}
System.out.println(correlationData);
System.out.println("发送消息到交换器标志(true-成功 false-失败): "+ack);
System.out.println(cause);
}
});
/**
* 消息是否达到队列
*/
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("------------- 没到达队列 --------------");
System.out.println(returnedMessage);
System.out.println("------------- 没到达队列 --------------");
}
});
//
rabbitTemplate.convertAndSend(ORDER_EXCHANGE,ROUTING_KEY_QUEUE_ORDER,message);
}
}
package com.example.demo.controller;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.minidev.json.JSONObject;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@Slf4j
@Component
public class Consumer {
//业务队列
public static final String ORDER_QUEUE = "order_queue";
//死信队列
public static final String DEAD_LETTER_ORDER_QUEUE = "order_queue_dead_letter";
@Autowired
RedisTemplate redisTemplate;
@Autowired
private JavaMailSender mailSender;
@RabbitListener(queues = ORDER_QUEUE)
@RabbitHandler
public void receiveMessage(Message message, Channel channel) throws IOException {
try{
//用于测试是否会进入死信队列被消费
int x = 1 / 0;
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"UTF-8");
System.out.println("接收导的消息为:"+msg+"==消息id为:"+messageId);
String messageIdRedis = null;
//验证是否是重复消息
if(redisTemplate.hasKey("messageId")){
messageIdRedis = redisTemplate.opsForValue().get("messageId").toString();
if(messageId.equals(messageIdRedis)){
//说明消息已被消费
return;
}
}
redisTemplate.opsForValue().set("messageId",messageId);
System.out.println("-----------------------------------------------------------");
System.out.println("接收到的消息为"+msg);
System.out.println("-----------------------------------------------------------");
//手动签收
//给接收到消息打个标记。默认应由RabbitMQ随机生成并用来它自己区分接收到的消息。所以此处应赋值为message.getMessageProperties().getDeliveryTag()
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println(deliveryTag);
//可以做一些确认,比如code=200,才手动确认
channel.basicAck(deliveryTag,false);
// 第二个参数是否批量确认,第三个参数是否重新回队列
//channel.basicNack(deliveryTag,false,true);
}catch (Exception e){
/* SimpleMailMessage mailMsg = new SimpleMailMessage();
// 发件人
mailMsg.setFrom("hexiangli@chosenmedtech.com");
// 收件人
mailMsg.setTo("hexiangli@chosenmedtech.com");
// 邮件标题
mailMsg.setSubject("消息队列异常,请及时解决");
// 邮件内容
mailMsg.setText("crm与limis消息队列消费异常");
// 抄送人
mailMsg.setCc("2393545826@qq.com");
mailSender.send(mailMsg);*/
log.error("消息消费发生异常,error msg:{}", e.getMessage());
channel.basicNack((Long)message.getMessageProperties().getDeliveryTag(), false, false);
}
}
@RabbitListener(queues = DEAD_LETTER_ORDER_QUEUE)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
https://gitee.com/lhx890/rabbitmq-demo.git
比如有关数据库操作,新增/修改/删除 或者 新增/删除/新增/修改,如果顺序错了,数据库操作也将失败。如果对于同一个订单进行数据库操作需保持它的顺序性。即把消息推送到同一个queue,一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章