学习RabbitMQ(四)
阅读原文时间:2023年07月08日阅读:1

1,异步处理模式

消息发送者可以发送一个消息而无需等待响应,消息发送者将消息发送到一条虚拟的通道或队列上,消息接收者则订阅或监听该通道,一条消息可能最终转发给一个或多个消息接受者,这些接受者都无需对消息发送者做出同步回应,整个过程是异步的,如:用户注册

2,应用程序和应用程序调用关系为松耦合关系,发送者和接受者不必了解对方,只需要确认消息,发送者和接受者不必同时在线

比如在线交易系统为了保证数据一致性,在支付系统处理完成后会把支付结果放到消息中间件里通知订单系统修改订单支付状态,两个系统通过消息中间件解耦

消息传递服务模型,如下图:

1,点对点模型PTP

点对点模型用于消息生产者和消息消费者之间点对点的通信,消息生产者将消息发送到由某个名字标示的特定消费者,此名字实际上对应消息服务中的一个队列queue,在消息传递给消费者之前他被存储在这个队列中,队列消息可以放在内存中也可以是持久的,以保障消息服务出现故障时仍然能够传递消息

1.1 模型特性:

每个消息只有一个消费者,发送者和消费者没有时间依赖,接受者确认消息接受和处理成功,如下图:

2,发布订阅模型PUB/SUB

发布者/订阅者模型支持向一个特定的消息主题生产消息,0或多个订阅者可能对接收来自特定消息主题的消息敢兴趣,在这种模型下,发布者和订阅者彼此不知道对方,这种模式好比匿名公告,这种模式被概括为:多个消费者可以获得消息,在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅subscription,以便能够消费者订阅。订阅者必须保持持续的活动状态用来接收消息,除非订阅者建立了持久订阅,在这种情况下,在订阅者未连接时发布的消息在订阅者重新连接时重新发布

2.1 发布订阅模型特性

1,每个消息有多个订阅者

2,客户端只有订阅后才能接受到消息

如下图,当有消息更新后会同时更新订阅者,并且一个消息会有多个订阅者,如下图:

3,持久订阅和非持久订阅

如下图,如果此时持久订阅出现故障,在故障后重启则会重新发送一次消息,如果非持久订阅则不会,消息则错过,如下图:

消费者订阅后,中间件会投递给消费者,消费者确认后在返回给中间件,如下图:

1,发布者和订阅者是存在时间以来的,当接受者和发布者只有建立订阅关系才能收到消息。当订阅关系建立后,消息就不会丢失,不管订阅者是否都在线。

当订阅者接受了消息,必须一直在线,当只有一个订阅者时约等于点对点模式,如一个消息中间件分为多个块,如下图:

异步处理

1,用户注册

用户注册,成功会发生邮件或短信

当正常情况下回发生成功,如果在期间发生网络抖动或者服务意外停止,则可能需要消息超时机制,在或者重试机制,如下图:

2,日志分析

使用案例,计算pv和用户行为分析,订阅模式,如下图:

3,数据复制

数据从源复制到多个目的地,一般要求保持数据一致性,顺序或者保证因果序列。用于跨机房数据传输,搜索,离线数据计算等,如下图:

4,延迟消息发送和暂存

把消息中间件当成可靠的消息暂存地,定时进行消息投递,比如模拟用户秒杀等做习题压测,如下图:

5,消息广播

缓存数据同步更新,应用推送数据,比如更新本地缓存,如下图:

消息中间件主要分为push和 pull

push推消息模型,消息生产者将消息发送给消息传递服务,消息传递服务又将消息推送给消息消费者

pull拉取消息模型,消费者请求消息服务接收消息,消息生产者从消息中间件拉取消息

在一般场景中,消费者拉取更容易控制,如果是生产者推送给消费者的话,需要提供更多的处理,如负载均衡,后端订阅者信息收集的集群等,显而易见,拉取则更容易控制范围,但是,拉取通常可能一次拉取成千上百万的数据,这时,数据存储设备和带宽也是必备考量的环节,#原文链接www#linu#xea#com/1506.html#,类似注册用户发送短信这种实时的信息则不适用于拉取,更偏向于推送,对比如下图:

######################################################################################

metaq使用java语言编写,可以在多种平台部署,客户端支持c++和java,单台服务可支持1w以上各消息队列,事实上测试可能在4-6千之间,可横向扩展,队列持久化,队列长度无限(取决于磁盘大小),可从队列任何位置开始消费

通常在磁盘中的二进制文件id为8位定长,通常情况下还需要有int,4个字节,数据字节,此时如果有个数据为11个字节,如下

在id,int date就看做一个消息体,如下图:

metaq特点:

