目录
一个企业可能同时运行着多个不同的业务系统,
这些系统可能基于不同的操作系统、不同的数据库、异构的网络环境。
现在的问题是,如何把这些信息系统结合成一个有机地协同工作的整体,真正实现企业跨平台、分布式应用。
中间件便是解决之道,它用自己的复杂换取了企业应用的简单。
中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。
人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),
但在这组中间件中必须要有一个通信中间件,
即中间件=平台+通信,
这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。
比如我MQ发送一个信息,是以什么数据格式发送到队列中,然后每个部分的含义是什么,
发送完毕以后的执行的动作,以及消费者消费消息的动作,消费完毕的响应结果和反馈是什么,
然后按照对应的执行顺序进行处理。
例如:大家每天都在接触的http请求协议:
1:语法:http规定了请求报文和响应报文的格式。
2:语义:客户端主动发起请求称之为请求。(这是一种定义,同时你发起的是post/get请求)
3:时序:一个请求对应一个响应。(一定先有请求在有响应,这个是时序)
而消息中间件采用的并不是http协议,
而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议。
为什么消息中间件不直接使用http协议呢?
因为http请求报文头和响应报文头是比较复杂的,包含了cookie,数据的加密解密,状态码,响应码等附加的功能,
但是对于一个消息而言,我们并不需要这么复杂,
它其实就是负责数据传递,存储,分发就行,
一定要追求的是高性能。尽量简洁,快速。
大部分情况下http大部分都是短链接,
一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。
这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,
目的是为了保证消息和数据的高可靠和稳健的运行。
AMQP:(全称:Advanced Message Queuing Protocol) 是高级消息队列协议。
是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
特性:
MQTT协议:(Message Queueing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网系统架构中的重要组成部分。
特点:
轻量
结构简单
传输快,不支持事务
没有持久化设计。
应用场景:
由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。
特点:
Kafka协议是基于TCP/IP的二进制协议。
消息内部是通过长度来分割,由一些基本数据类型组成。
特点是:
简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。
ActiveMQ
RabbitMQ
Kafka
RocketMQ
文件存储
支持
支持
支持
支持
数据库
支持
/
/
/
MQ消息队列有如下几个角色
那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?
一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的git就有推拉机制,
我们发送的http请求就是一种典型的拉取数据库数据返回的过程。
而消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。
ActiveMQ
RabbitMQ
Kafka
RocketMQ
发布订阅
支持
支持
支持
支持
轮询分发
支持
支持
支持
/
公平分发
/
支持
支持
/
重发
支持
支持
/
支持
消息拉取
/
支持
支持
支持
所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,
一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。
解说:生产者讲消费发送到Master节点,所有的都连接这个消息队列共享这块数据区域,Master节点负责写入,一旦Master挂掉,slave节点继续服务。从而形成高可用,
解释:这种模式写入消息同样在Master主节点上,但是主节点会同步数据到slave节点形成副本,和zookeeper或者redis主从机制很类同。
这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点就行消费,以为消息的拷贝和同步会暂用很大的带宽和网络资源。
解释:和上面的区别不是特别的大,但是它的写入可以往任意节点去写入。
解释:如果你插入的数据是broker-1中,元数据信息会存储数据的相关描述和记录存放的位置(队列)。
它会对描述信息也就是元数据信息就行同步,如果消费者在broker-2中进行消费,发现自己几点没有对应的消息,
可以从对应的元数据信息中去查询,然后返回对应的消息信息
场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他会去联系其他的黄牛询问,如果有就返回。
解释:实现多主多从的热备机制来完成消息的高可用以及数据的热备机制,在生产规模达到一定的阶段的时候,这种使用的频率比较高。
这么多集群模式,他们的最终目的都是为保证:消息服务器不会挂掉,出现了故障依然可以抱着消息服务继续使用。
三句话:
所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。
如何保证中间件消息的可靠性呢?可以从两个方面考虑:
什么是RabbitMQ,官方给出来这样的解释:
RabbitMQ是部署最广泛的开源消息代理。
RabbitMQ拥有成千上万的用户,是最受欢迎的开源消息代理之一。从T-Mobile 到Runtastic,RabbitMQ在全球范围内的小型初创企业和大型企业中都得到使用。
RabbitMQ轻巧,易于在内部和云中部署。它支持多种消息传递协议。RabbitMQ可以部署在分布式和联合配置中,以满足大规模,高可用性的要求。
RabbitMQ可在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发人员工具。
简单概述:
RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征。
下载地址:https://www.rabbitmq.com/download.html
环境准备:CentOS7.x+ / Erlang
RabbitMQ是采用Erlang语言开发的,所以系统环境必须提供Erlang环境,第一步就是安装Erlang。
Erlang和RabbitMQ版本的按照比较: https://www.rabbitmq.com/which-erlang.html
安装Erlang 时要注意安装的RabbityMQ 所依赖的Erlang版本,根据RabbitMQ的要求选择一个版本,下载Erlang安装包后直接安装就可以了。
设置ERLANG_HOME 环境变量
在开始菜单查找Erlang,点击启动 打开如下界面,那么Erlang就安装成功了。
可以在RabbitMQ的官方网站下载最新版本的RabbitMQ服务器安装程序,RabbitMQ下载地址
RabbitMQ安装好后是作为windows service 运行在后台。
设置环境变量 RABBITQM_SERVER变量
然后在系统的path变量中配置如下: 加入sbin的路径
启动 rabbitmq-server
用下列命令安装rabbitmq_management插件,这款插件是可以可视化的方式查看RabbitMQ 服务器实例的状态,以及操控RabbitMQ服务器。
rabbitmq-plugins enable rabbitmq_management
现在我们在浏览器中输入:http://localhost:15672 可以看到一个登录界面
这里可以使用默认账号guest/guest登录
在浏览器中输入 http://localhost:15672/api/ 就可以看到 RabbitMQ Management HTTP API 文档
5672:RabbitMQ的通讯端口
25672:RabbitMQ的节点间的CLI通讯端口是
15672:RabbitMQ HTTP_API的端口,管理员用户才能访问,用于管理RabbitMQ,需要启动Management插件。
1883,8883:MQTT插件启动时的端口。
61613、61614:STOMP客户端插件启用的时候的端口。
15674、15675:基于webscoket的STOMP端口和MOTT端口
新增用户
rabbitmqctl add_user admin admin
设置用户分配操作权限
rabbitmqctl set_user_tags admin administrator
用户级别:
为用户添加资源权限
rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl add_user 账号 密码
rabbitmqctl set_user_tags 账号 administrator
rabbitmqctl change_password Username Newpassword 修改密码
rabbitmqctl delete_user Username 删除用户
rabbitmqctl list_users 查看用户清单
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*" 为用户设置administrator角色
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
[root@iZm5eauu5f1ulwtdgwqnsbZ ~]# lsb_release -a
LSB Version: :core-4.1-amd64:core-4.1-noarch
Distributor ID: CentOS
Description: CentOS Linux release 8.3.2011
Release: 8.3.2011
Codename: n/a
wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
yum install -y erlang
erl -v
yum install -y socat
> wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.13/rabbitmq-server-3.8.13-1.el8.noarch.rpm
> rpm -Uvh rabbitmq-server-3.8.13-1.el8.noarch.rpm
# 启动服务
> systemctl start rabbitmq-server
# 查看服务状态
> systemctl status rabbitmq-server
# 停止服务
> systemctl stop rabbitmq-server
# 开机启动服务
> systemctl enable rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*"
docker pull rabbitmq:management
docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
docker logs -f myrabbit
> more xxx.log 查看日记信息
> netstat -naop | grep 5672 查看端口是否被占用
> ps -ef | grep 5672 查看进程
> systemctl stop 服务
不能访问management plugin
列出自己可以通过AMQP登入的虚拟机
查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
查看和关闭自己的channels和connections
查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。
包含management所有权限
查看和创建和删除自己的virtual hosts所属的policies和parameters信息。
包含management所有权限
罗列出所有的virtual hosts,包括不能登录的virtual hosts。
查看其他用户的connections和channels信息
查看节点级别的数据如clustering和memory使用情况
查看所有的virtual hosts的全局统计信息。
最高权限
可以创建和删除virtual hosts
可以查看,创建和删除users
查看创建permisssions
关闭所有用户的connections
https://gitee.com/zwtgit/rabbit-mq
1:jdk1.8
2:构建一个maven工程
3:导入rabbitmq的maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.2</version>
</dependency>
4:启动rabbitmq-server服务
systemctl start rabbitmq-server
或者
docker start myrabbit
5:定义生产者
package com.zwt.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author: ML李嘉图
* @description: Producer 简单队列生产者
* @Date : 2021/3/2
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性,本机的地址,默认端口5679
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,Ml李嘉图!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
6:定义消费者
package com.zwt.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author: ML李嘉图
* @description: Producer 简单队列生产者
* @Date : 2021/3/2
*/
// 简单模式----消费者
public class Consumer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协议基础上构建新型的协议规范,rabbitmq遵循的是amqp
// 协议遵循 ip port
// 1、创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 通过连接工厂设置账号密码
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 虚拟访问节点
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、创建连接connection
connection = connectionFactory.newConnection("生产者");
// 3、通过连接获取通道channel
channel = connection.createChannel();
// 4、 通过创建交换机、声明队列,绑定关系,路由key,发送消息和接受消息
channel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到消息是:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受消息失败了。。。。");
}
});
System.out.println("开始接受消息");
// 进行阻断,接受消息不关闭
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、关闭通道(先关通道,再关连接)
if(channel!=null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、关闭连接
if(connection!=null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
7:观察消息的在rabbitmq-server服务中的过程
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。
核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。
参考官网:https://www.rabbitmq.com/getstarted.html
简单模式 Simple
工作模式 Work
发布订阅模式
路由模式
主题Topic模式
参数模式
rabbitmq发送消息一定有一个交换机
发布订阅模式
代码都是与简单模式类似,只是调用的函数以及参数改变了
发布订阅模式:大概就是咱们的交换机选定fanout模式,binding你自己需要使用的队列,然后就可以通过这个交换机发布消息了。
路由模式(Routing key)
例如:我只想发给微信用户,不发QQ用户,就是增加了 过滤的条件
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
public void makeOrder(){
// 1 :保存订单
orderService.saveOrder();
// 2: 发送短信服务
messageService.sendSMS("order");//1-2 s
// 3: 发送email服务
emailService.sendEmail("order");//1-2 s
// 4: 发送APP服务
appService.sendApp("order");
}
并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间 。
public void makeOrder(){
// 1 :保存订单
orderService.saveOrder();
// 相关发送
relationMessage();
}
public void relationMessage(){
// 异步
theadpool.submit(new Callable<Object>{
public Object call(){
// 2: 发送短信服务
messageService.sendSMS("order");
}
})
// 异步
theadpool.submit(new Callable<Object>{
public Object call(){
// 3: 发送email服务
emailService.sendEmail("order");
}
})
// 异步
theadpool.submit(new Callable<Object>{
public Object call(){
// 4: 发送短信服务
appService.sendApp("order");
}
})
// 异步
theadpool.submit(new Callable<Object>{
public Object call(){
// 4: 发送短信服务
appService.sendApp("order");
}
})
}
存在问题:
1:耦合度高
2:需要自己写线程池自己维护成本太高
3:出现了消息可能会丢失,需要你自己做消息补偿
4:如何保证消息的可靠性你自己写
5:如果服务器承载不了,你需要自己去写高可用
好处
1:完全解耦,用MQ建立桥接
2:有独立的线程池和运行模型
3:出现了消息可能会丢失,MQ有持久化功能
4:如何保证消息的可靠性,死信队列和消息转移的等
5:如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。
注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。
因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍
public void makeOrder(){
// 1 :保存订单
orderService.saveOrder();
rabbitTemplate.convertSend("ex","2","消息内容");
}
流量削峰
要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,
把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,
在另一端平滑地将消息推送出去。
消息队列中间件主要解决应用耦合,异步消息, 流量削锋等问题。
常用消息队列系统:目前在生产环境,使用较多的消息队列有 ActiveMQ、RabbitMQ、 ZeroMQ、Kafka、MetaMQ、RocketMQ 等。
在这里,消息队列就像“水库”一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的。
针对秒杀场景还有一种方法,就是对请求进行分层过滤,从而过滤掉一些无效的请求。 分层过滤其实就是采用“漏斗”式设计来处理请求的。
还有一些场景(后面有文章补充介绍):
分布式事务的可靠消费和可靠生产
索引、缓存、静态化处理的数据同步
流量监控
日志监控(ELK)
下单、订单分发、抢票
在pom.xml中引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
在application.yml进行配置
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 47.104.141.27
port: 5672
定义订单的生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @author: ML
* @description: OrderService
* @Date : 2021/3/4
*/
@Component
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 1: 定义交换机
private String exchangeName = "fanout_order_exchange";
// 2: 路由key
private String routeKey = "";
public void makeOrder(Long userId, Long productId, int num) {
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
// 2: 根据商品id productId 去查询商品的库存
// int numstore = productSerivce.getProductNum(productId);
// 3:判断库存是否充足
// if(num > numstore ){ return "商品库存不足..."; }
// 4: 下单逻辑
// orderService.saveOrder(order);
// 5: 下单成功要扣减库存
// 6: 下单完成以后
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ fanout
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
}
}
绑定关系
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : ML
* @CreateTime : 2021/9/3
* @Description :
**/
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue emailQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue smsQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue weixinQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("weixin.fanout.queue", true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
public DirectExchange fanoutOrderExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("fanout_order_exchange", true, false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
public Binding bindingDirect1() {
return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");
}
@Bean
public Binding bindingDirect2() {
return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");
}
@Bean
public Binding bindingDirect3() {
return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
}
}
测试
import com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootRabbitmqFanoutProducerApplicationTests {
@Autowired
OrderService orderService;
@Test
public void contextLoads() throws Exception {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
Long userId = 100L + i;
Long productId = 10001L + i;
int num = 10;
orderService.makeOrder(userId, productId, num);
}
}
}
定义消费者
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "email.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {
// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
@RabbitHandler
public void messagerevice(String message){
// 此处省略发邮件的逻辑
System.out.println("email-------------->" + message);
}
}
消费者 - 短信服务
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "sms.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.FANOUT)
))
@Component
public class SMSService {
// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
@RabbitHandler
public void messagerevice(String message){
// 此处省略发邮件的逻辑
System.out.println("sms-------------->" + message);
}
}
消费者 - XX服务
启动服务SpringbootRabbitmqFanoutConsumerApplication,查看效果
代码 + @EnableRabbit
//消费者 - XX服务
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "email.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {
// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
@RabbitHandler
public void messagerevice(String message){
// 此处省略发邮件的逻辑
System.out.println("email-------------->" + message);
}
}
代码其实都是类似的,主要是理解原理。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章