MQ的三大主要作用: 应用解耦、异步提速、流量削锋
系统的耦合性越高,容错性就越低,可维护性就越低:
解耦:
如果其中一个系统服务宕机,那么系统的其他服务将也无法保证一致性,只能执行失败,可以使用 MQ 进行解耦,提升容错性和可维护性。
提升用户体验和系统吞吐量(单位时间内处理请求的数目):
用户下单请求访问订单系统需要多久才有反馈可以看出访问DB和各种相关系统需要:20 + 300 + 300 + 300 = 920ms
这样的效率算不得快,如果QPS高的话订单一多很多用户在后续排队等不到反馈,体验也不好。
用户下单请求后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。因为订单需要只需要和MQ进行交互,后续由MQ与库存等系统交互,而MQ是异步无需等待其他系统反馈吞吐量自然提升上去了。
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。可以提高系统稳定性。
也许很难给中间件一个严格的定义,但中间件应具有如下的一些特点:
(1)满足大量应用的需要
(2)运行于多种硬件和OS平台
(3)支持分布计算,提供跨网络、硬件和OS平台的透明性的应用或服务的交互
(4)支持标准的协议
(5)支持标准的接口
举例:
1,RMI(Remote Method Invocations, 远程调用)
2,Load Balancing(负载均衡,将访问负荷分散到各个服务器中)
3,Transparent Fail-over(透明的故障切换)
4,Clustering(集群,用多个小的服务器代替大型机)
5,Back-end-Integration(后端集成,用现有的、新开发的系统如何去集成遗留的系统)
6,Transaction事务(全局/局部)全局事务(分布式事务)局部事务(在同一数据库联接内的事务)
7,Dynamic Redeployment(动态重新部署,在不停止原系统的情况下,部署新的系统)
8,System Management(系统管理)
9,Threading(多线程处理)
10,Message-oriented Middleware面向消息的中间件(异步的调用编程)
11,Component Life Cycle(组件的生命周期管理)
12,Resource pooling(资源池)
13,Security(安全)
14,Caching(缓存)
主流的:
消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,你是采用底层的TCP/IP,UDP协议还是其他的自己取构建等,而这些约定成俗的规范就称之为:协议
1.语法。语法是用户数据与控制信息的结构与格式,以及数据出现的顺序。
2.语义。语义是解释控制信息每个部分的意义。它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应。
3.时序。时序是对事件发生顺序的详细说明。
而消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议。
面试题:为什么消息中间件不直接使用http协议呢?
AMQP:(全称:Advanced Message Queuing Protocol) 是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
特性:
1:分布式事务支持。
2:消息的持久化支持。
3:高性能和高可靠的消息处理优势。
MQTT协议:(Message Queueing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网系统架构中的重要组成部分。
特点:
1:轻量
2:结构简单
3:传输快,不支持事务
4:没有持久化设计。
应用场景:
1:适用于计算能力有限
2:低带宽
3:网络不稳定的场景。
Kafka协议是基于TCP/IP的二进制协议。消息内部是通过长度来分割,由一些基本数据类型组成。
特点是:
1:结构简单
2:解析速度快
3:无事务支持
4:有持久化设计
是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。
特点:
1:结构简单
2:解析速度快
3:支持事务和持久化设计。
MQ消息队列有如下几个角色
1:生产者
2:存储消息
3:消费者
那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。
所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。
集群模式1 - Master-slave主从共享数据的部署方式
集群模式2 - Master- slave主从同步部署方式
集群模式3 - 多主集群同步部署模式
集群模式4 - 多主集群转发部署模式
集群模式5 Master-slave与Breoker-cluster组合的方案
反正终归三句话:
1:要么消息共享,
2:要么消息同步
3:要么元数据共享
所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。
如何保证中间件消息的可靠性呢?可以从两个方面考虑:
1:消息的传输:通过协议来保证系统间数据解析的正确性。
2:消息的存储可靠:通过持久化来保证消息的可靠性。
阿里云Docker安装RabbitMq:
(1)yum 包更新到最新
> yum update
(2)安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
> yum install -y yum-utils device-mapper-persistent-data lvm2
(3)设置yum源为阿里云
> yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
(4)安装docker
> yum install docker-ce -y
(5)安装后查看docker版本
> docker -v
(6) 安装加速镜像
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
docker启动命令
# 启动docker:
systemctl start docker
# 停止docker:
systemctl stop docker
# 重启docker:
systemctl restart docker
# 查看docker状态:
systemctl status docker
# 开机启动:
systemctl enable docker
systemctl unenable docker
# 查看docker概要信息
docker info
# 查看docker帮助文档
docker --help
安装
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
启动Docker中的RabbitMQ:
查看容器状态
查看当前正在运行的容器信息,包括容器ID(container port),镜像名,容器名,端口信息,状态等
docker ps
可用参数: - -a : 显示所有容器(默认显示最近曾经运行过的) - -n [num] : 显示最近创建过的num个容器 - -q : 只显示容器ID
从下图测试结果也可以看出 名为my-rabbitMQ 容器的Port信息中有服务器本地45672端口到该容器15672端口的映射
查看容器的端口映射
查看指定容器的端口映射情况
docker port [containerID | containerName]
容器的启动/停止/重启
# 启动指定容器
docker start [containerID | containerName]
# 停止指定容器
docker stop [containerID | containerName]
# 重启指定容器
docker restart [containerID | containerName]
删除容器
# 删除指定容器
docker rm [containerID | containerName]
可用参数:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
package com.gton.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @description: 生产者
* @author: GuoTong
* @createTime: 2021-10-29 17:49
* @since JDK 1.8 OR 11
**/
public class Producer {
public static void main(String[] args) {
//1.RabbitMq使用的是amqp协议,创建连接工程
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
//定义消息队列的名字
String msgQueryName = "queue1";
channel.queueDeclare(msgQueryName, false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,学相伴!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
package com.gton.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @description:消费者,去消息队列消费
* @author: GuoTong
* @createTime: 2021-10-29 18:01
* @since JDK 1.8 OR 11
**/
public class Consumer {
public static void main(String[] args) {
//1.RabbitMq使用的是amqp协议,创建连接工程
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
//定义消息队列的名字
String msgQueryName = "queue1";
//消费basicConsume:String var1, boolean var2, DeliverCallback var3, CancelCallback var4
channel.basicConsume(msgQueryName,
true,
(s, delivery) -> System.out.println("消费者从消息队列获取的信息:" +
new String(delivery.getBody(), "UTF-8")),
s -> System.out.println("消费者消费消息失败了"));
System.out.println("接收成功");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("连接消息队列操作出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者
RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
代码细讲:Direct模式
package com.gton.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @description: 完整版
* @author: GuoTong
* @createTime: 2021-11-08 16:40
* @since JDK 1.8 OR 11
**/
@SuppressWarnings("all")
public class NewProducer {
public static void main(String[] args) {
//1.RabbitMq使用的是amqp协议,创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
//定义消息队列的名称
String msgQueryName1 = "queue1";
String msgQueryName2 = "queue2";
String msgQueryName3 = "queue3";
//交换机的名字
String exchangeName = "direct_message_exchange";
//类型
String exchangeTyppe = "direct";
//交换机声明:交换机名字,交换机类型,是否持久化
channel.exchangeDeclare(exchangeName,exchangeTyppe,true);
/* 声明队列
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare(msgQueryName1,true,false,false,null);
channel.queueDeclare(msgQueryName2,true,false,false,null);
channel.queueDeclare(msgQueryName3,true,false,false,null);
//交换机绑定队列和交换机的关系
/**
* Description:
* 参数一:队列名
* 参数二:交换机名
* 参数三:routingKey
*/
channel.queueBind(msgQueryName1,exchangeName,"routingKey1");
channel.queueBind(msgQueryName2,exchangeName,"routingKey2");
channel.queueBind(msgQueryName3,exchangeName,"routingKey1");
// 6: 准备发送消息的内容
String message = "你好,RabbitMq!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange 写空字符串不代表没交换机:交换机一定会有,不写就就有一个默认的
// @params2: 队列名称/routing :往这里面去发送信息
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish(exchangeName, "routingKey1", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
package com.gton.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @description: 完整版
* @author: GuoTong
* @createTime: 2021-11-08 16:41
* @since JDK 1.8 OR 11
**/
public class NewConsumer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
//定义消息队列的名字
String msgQueryName = "queue1";
//消费basicConsume:String var1, boolean var2, DeliverCallback var3, CancelCallback var4
channel.basicConsume(msgQueryName,
true,
(s, delivery) -> System.out.println("消费者从queue1消息队列获取的信息:" +
new String(delivery.getBody(), "UTF-8")),
s -> System.out.println("消费者消费消息失败了"));
msgQueryName = "queue2";
channel.basicConsume(msgQueryName,
true,
(s, delivery) -> System.out.println("消费者从queue2消息队列获取的信息:" +
new String(delivery.getBody(), "UTF-8")),
s -> System.out.println("消费者消费消息失败了"));
msgQueryName = "queue3";
channel.basicConsume(msgQueryName,
true,
(s, delivery) -> System.out.println("消费者从queue3消息队列获取的信息:" +
new String(delivery.getBody(), "UTF-8")),
s -> System.out.println("消费者消费消息失败了"));
//前面的都是异步:所以这个接收成功可能先执行
System.out.println("接收成功");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("连接消息队列操作出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
手动应答:
特点:由于消息接收者处理消息的能力不同,存在处理快慢的问题,我们就需要能者多劳,处理快的多处理,处理慢的少处理;
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
//用户下单为例||把信息发送到MQ的消息队列中
public void makeOrder(String userID, String productID, int number) {
//模拟用户下单生成的订单ID:UUID
String orderID = UUID.randomUUID().toString().replaceAll("-", "");
//通过消息队列完成消息分发(交换机,路由key|队列名,消息内容)
String exchangeName = "fanout_order_exchange";
String routingKey = "";
String msgText = "生成的订单ID:" + orderID;
rabbitTemplate.convertAndSend(exchangeName, routingKey, msgText);
}
}
/**
* @description: 声明fanout模式交换机
* @author: GuoTong
* @createTime: 2021-11-08 18:28
* @since JDK 1.8 OR 11
**/
@Configuration
public class RabbitConfig {
//声明交换机类型
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order_exchange", true, false);
}
//声明队列
@Bean
public Queue smsQueue() {
//名字和是否持久队列
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue emailQueue() {
//名字和是否持久队列
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue phoneQueue() {
//名字和是否持久队列
return new Queue("phone.fanout.queue", true);
}
//绑定关系交换机与队列
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding phoneBinding() {
return BindingBuilder.bind(phoneQueue()).to(fanoutExchange());
}
}
使用一个监听器,监听一个就消费一个队列:消费多个可以重写多个
@RabbitListener(queues = {
"email.fanout.queue"
})
@Service
public class FanoutConsumerEmail {
@RabbitHandler
public void smsMessageReadByMqQueue(String msg) {
System.out.println("消费者消费了MQ的队列email.fanout.queue的消息-》" + msg);
}
}
设置消息的过期时间,过期了会进入死信队列
//设置消息的过期时间
MessagePostProcessor messagePostProcessor = message -> {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
};
rabbitTemplate.convertAndSend(exchangeName,routingKey2,msgText,messagePostProcessor);
设置队列的过期时间
//声明队列
@Bean
public Queue smsDirectQueue() {
//设置队列的过期时间||时间到了就自动消费移出
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5);//单位秒
//名字和是否持久队列
return new Queue("sms.direct.queue", true, false, false, args);
}
死信队列:消息过期进入死信队列
@Bean
public Queue smsDirectQueue() {
//设置队列的过期时间||时间到了就自动消费移出
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5);//单位秒
//设置过期数据迁移到死信队列
args.put("x-dead-letter-exchange","dead_direct_exchange");
args.put("x-dead-letter-routing-key","dead_routing_key");
//名字和是否持久队列
return new Queue("sms.direct.queue", true, false, false, args);
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章