1、hello word - 简单模式
2、work queues - 工作模式
3、publish / subscribe - 发布订阅模式
4、Routing - 路由模式
5、Topics - 主题模式
6、RPC模式 - 不用了解也行
7、publisher confirms - 发布确认模式
以下的方式自行选择一种即可
查看自己的Linux版本
uname -a
准备工作
1、下载Erlang,因为:RabbitMQ是Erlang语言写的,Erlang下载地址【 ps:这是官网 】:https://www.erlang.org/downloads,选择自己要的版本即可
要是github下载慢的话,都有自己的文明上网加速方式,要是没有的话,可以进入 https://github.com/fhefh2015/Fast-GitHub 下载好了然后集成到自己浏览器的扩展程序中即可,而如果进入github很慢的话,可以选择去gitee中搜索一个叫做:dev-sidecar
的东西安装,这样以后进入github就很快了,还有另外的很多方式,不介绍了。
2、执行rpm -ivh erlang文件
命令
yum install rpm
指令即可安装rpm3、安装RabbitMQ需要的依赖环境
yum install socat -y
4、下载RabbitMQ的rpm文件,github地址:https://github.com/rabbitmq/rabbitmq-server/releases , 选择自己要的版本即可
5、安装RabbitMQ
6、启动RabbitMQ服务
启动服务
sbin/service rabbitmq-server start停止服务
/sbin/service rabbitmq-server stop
查看启动状态
/sbin/service rabbitmq-server status
开启开机自动
chkconfig rabbitmq-server on</code></pre>
![]()
- 查看启动状态
![]()
- 这表示正在启动,需要等一会儿,看到下面的样子就表示启动成功
![]()
7、安装web管理插件
1、停止RabbitMQ服务
service rabbitmq-server stop // 使用上面的命令 /sbin/service rabbitmq-server stop也行
2、安装插件
rabbitmq-plugins enable rabbitmq_management
3、开启RabbitMQ服务
service rabbitmq-server start
要是访问不了,看看自己的防火墙关没关啊
systemctl status firewalld
systemctl stop firewalld
systemctl enable firewalld
同时查看自己的服务器有没有开放15672端口,不同的东西有不同的处理方式,如我的云服务器直接在服务器网址中添加规则即可,其他的方式自行百度
需要保证自己的Linux中有Docker容器,教程链接:https://www.cnblogs.com/xiegongzi/p/15621992.html
使用下面的两种方式都不需要进行web管理插件的安装和erlang的安装
1、查看自己的docker容器中是否已有了rabbitmq这个名字的镜像
docker images
删除镜像
docker rmi 镜像ID // 如上例的 dockerrmi 16c 即可删除镜像
2、拉取RabbitMQ镜像 并 启动Docker容器
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
3、查看Docker容器是否启动
docker ps
采用了第二种方式的话,记得把已经启动的Docker容器关了,以下是附加的一些Docker的基操
docker pull 镜像名称
docker images
docker rmi 镜像ID
docker save -o 导出的路径 镜像id
docker load -i 镜像文件
docker tag 镜像id 新镜像名称:版本
docker run 镜像ID | 镜像名称
docker run -d -p 宿主机端口:容器端口 --name 容器名称 镜像ID | 镜像名称
docker ps [-qa]
docker logs -f 容器id
docker exec -it 容器id bash
docker cp 文件名称 容器id:容器内部路径
docker cp index.html 982:/usr/local/tomcat/webapps/ROOT
=====================================================================
docker restart 容器id
docker start 容器id
docker stop 容器id
docker stop $(docker ps -qa)
docker rm 容器id
docker rm $(docker ps -qa)
1、创建一个文件夹,这些我很早之前就玩过了,所以建好了的
# 创建文件夹
mkdir 文件夹名
2、进入文件夹,创建docker-compose.yml文件,注意:文件名必须是这个
# 创建文件
touch docker-compose.yml
3、编辑docker-compose.yml文件
# 编辑文件
vim docker-compose.yml
里面编写的内容如下,编写好保存即可。注意:别用tab缩进啊,会出问题的,另外:每句的后面别有空格,严格遵循yml格式的
version: "3.1"
services:
rabbitmq:
image: rabbitmq:3.9-management
restart: always
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
volumes:
- ./data:/opt/install/rabbitMQ-docker/
4、在docker-compose.yml所在路径执行如下命令,注意:一定要在此文件路径中才行,因为默认是在当前文件夹下找寻docker-compose文件
docker-compose up -d
=========================================================
docker-compose up -d
docker-compose down
docker-compose start|stop|restart
docker-compose ps
docker-compose logs -f
这是因为guest是游客身份,不能进入,需要添加新用户
查看当前用户 / 角色有哪些
rabbitmqctl list_users
删除用户
rabbitmqctl delete_user 用户名
添加用户
rabbitmqctl add_user 用户名 密码
设置用户角色
rabbitmqctl set_user_tags 用户名 administrator
设置用户权限【 ps:guest角色就是没有这一步 】
rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
创建Maven项目 并导入如下依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
回到前面的RabbitMQ原理图
生产者
package cn.zixieqing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static final String HOST = "ip"; // 放RabbitMQ服务的服务器ip
public static final int PORT = 5672; // 服务器中RabbitMQ的端口号,在浏览器用的15672是通过5672映射出来的15672
public static final String USER_NAME = "admin";
public static final String PASSWORD = "admin";
public static final String QUEUE_NAME = "hello word";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、获取链接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置链接信息
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
/*
当然:这里还可以设置vhost虚拟机 - 前提是自己在web管理界面中设置了vhost
factory.setVirtualHost();
*/
// 3、获取链接Connection
Connection connection = factory.newConnection();
// 4、创建channel信道 - 它才是去和交换机 / 队列打交道的
Channel channel = connection.createChannel();
// 5、准备一个队列queue
// 这里理论上是去和exchange打交道,但是:这里是hello word简单模式,所以直接使用默认的exchange即可
/*
下面这是参数的完整意思,源码中偷懒了,没有见名知意
queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
参数1、队列名字
参数2、是否持久化( 保存到磁盘 ),默认是在内存中的
参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息
参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除
参数5、其他配置项,这涉及到后面的知识,目前选择null
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("正在发送信息!!!");
// 6、推送信息到队列中
// 准备发送的信息内容
String message = "it is hello word";
/*
basicPublish( exchangeName,queueName,properties,message )
参数1、交互机名字 - 使用了默认的
参数2、指定路由规则,使用队列名字
参数3、指定传递的消息所携带的properties
参数4、推送的具体消息 - byte类型的
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 7、释放资源 - 倒着关闭即可
if ( null != channel ) channel.close();
if ( null != connection ) connection.close();
System.out.println("消息发送完毕");
}
}
消费者
public class Consumer {
public static final String HOST = "162.14.66.60";
public static final int PORT = 5672;
public static final String USER_NAME = "admin";
public static final String PASSWORD = "admin";
public static final String QUEUE_NAME = "hello word";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置链接信息
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
// 3、创建链接对象
Connection connection = factory.newConnection();
// 4、创建信道channel
Channel channel = connection.createChannel();
// 5、从指定队列中获取消息
/*
basicConsume( queueName,isAutoAnswer,deliverCallback,cancelCallback )
参数1、队列名
参数2、是否自动应答,为true时,消费者接收到消息后,会立即告诉RabbitMQ
参数3、消费者未成功消费消息的回调
参数4、消费者取消消费的回调
*/
System.out.println("开始接收消息!!!");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到了消息:" + new String(message.getBody(), StandardCharsets.UTF_8) );
};
CancelCallback cancelCallback = consumerTag -> System.out.println("消费者取消了消费信息行为");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
// 6、释放资源 - 但是这里不能直接关闭啊,否则:看不到接收的结果的,可以选择不关,也可以选择加一句代码System.in.read();
// channel.close();
// connection.close();
}
}
抽取RabbitMQ链接的工具类
package cn.zixieqing.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQUtil {
public static final String HOST = "自己的ip";
public static final int PORT = 5672;
public static final String USER_NAME = "admin";
public static final String PASSWORD = "admin";
public static Channel getChannel(String vHost ) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
if ( !vHost.isEmpty() ) factory.setVirtualHost(vHost);
return factory.newConnection().createChannel();
}
}
生产者
和hello word没什么两样
package cn.zixieqing.workqueue;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class WorkProducer {
private static final String QUEUE_NAME = "work queue";
public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");
// 1、声明队列
/*
下面这是参数的完整意思,源码中偷懒了,没有见名知意
queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
参数1、队列名字
参数2、是否持久化( 保存到磁盘 ),默认是在内存中的
参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息
参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除
参数5、其他配置项,这涉及到后面的知识,目前选择null
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 2、准备消息
System.out.println("请输入要推送的信息,按回车确认:");
Scanner input = new Scanner(System.in);
// 3、推送信息到队列中
while (input.hasNext()) {
/*
basicPublish( exchangeName,routing key,properties,message )
参数1、交互机名字 - 使用了默认的
参数2、指定路由规则,使用队列名字
参数3、指定传递的消息所携带的properties
参数4、推送的具体消息 - byte类型的
*/
String message = input.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息====>" + message + "====>推送完毕!");
}
}
}
消费者
消费者01
package cn.zixieqing.workqueue;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class WorkConsumer {
private static final String QUEUE_NAME = "work queue";
public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到了消息====>" + new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println( consumerTag + "消费者中断了接收消息====>" );
};
System.out.println("消费者01正在接收消息......");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
消费者02
package cn.zixieqing.workqueue;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class WorkConsumer {
private static final String QUEUE_NAME = "work queue";
public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到了消息====>" + new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println( consumerTag + "消费者中断了接收消息====>" );
};
System.out.println("消费者02正在接收消息......");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
这个东西已经见过了
/*
basicConsume( queueName,isAutoAnswer,deliverCallback,cancelCallback )
参数1、队列名
参数2、是否自动应答,为true时,消费者接收到消息后,会立即告诉RabbitMQ
参数3、消费者未成功消费消息的回调
参数4、消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
就是我们自己去设定,好处是可以批量应答并且减少网络拥堵
调用的API如下:
Channel.basicACK( long, boolean ); // 用于肯定确认,即:MQ已知道该消息 并且 该消息已经成功被处理了,所以MQ可以将其丢弃了
Channel.basicNack( long, boolena, boolean ); // 用于否定确认
Channel.basicReject( long, boolea ); // 用于否定确认
与Channel.basicNack( long, boolena, boolean )相比,少了一个参数,这个参数名字叫做:multiple</code></pre></li></ul>
multiple参数说明,它为true和false有着截然不同的意义【 ps:建议弄成false,虽然是挨个去处理,从而应答,效率慢,但是:数据安全,否则:很大可能造成数据丢失 】
生产者
package cn.zixieqing.ACK;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class AckProducer {
private static final String QUEUE_NAME = "ack queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
// 声明队列
/*
下面这是参数的完整意思,源码中偷懒了,没有见名知意
queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
参数1、队列名字
参数2、是否持久化( 保存到磁盘 ),默认是在内存中的
参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息
参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除
参数5、其他配置项,这涉及到后面的知识,目前选择null
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("请输入要推送的消息:");
Scanner input = new Scanner(System.in);
while (input.hasNext()) {
/*
basicPublish( exchangeName,routing key,properties,message )
参数1、交互机名字 - 使用了默认的
参数2、指定路由规则,使用队列名字
参数3、指定传递的消息所携带的properties
参数4、推送的具体消息 - byte类型的
*/
String message = input.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息====>" + message + "推送完毕");
}
}
}
消费者01
package cn.zixieqing.ACK;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
public class AckConsumer {
private static final String QUEUE_NAME = "ack queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
Thread.sleep(5*1000);
System.out.println("接收到了消息=====>" + new String( message.getBody(), StandardCharsets.UTF_8 ));
// 添加手动应答
/*
basicAck( long, boolean )
参数1、消息的标识tag,这个标识就相当于是消息的ID
参数2、是否批量应答multiple
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
System.out.println("消费者01正在接收消息,需要5秒处理完");
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
System.out.println("触发消费者取消消费消息行为的回调");
System.out.println(Arrays.toString(consumerTag.getBytes(StandardCharsets.UTF_8)));
});
}
}
消费者02
package cn.zixieqing.ACK;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
public class AckConsumer {
private static final String QUEUE_NAME = "ack queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
Thread.sleep(10*1000);
System.out.println("接收到了消息=====>" + new String( message.getBody(), StandardCharsets.UTF_8 ));
// 添加手动应答
/*
basicAck( long, boolean )
参数1、消息的标识tag,这个标识就相当于是消息的ID
参数2、是否批量应答multiple
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
System.out.println("消费者02正在接收消息,需要10秒处理完");
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
System.out.println("触发消费者取消消费消息行为的回调");
System.out.println(Arrays.toString(consumerTag.getBytes(StandardCharsets.UTF_8)));
});
}
}
这个玩意儿的配置吧,早间就过了,在生产者消息发送时,有一个声明队列的过程,那里面就有一个是否持久化的配置
/*
下面这是参数的完整意思,源码中偷懒了,没有见名知意
queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
参数1、队列名字
参数2、是否持久化( 保存到磁盘 ),默认是在内存中的
参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息
参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除
参数5、其他配置项,这涉及到后面的知识,目前选择null
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
而如果没有持久化,那么RabbitMQ服务由于其他什么原因导致挂彩的时候,那么重启之后,这个没有持久化的队列就灰飞烟灭了【 ps:注意和里面的消息还没关系啊,不是说队列持久化了,那么消息就持久化了 】
在这个队列持久化配置中,它的默认值就是false,所以要改成true时,需要注意一个点:选择队列持久化,那么必须保证当前这个队列是新的,即:RabbitMQ中没有当前队列,否则:需要进到web管理界面把已有的同名队列删了,然后重新配置当前队列持久化选项为true,不然:报错
inequivalent arg 'durable' for queue 'queue durable' in vhost '/': received 'true' but current is 'false'
注意:这里说的消息持久化不是说配置之后消息就一定不会丢失,而是:把消息标记为持久化,然后RabbitMQ尽量让其持久化到磁盘
但是:也会有意外,比如:RabbitMQ在将消息持久化到磁盘时,这是有一个时间间隔的,数据还没完全刷写到磁盘呢,RabbitMQ万一出问题了,那么消息 / 数据还是会丢失的,所以:消息持久化配置是一个弱持久化,但是:对于简单队列模式完全足够了,强持久化的实现方式在后续的publisher / confirm发布确认模式中
至于配置机极其的简单,在前面都已经见过这个配置项,就是生产者发消息时做文章,就是下面的第三个参数,把它改为MessageProperties.PERSISTENT_TEXT_PLAIN
即可
/*
basicPublish( exchangeName,routing key,properties,message )
参数1、交互机名字 - 使用了默认的
参数2、指定路由规则,使用队列名字
参数3、指定传递的消息所携带的properties
参数4、推送的具体消息 - byte类型的
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());// 改成消息持久化
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());</code></pre></li>
MessageProperties类的源码如下:
public class MessageProperties {
public static final BasicProperties MINIMAL_BASIC = new BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public static final BasicProperties MINIMAL_PERSISTENT_BASIC = new BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public static final BasicProperties BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public static final BasicProperties PERSISTENT_BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public static final BasicProperties TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public MessageProperties() {
}
}
上面用到了BasicProperties类型,它的属性如下:
public static class BasicProperties extends AMQBasicProperties {
// 消息内容的类型
private String contentType;
// 消息内容的编码格式
private String contentEncoding;
// 消息的header
private Map<String, Object> headers;
// 消息是否持久化,1:否,2:是
private Integer deliveryMode;
// 消息的优先级
private Integer priority;
// 关联ID
private String correlationId;
// :用于指定回复的队列的名称
private String replyTo;
// 消息的失效时间
private String expiration;
// 消息ID
private String messageId;
// 消息的发送时间
private Date timestamp;
// 类型
private String type;
// 用户ID
private String userId;
// 应用程序ID
private String appId;
// 集群ID
private String clusterId;
}
不公平分发
这个东西是在消费者那一方进行设置的
RabbitMQ默认是公平分发,即:轮询分发
轮询分发有缺点:如前面消费者01( 谁5秒的那个 )和 消费者02 ( 谁10秒的那个 ),这种情况如果采用轮询分发,那么:01要快一点,而02要慢一点,所以01很快处理完了,然后处于空闲状态,而02还在拼命奋斗中,最后的结果就是02不停干,而01悠悠闲闲的,浪费了时间,所以:应该压榨一下01,让它不能停
设置方式:在消费者接收消息之前进行channel.basicQos( int prefetchCount )设置
// 不公平分发,就是在这里接收消息之前做处理
/*
basicQos( int prefetchCount )
为0、轮询分发 也是RabbitMQ的默认值
为1、不公平分发
*/
channel.basicQos(1);channel.basicConsume("qos queue", true, deliverCallback, consumerTag -> {
System.out.println("消费者中断了接收消息行为触发的回调");
});</code></pre></li>
预取值
而要设置这种效果,和前面不公平分发的设置是一样的,只是把里面的参数改一下即可
// 预取值,也是在这里接收消息之前做处理,和不公平分发调的是同一个API
/*
basicQos( int prefetchCount ) 为0、轮询分发 也是RabbitMQ的默认值;为1、不公平分发
而当这里的数字变成其他的,如:上图中上面的那个消费者要消费20条消息,那么把下面的数字改成对应的即可
注意点:这是要设置哪个消费的预取值,那就是在哪个消费者代码中进行设定啊
*/
channel.basicQos(10); // 这样就表示这个代码所在的消费者需要消费10条消息了channel.basicConsume("qos queue", true, deliverCallback, consumerTag -> {
System.out.println("消费者中断了接收消息行为触发的回调");
});</code></pre></li>
在上面的过程中,想要让数据持久化,那么需要具备以下的条件
而所谓的发布确认指的就是:数据在刷写到磁盘时,成功了,那么MQ就回复生产者一下,数据确认刷写到磁盘了,否则:只具备前面的二者的话,那也有可能出问题,如:数据推到了队列中,但是还没来得及刷写到磁盘呢,结果RabbitMQ宕机了,那数据也有可能会丢失,所以:现在持久化的过程就是如下的样子:
开启发布确认
在发送消息之前( 即:调basicPublish() 之前 )调一个API就可以了
channel.confirmSelect(); // 没有参数
一句话:一手交钱一手交货,即 生产者发布一条消息,RabbitMQ就要回复确认状态,否则不再发放消息,因此:这种模式是同步发布确认的方式,缺点:很慢,优点:能够实时地了解到那条消息出异常 / 哪些消息都发布成功了
这种方式,
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {// 单个确认发布
singleConfirm(); // 单个确认发布发送这些消息花费4797ms
}
public static void singleConfirm() throws IOException, TimeoutException, InterruptedException {Channel channel = MQUtil.getChannel("");
// 开启确认发布
channel.confirmSelect();
// 声明队列 并 让队列持久化
channel.queueDeclare("singleConfirm", true, false, false, null);
long begin = System.currentTimeMillis();
for (int i = 1; i <= 100; i++) {
// 发送消息 并 让消息持久化
channel.basicPublish("","singleConfirm", MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes() );
// 发布一个 确认一个 channel.waitForConfirms()
if ( channel.waitForConfirms() )
System.out.println("消息".concat( String.valueOf(i) ).concat( "发送成功") );
}
long end = System.currentTimeMillis();
System.out.println("单个确认发布发送这些消息花费".concat( String.valueOf( end-begin ) ).concat("ms") );
}
一句话:只要结果,是怎么一个批量管不着,只需要把一堆消息发布之后,回复一个结果即可,这种发布也是同步的
优点:效率相比单个发布要高
缺点:如果因为什么系统故障而导致发布消息出现问题,那么就会导致是批量发了一些消息,然后再回复的,中间有哪个消息出问题了鬼知道
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
// 单个确认发布
// singleConfirm(); // 单个确认发布发送这些消息花费4797ms// 批量发布
batchConfirm(); // 批量发布发送的消息共耗时:456ms
}
public static void batchConfirm() throws IOException, TimeoutException, InterruptedException {Channel channel = MQUtil.getChannel("");
// 开启确认发布
channel.confirmSelect();
// 声明队列 并 让队列持久化
channel.queueDeclare("batchConfirm", true, false, false, null);
long begin = System.currentTimeMillis();
for (int i = 1; i <= 100; i++) {
// 发送消息 并 让消息持久化
channel.basicPublish("","batchConfirm", MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes() );
// 批量发布 并 回复批量发布的结果 - 发了10条之后再确认
if (i % 10 == 0) {
channel.waitForConfirms();
System.out.println("消息" + ( i-10 ) + "====>" + i + "的消息发布成功");
}
}
// 为了以防还有另外的消息未被确认,再次确认一下
channel.waitForConfirms();
long end = System.currentTimeMillis();
System.out.println("批量发布发送的消息共耗时:" + (end - begin) + "ms");
}
由上图可知:所谓的异步确认发布的就是:
而上述牵扯到一个map集合,那么这个集合需要具备如下的条件:
代码实现
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
// 单个确认发布
// singleConfirm(); // 单个确认发布发送这些消息花费4797ms
// 批量发布
// batchConfirm(); // 批量发布发送的消息共耗时:456ms
asyncConfirm(); // 异步发布确认耗时:10ms
}
// 异步发布确认
public static void asyncConfirm() throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
channel.confirmSelect();
channel.queueDeclare("async confirm", true, false, false, null);
// 1、准备符合条件的map
ConcurrentSkipListMap<Long, Object> messagePoolMap = new ConcurrentSkipListMap<>();
// 3、对信道channel进行监听
// 成功确认发布回调
ConfirmCallback ackCallback = (messageTag, multiple) -> {
System.out.println("确认发布了消息=====>" + messagePoolMap.headMap(messageTag) );
// 4、把确认发布的消息删掉,减少内存开销
// 判断是否是批量删除
if ( multiple ){
// 通过消息标识tag 把 确认发布的消息取出
messagePoolMap.headMap(messageTag).clear();
/**
* 上面这句代码拆分写法
* ConcurrentNavigableMap<Long, Object> confirmed = messagePoolMap.headMap(messageTag);
* confirmed.clear();
*/
}else {
messagePoolMap.remove(messageTag);
}
};
// 没成功发布确认回调
ConfirmCallback nackCallback = (messageTag, multiple) -> {
System.out.println("未确认的消息是:" + messagePoolMap.get(messageTag) );
};
// 进行channel监听 这是异步的
/**
* channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)
* 参数1、消息成功发布的回调函数 ackCallback()
* 参数2、消息未成功发布的回调函数 nackCallback()
*/
channel.addConfirmListener( ackCallback,nackCallback );
long begin = System.currentTimeMillis();
for (int i = 1; i <= 100; i++) {
// 2、将要发布的全部信息保存到map中去
/*
channel.getNextPublishSeqNo() 获取下一次将要发送的消息标识tag
*/
messagePoolMap.put(channel.getNextPublishSeqNo(),String.valueOf(i) );
// 生产者只管发布就行
channel.basicPublish("","async confirm",MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes());
System.out.println("消息=====>" + i + "发送完毕");
}
long end = System.currentTimeMillis();
System.out.println("异步发布确认耗时:" + ( end-begin ) + "ms" );
}
临时队列
所谓的临时队列指的就是:自动帮我们生成的队列名 并且 当生产者和队列断开之后,这个队列会被自动删除
所以这么一说:前面玩过的一种就属于临时队列,即:将下面的第四个参数改成true即可【 ps:当然让队列名随机生成就完全匹配了 】
/*
下面这是参数的完整意思,源码中偷懒了,没有见名知意
queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties )
参数1、队列名字
参数2、是否持久化( 保存到磁盘 ),默认是在内存中的
参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息
参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除
参数5、其他配置项,这涉及到后面的知识,目前选择null
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
而如果要更简单的生成临时队列,那么调用如下的API即可
String queueName = channel.queueDeclare().getQueue();
这样帮我们生成的队列效果就和channel.queueDeclare(QUEUE_NAME, false, false, true, null);
是一样的了
生产者
package cn.zixieqing.fanout;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class FanoutProducer {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
/**
* 定义交换机
* 参数1、交换机名字
* 参数2、交换机类型
*/
channel.exchangeDeclare("fanoutExchange", BuiltinExchangeType.FANOUT);
System.out.println("请输入要发送的内容:");
Scanner input = new Scanner(System.in);
while (input.hasNext()){
String message = input.next();
channel.basicPublish("fanoutExchange","", null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息=====>" + message + "发送完毕");
}
}
}
消费者01
package cn.zixieqing.fanout;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class FanoutConsumer01 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
// 绑定队列
/**
* 参数1、队列名字
* 参数2、交换机名字
* 参数3、用于绑定的routing key / binding key
*/
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "fanoutExchange", "");
System.out.println("01消费者正在接收消息........");
channel.basicConsume(queueName,true,(consumerTag,message)->{
// 这里面接收到消息之后就可以用来做其他事情了,如:存到磁盘
System.out.println("接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
},consumerTage->{});
}
}
消费者02
package cn.zixieqing.fanout;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class FanoutConsumer02 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
// 绑定队列
/**
* 参数1、队列名字
* 参数2、交换机名字
* 参数3、用于绑定的routing key / binding key
*/
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "fanoutExchange", "");
System.out.println("02消费者正在接收消息........");
channel.basicConsume(queueName,true,(consumerTag,message)->{
// 这里面接收到消息之后就可以用来做其他事情了,如:存到磁盘
System.out.println("接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
},consumerTage->{});
}
}
生产者
package cn.zixieqing.direct;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class DirectProducer {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
channel.exchangeDeclare("directExchange", BuiltinExchangeType.DIRECT);
System.out.println("请输入要发送的消息:");
Scanner input = new Scanner(System.in);
while (input.hasNext()){
String message = input.next();
/**
* 对第二个参数routing key做文章
* 假如这里的routing key为zixieqing 那么:就意味着消费者只能是绑定了zixieqing的队列才可以进行接收这里发的消息内容
*/
channel.basicPublish("directExchange","zixieqing",null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息=====>" + message + "====>发送完毕");
}
}
}
消费者01
package cn.zixieqing.direct;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class DirectConsumer01 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
channel.queueDeclare("direct", false, false, false, null);
/**
* 队列绑定
* 参数1、队列名
* 参数2、交换机名字
* 参数3、routing key 这里的routing key 就需要和生产者中的一样了,这样才可以通过这个routing key去对应的队列中取消息
*/
channel.queueBind("direct", "directExchange", "zixieqing");
System.out.println("01消费者正在接收消息.......");
channel.basicConsume("direct",true,(consumerTag,message)->{
System.out.println("01消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
},consumerTag->{});
}
}
而此时再加一个消费者,让它的routing key值和消费者中的不同
package cn.zixieqing.direct;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class DirectConsumer02 {
public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");
channel.queueDeclare("direct", false, false, false, null);
/**
* 队列绑定
* 参数1、队列名
* 参数2、交换机名字
* 参数3、routing key 这里的routing key 就需要和生产者中的一样了,这样才可以通过这个routing key去对应的队列中取消息
*/
// 搞点事情:这里的routing key的值zixieqing和生产者的不同
channel.queueBind("direct", "directExchange", "xiegongzi");
System.out.println("02消费者正在接收消息.......");
channel.basicConsume("direct",true,(consumerTag,message)->{
System.out.println("02消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
},consumerTag->{});
}
}
topic中routing key的要求
只要交换机类型是topic类型的,那么其routing key就不能乱写,要求:routing key只能是一个单词列表,多个单词之间采用点隔开,如:cn.zixieqing.rabbit
单词列表的长度不能超过255个字节
在routing key的规则列表中有两个替换符可以用
*
代表一个单词#
代表零活无数个单词假如有如下的一个绑定关系图
Q1绑定的是:中间带 orange 带 3 个单词的字符串(.orange.)
Q2绑定的是:
熟悉一下这种绑定关系( 左为一些routes路由规则,右为能匹配到上图绑定关系的结果 )
quick.orange.rabbit 被队列 Q1Q2 接收到
lazy.orange.elephant 被队列 Q1Q2 接收到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定,但只被队列 Q2 接收一次
quick.brown.fox 不满足任何绑定关系,不会被任何队列接收到,会被丢弃
quick.orange.male.rabbit 是四个单词,不满足任何绑定关系,会被丢弃
lazy.orange.male.rabbit 虽是四个单词,但匹配 Q2,因:符合lazy.#这个规则
当队列绑定关系是下列这种情况时需要引起注意
把上面的绑定关系和测试转换成代码玩一波
生产者
package cn.zixieqing.topic;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class TopicProducer {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);
/**
* 准备大量的routing key 和 message
*/
HashMap<String, String> routesAndMessageMap = new HashMap<>();
routesAndMessageMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
routesAndMessageMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
routesAndMessageMap.put("quick.orange.fox", "被队列 Q1 接收到");
routesAndMessageMap.put("lazy.brown.fox", "被队列 Q2 接收到");
routesAndMessageMap.put("lazy.pink.rabbit", "虽然满足两个绑定,但只被队列 Q2 接收一次");
routesAndMessageMap.put("quick.brown.fox", "不满足任何绑定关系,不会被任何队列接收到,会被丢弃");
routesAndMessageMap.put("quick.orange.male.rabbit", "是四个单词,不满足任何绑定关系,会被丢弃");
routesAndMessageMap.put("lazy.orange.male.rabbit ", "虽是四个单词,但匹配 Q2,因:符合lazy.#这个规则");
System.out.println("生产者正在发送消息.......");
for (Map.Entry<String, String> routesAndMessageEntry : routesAndMessageMap.entrySet()) {
String routingKey = routesAndMessageEntry.getKey();
String message = routesAndMessageEntry.getValue();
channel.basicPublish("topicExchange",routingKey,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息====>" + message + "===>发送完毕");
}
}
}
消费者01
package cn.zixieqing.topic;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class TopicConsumer01 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);
channel.queueDeclare("Q1", false, false, false, null);
channel.queueBind("Q1", "topicExchange", "*.orange.*");
System.out.println("消费者01正在接收消息......");
channel.basicConsume("Q1",true,(consumerTage,message)->{
System.out.println("01消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
System.out.println("此条消息的交换机名为:" + message.getEnvelope().getExchange() + ",路由键为:" + message.getEnvelope().getRoutingKey());
},consumerTag->{});
}
}
消费者02
package cn.zixieqing.topic;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class TopicConsumer02 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);
channel.queueDeclare("Q2", false, false, false, null);
channel.queueBind("Q2", "topicExchange", "*.*.rabbit");
channel.queueBind("Q2", "topicExchange", "lazy.#");
System.out.println("消费者02正在接收消息......");
channel.basicConsume("Q2",true,(consumerTage,message)->{
System.out.println("02消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8));
System.out.println("此条消息的交换机名为:" + message.getEnvelope().getExchange() + ",路由键为:" + message.getEnvelope().getRoutingKey());
},consumerTag->{});
}
}
生产者
就是一个正常的生产者发送消息而已
package cn.zixieqing.dead_letter_queue.queuelength.queuenumber;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {Channel channel = MQUtil.getChannel("");
channel.exchangeDeclare("messageNumber_normal_exchange", BuiltinExchangeType.DIRECT);
for (int i = 1; i < 11; i++) {
String message = "生产者发送了消息" + i;
channel.basicPublish("messageNumber_normal_exchange","zi",null,
message.getBytes(StandardCharsets.UTF_8) );
System.out.println("消息====>" + message + "====>发送完毕");
}
}
}
01消费者
package cn.zixieqing.dead_letter_queue.queuelength.queuenumber;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
/**
* 正常交换机名称
*/
public static final String NORMAL_EXCHANGE = "messageNumber_normal_exchange";
/**
* 正常队列名称
*/
public static final String NORMAL_QUEUE = "messageNumber_queue";
/**
* 死信交换机名称
*/
public static final String DEAD_EXCHANGE = "messageNumber_dead_exchange";
/**
* 死信队列名称
*/
public static final String DEAD_QUEUE = "messageNumber_dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
// 声明正常交换机、死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 死信交换机和死信队列进行绑定
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "xie");
// 声明正常队列 并 考虑达到条件时和死信交换机进行联系
HashMap<String, Object> params = new HashMap<>();
// 死信交换机
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 死信路由键
params.put("x-dead-letter-routing-key", "xie");
// 达到队列能接受的最大个数限制就多了如下的配置
params.put("x-max-length", 6);
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
// 正常队列和正常交换机进行绑定
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zi");
System.out.println("01消费者正在接收消息......");
channel.basicConsume(NORMAL_QUEUE,true,(consumeTag,message)->{
System.out.println("01消费者接收到了消息:" + new String( message.getBody(), StandardCharsets.UTF_8));
},consumeTag->{});
}
}
和前面一种相比,在01消费者方做另一个配置即可
params.put("x-max-length-bytes", 255);
注意:关于两种情况同时使用的问题
如配置的如下两个
params.put("x-max-length", 6);
params.put("x-max-length-bytes", 255);
那么先达到哪个上限设置就执行哪个
注意点:必须开启手动应答
// 第二个参数改成false
channel.basicConsume(NORMAL_QUEUE,false,(consumeTag,message)->{},consumeTag->{});
生产者
package cn.zixieqing.dead_letter_queue.reack;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
channel.exchangeDeclare("reack_normal_exchange", BuiltinExchangeType.DIRECT);
for (int i = 1; i < 11; i++) {
String message = "生产者发送的消息" + i;
channel.basicPublish("reack_normal_exchange","zixieqing",null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息===>" + message + "===>发送完毕");
}
}
}
消费者
package cn.zixieqing.dead_letter_queue.reack;
import cn.zixieqing.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
public static final String NORMAL_EXCHANGE = "reack_normal_exchange";
public static final String DEAD_EXCHANGE = "reack_dead_exchange";
public static final String DEAD_QUEUE = "reack_dead_queue";
public static final String NORMAL_QUEUE = "reack_normal_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtil.getChannel("");
// 声明正常交换机、死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 死信队列绑定死信交换机
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "xie");
// 声明正常队列
HashMap<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
params.put("x-dead-letter-routing-key", "xie");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zixieqing");
System.out.println("01消费者正在接收消息.....");
// 1、注意:需要开启手动应答( 第二个参数为false )
channel.basicConsume(NORMAL_QUEUE,false,(consumeTag,message)->{
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
// 如果发送的消息为:生产者发送的消息5 则:拒收
if ( "生产者发送的消息5".equals( msg ) ) {
System.out.println("此消息====>" + msg + "===>是拒收的");
// 2、做拒收处理 - 注意:第二个参数设为false,表示不再重新入正常队列的队,这样消息才可以进入死信队列
channel.basicReject( message.getEnvelope().getDeliveryTag(),false);
}else {
System.out.println("01消费者接收到了消息=====>" + msg);
}
},consumeTag->{});
}
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章