生产者和服务器和消费者都可分布式,消息存储通常安装顺序写,他的性能也非常高,吞吐量达,并且支持消息顺序,客户pull,随机读,批量拉数据,数据迁移,以及扩展并且对用户同名,最后,消费状态保持在客户端

总体架构,如下图:

metaq术语

topic:消息的主题,由用户定义并在服务器端配置,producer发送消息到某个topic,consumer从某个topic下的消费消息

offset:消息在broker上的每个分区都是组织成一个文件列表,消费者拉取数据需要指定数据在文件中的偏移量,这个偏移就是所谓的offset,offset是绝对偏移量,服务器会将offset转化为具体文件的相对偏移量

broker:meta的服务器或者书服务器,在消息中间件中通常称为broker

partition(分区):同一个topic下面还分为多个分区,如meta-test这个topic我们可以分为10个分区,分别有两台服务器提供,那么可能每台服务器提供5个分区,假设服务器id分别是0和1,则所有分区为0-0,0-1,0-2,0-3,0-4,1-0,1-1,1-2,1-3,1-4

metaq主要配置

zk.zkEnable=true 是否注册到zk,默认为true

zk.zkConnect=localhost:2181 zk的服务器列表

zk.zkSessionTimeoutMs=30000 zk心跳超市,单位毫秒,默认30秒

zk.zkConnection TImeoutMs=30000 zk链接超时时间,单位毫秒,默认30秒

brokerld 服务器id,必须唯一,0-1024之间

serverprot 服务端口

hostName 默认将取本机IP,如果多机网卡需要指明

dataLogPath 日志存放路径,默认和datapath一样

datapath 指定默认数据存储路径

daletepolicy=delete,168 数据删除策略,默认超过7天则删除,这里是168是小时,10s表示秒,10m表示分钟,10h表示小时,默认为小时

deleteWhen 何时执行删除策略的cron表达式,默认是0 0 6,18 * * ?,也就是每天的早晚执行处理策略

deletewhen 删除策略的执行时间,cron表达式

flushTxLogAtCommit=1 事务日志的同步设置,0表示让操作系统决定,1表示每次commit都同步,2表示每隔1秒同步一次,不过会影响事务性能,可根据需要的性能和可靠性直接权衡做出一个合理的选择。通常设置为2,表示每隔1秒刷一次,也就最多丢失一秒内的运行时事务,最安全的设置为1,但是严重影响性能,而0的安全级别最低,安全级别上1>=2>0 而性能则是0>=2>1

unflushthreshold 每隔多少条消息做一次磁盘的sync,强制将更改的数据刷入磁盘,默认1000,也就是说在掉电的情况下,最多允许丢失1000条消息,可设置为0,强制每次写如都sync,在设置0的情况下,服务器会自动启用group commit技术,将多个消息合并成一次sync来提高io性能,而group commit情况下消息发送者的tps没有收到太大影响,但是服务端的负责会上升很多

unflushinterval 间隔多少毫秒定期做一个磁盘的sync,默认是10秒,也就是说服务器掉电情况下,最多丢失10秒内发送来的消息,不可设置为小于或者等于0

metaq集群:所有broker注册到zookeeper,客户端链接zookeeper并返回可用的broker列表,选择一个broker并发送消息

RabbitMQ

在AMQP协议标准基础上完整的,可复用的企业消息系统,遵循Mozilla Public License开源协议,采用Erlang实现的工业级的消息队列MQ服务器

AMQP高级消息队列协议,是一个一步消息传递所使用的应用层协议规范,作为线路层协议,而不是API,例如JMS,AMQP客户端能够无视消息的来源任意发送和接受信息。AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件MOM系统,例如发布/订阅队列,没有作为基本元素实现,反而通过发送简化的AMQ实体,用户被赋予 了构建这些实体的能力,这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级,AMQP模型,这个模型同意了消息模式,例如之前提到的发布订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由

RabbitMQ整体架构,如下图:

RabbitMQ核心组件分别是exchange和queue,如下图:

RabbitMQ术语

Server(broker) 接收客户端链接,实现AMQP消息队列和路由功能的进程

Virtual Host 一个虚拟概念,类似权限控制组,一个virtual host里面可有多个exchange和queue,但是权限控制 的最小粒度是virtual host

exchange 接收生产者发送的消息,并根据binding规则将消息路由给服务器中的队列,exchangeType决定了exchnage路由消息的行为,如,在RabbitMQ中,exchangeType有Direct,fanout和topic三种,不同类型的exchange路由的行为是不一样的

message queue 消息队列,用于存储还未被消费者消费的消息

message 由header和body组成,header是由生产者添加的各种属性的集合,包括message是否被持久化,由那个message queue接受,优先级是多少等,而body是真正需要传输的app数据

