【rabbitmq】单独配置某一个消费者手动ack,其他消费者自动ack
阅读原文时间:2023年07月08日阅读:2

前言:博主才疏学浅,此方案仅供参考,如有更优方案请大佬评论区告知,十分感谢✿✿ヽ(°▽°)ノ✿

问题背景:同一个服务中存在多个不同业务的rabbitmq的消费者,其中一个推送业务的消费者需要加死信队列作为推送失败补偿机制,并且需要根据推送成功与否来判断该消息是否进死信队列,这就需要手动ACK控制,但由于项目配置文件中配置了retry,所以默认是全局自动ack。如果只是在该推送消费者中写手动ack,其他消费者不做改动,会导致其他消费者因没有ack而消息堵塞。

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=2
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms

处理方案:

不同消费者使用不同配置 SimpleRabbitListenerContainerFactory

@Configuration
public class ConsumerConfig {
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactoryManual(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
}

/**
* 需要手动ack的队列消费
*/
@RabbitListener(containerFactory = "rabbitListenerContainerFactoryManual",queues ="send.subscribe",concurrency="${spring.rabbitmq.listener.custom.concurrency}")

_public void handleSendSubscribe(Channel channel, Message message, SendSubscribeMsg msg) throws IOException {
  //推送
boolean sendResult = this.sendHandler.prepareSend(msg);
  //根据推送结果判断是否进死信队列
if (!sendResult) {
log.warn("send to dead letter .");
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}else{
log.info("success send");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}  

}

_/**
*需要自动ack的队列消费

*/

@RabbitListener(containerFactory = "rabbitListenerContainerFactory",queues ="sys.property.post",concurrency="${spring.rabbitmq.listener.custom.concurrency}")__

__public void handleDevicePropsPost(DevicePropertiesPostMsg attr) {
//业务逻辑处理
    ……

}__