bindingkey 所谓绑定就是将一个特定的exchange和一个特定的queue绑定起来,绑定关键字成为bindingkey

Exchange分类

直接式交换器类型

Direct Exchange 直接交互式处理路由键,需要将一个队列绑定到交换机上,需要该消息与一个特定的路由键完全匹配,这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键“dog",则只有被标记为“dog"的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog,如下图:

广播式交换器类型

Fanout Exchange 广播式路由键,只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上,很像子网广播,每台子网内的主机都获得了一份复制的消息,fanout交换机转发消息是最快,如下图:

主题式交换器类型

Topic exchange,主题式交换器,通过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中,这种路由器类型可以被用来支持经典的发布,订阅消息传输模型,使用主题名字空间作为消息寻址模式,将消息传递给那些部分或者全部匹配主题模式的多个消费者,主题交换机类型的工资方式如下:绑定关键字用零个或多个标记构成,每一个标记之间用"."字符分隔,绑定关键字必须用这种形式明确说明,并支持通配符“”匹配一个词组,“#”零个或多个词组,因此绑定关键字“ *。stock.#" 匹配路由关键字”usd.stock“和”eur.stock.db",但是不匹配“stock.nasdaq”,如下图:

配置

配置文件存放:/etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODE_IP_ADDRESS:IP地址
RABBITMQ_NODE_PORT:端口号
RABBITMQ_CONFIG_FILE:默认路径
ABBITMQ_LOG_BASE:日志路径
rabbitmq.config是标准的erlang配置文件,必须符合erlang配置标准,erlangtuple,结构为{key,value},key为atom类型,value为一个term,其中关键参数为
tcp_listerners设置rabimq监听端口,默认5672
disk_free_limit磁盘低水位线,若磁盘容量低于指定值则停止接收数据
vm_memory_high_watermark,设置内存低水位线,则开启流控机制,默认值是0.4,即内存总量的40%

安装

[root@LinuxEA ~]# yum install epel-release
[root@LinuxEA ~]#  yum install rabbitmq-server -y
[root@LinuxEA ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins list
[ ] amqp_client                       3.3.5
[ ] cowboy                            0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap                             3.3.5-gite309de4
[ ] mochiweb                          2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0                  3.3.5
[ ] rabbitmq_auth_backend_ldap        3.3.5
[ ] rabbitmq_auth_mechanism_ssl       3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation               3.3.5
[ ] rabbitmq_federation_management    3.3.5
[ ] rabbitmq_management               3.3.5
[ ] rabbitmq_management_agent         3.3.5
[ ] rabbitmq_management_visualiser    3.3.5
[ ] rabbitmq_mqtt                     3.3.5
[ ] rabbitmq_shovel                   3.3.5
[ ] rabbitmq_shovel_management        3.3.5
[ ] rabbitmq_stomp                    3.3.5
[ ] rabbitmq_test                     3.3.5
[ ] rabbitmq_tracing                  3.3.5
[ ] rabbitmq_web_dispatch             3.3.5
[ ] rabbitmq_web_stomp                3.3.5
[ ] rabbitmq_web_stomp_examples       3.3.5
[ ] sockjs                            0.3.4-rmq3.3.5-git3132eb9
[ ] webmachine                        1.10.3-rmq3.3.5-gite9359c7
[root@LinuxEA ~]#


[root@LinuxEA ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
  mochiweb
  webmachine
  rabbitmq_web_dispatch
  amqp_client
  rabbitmq_management_agent
  rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
[root@LinuxEA ~]#
[root@LinuxEA ~]# systemctl restart rabbitmq-server
创建vhost


[root@LinuxEA ~]# rabbitmqctl add_vhost linuxea
Creating vhost "linuxea" ...
...done.
[root@LinuxEA ~]#


[root@LinuxEA ~]# cat LinuxEA
#!/bin/bash
###www.#linu#xea#.com
#The source address of this article:http://www.lin#uxea.com/1514.html

查看vhosts

[root@LinuxEA ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
linuxea
...done.
[root@LinuxEA ~]#

删除vhosts:rabbitmqctl delete_vhosts vhostname

添加用户名和密码:

[root@LinuxEA ~]# rabbitmqctl add_user linuxea linuxea
Creating user "linuxea" ...
...done.
[root@LinuxEA ~]#

修改用户名密码:rabbitmqctl change_password username newpassword

授权读写权限:

[root@LinuxEA ~]# rabbitmqctl set_permissions -p linuxea linuxea ".*" ".*" ".*"
Setting permissions for user "linuxea" in vhost "linuxea" ...
...done.
[root@LinuxEA ~]#

查看:-p 即可

[root@LinuxEA ~]# rabbitmqctl list_queues -p linuxea
Listing queues ...
...done.
[root@LinuxEA ~]#