学习RabbitMQ(一)
阅读原文时间:2023年07月09日阅读:1

消息中间件

一、简介

消息中间件就是在消息的传输过程中保存消息的容器。消息中间件再将消息从它的源中继到它的目标时充当中间人的作用。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功的传递它为止,当然,消息队列保存消息也是有期限点的。

二、消息中间件特点

1、采用异步处理模式

消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题或队列)上,消息接受者则订阅或是监听该通道。一条消息可能最终转发给一个或多个消息接受者,这些接受者都无须对消息发送者做出同步回应。整个过程是异步的。比如用户消息注册,注册完毕后过段时间发送邮件或者短息

2、应用程序和应用程序调用关系为松耦合关系

发送者和接受者不必了解对方、只需要确认消息

发送者和接受者不必同时在线

比如在线交易系统为了保证数据的最终一致,在支付系统处理完成后会把支付结果放到消息中间件里通过订单系统修改订单支付状态。两个系统通过 消息中间件解耦

三、消息传递服务模型:

四、消息中间件的传输模式

1、点对点模型

点对点模型用于消息生产者和消息消费者之间点到点的通信。消息生产者将消息发送到由某个名字标识的特定消费者。这个名字实际上对于消费服务中的一个队列(Queue),在消息传递给消费者之前它被存储在这个队列中。队列消息可以放在内存中也可以是持久的,以保证在消息服务出现故障时仍然能够传递消息。

传统的点对点消息中间件通常由消息队列服务、消息传递服务、消息队列和消息应用程序接口API组成,其典型的结构如下图所示。

特点:

a、每个消息只用一个消费者

b、发送者和接受者没有时间依赖

c、接受者确认消息接受和处理成功

2、发布-订阅模型(Pub/Sub)

发布者/订阅者模型支持向一个特定的消息主题生产消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式就好比是匿名公告板。这种模式被概况为:多个消费者可以获得消息,在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便能够消费者订阅。订阅者必须保持持续的活动状态及接收消息,除非订阅者建立了持久的订阅。在这种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

发布订阅模型特性

1)每个消息有多个订阅者
2)客户端只有订阅后才能接受到消息
如下图,当有消息更新后会同时更新订阅者,并且一个消息会有多个订阅者,如下图:

3)持久订阅和非持久订阅
如下图,如果此时持久订阅出现故障,在故障后重启则会重新发送一次消息,如果非持久订阅则不会,消息则错过,如下图:

消费者订阅后,中间件会投递给消费者,消费者确认后在返回给中间件,如下图:

发布者和订阅者是存在时间以来的,当接受者和发布者只有建立订阅关系才能收到消息。当订阅关系建立后,消息就不会丢失,不管订阅者是否都在线。
当订阅者接受了消息,必须一直在线,当只有一个订阅者时约等于点对点模式,如一个消息中间件分为多个块,如下图:

注意:
(1)发布者和订阅者有时间依赖
接受者和发布者只有建立订阅关系才能收到消息
(2)持久订阅
订阅关系建立后,消息就不会消失,不管订阅者是否都在线
(3)非持久订阅
订阅者为了接受消息,必须一直在线。
当只有一个订阅者时约等于点对点模式

五、消息中间件应用场景

1、用户注册异步处理

用户注册,成功会发生邮件或短信
当正常情况下回发生成功,如果在期间发生网络抖动或者服务意外停止,则可能需要消息超时机制,在或者重试机制,如下图:

以上模式一般是用户注册成功后,写入一条数据到mysql,在发送一条消息到MQ!

如果不用消息中间件(或者简单的做成异步发送),做成了用户提交了注册之后,成功后,就同步立即执行发送邮件和短信服务脚本(这样耗时间),这样用户体验不好时间慢!

术语: SOA

2、日志分析使用

把日志进行集中收集,用于计算PV、用户行为分析

术语:灰度发布,小流量

3、数据复制

(1)将数据从源头复制到多个目的地,一般是要求顺序或者保证因果序列

(2)用于跨机房数据传输、搜索、离线数据计算等。

术语:AOP

4、延迟消息发送和暂存

(1)把消息中间件当成可靠的消息暂存地
(2)定时进行消息投递,比如模拟用户秒杀访问,进行系统性能压测

消费者消费完后不删除消息! 这种压测方式比较真实,比一般的并发压测软件更符合真实环境!

5、消息广播

(1)缓存数据同步更新
(2)往应用推送数据
例如更新本地缓存、变更商品价格

就像很多数据都是缓存在本地的应用中的如tomcat应用,如一个数据价格缓存,当有数据更新的时候,就需要及时(而不是通过租约到期去解决) 这个时候就需要中间件,不是一个一个去通知更新

六、消息中间件分类:

1、(push)推消息模型:

消息生产者将消息发送给消息传递服务,消息传递服务又将消息推送给消息消费者。 --场景:交易,注册

2、(pull)拉消息模型:

消费者请求消息服务接收消息,消息生产者从消息中间件拉该消息。  --场景:行为分析,PV计算

3、两种类型的区别

七、metaq消息中间件(pull)

1、概述

metaq是一款完全的队列模型消息中间件,服务器使用java语言编写,可在多种软硬件平台上部署。客户端支持java/c++编程语言。单台服务器可支持1万以上各消息队列,通过扩容服务器,队列数几乎可任意横向扩展。每个队列都是持久化,长度无限(取决于磁盘空间大小),并且可从队列任意位置开始消费

官网:http://metaq.taobao.org

2、总体架构

3、metaq特点

1)生产者、服务器和消费者都可分布式

2)消息存储顺序读写

3)性能极高,吞吐量大

4)支持消息顺序

5)客户端pull,随机读,批量拉数据

6)数据迁移,扩容对用户透明

7)消费状态保存在客户端

4、metaq重要术语

1)topic:消息的主题,可理解为队列名,由用户定义并在中间件配置,producer发送消息到某个topic下,consumer从某个topic下消费消息

2)offset:消息在中间件broker上的每个分区都组织成一个文件列表,consumer拉取数据需要知道数据在文件中的offset,是绝对偏移量,中间件会将offset转化为具体文件的相对偏移量

3)broker:metaq的服务端,在消息中间件中通常称为broker

4)partition:分区,同一个topic下有多个partition,例如meta-test这个topic可分10个partition,分别有两台broker提供,那可能每台broker提供5个partition,若broker id分别为0和1,所有分区为0-0、0-1、0-2、0-3、0-4、1-0、1-1、1-2、1-3、1-4

5、metaq 主要配置

[zookeeper]
zk.zkEnable=true #是否注册到zk,默认为true
zk.zkConnect=localhost:2181 #zk的服务器列表
zk.zkSessionTimeoutMs=30000 #zk心跳超市,单位毫秒,默认30秒
zk.zkConnection TImeoutMs=30000 #zk链接超时时间,单位毫秒,默认30秒

[system]
brokerld=0 #服务器id,必须唯一,0-1024之间
serverprot=8123 #服务端口
hostName= #默认将取本机IP,如果多机网卡需要指明
dataLogPath= #日志存放路径,默认和datapath一样
datapath= #指定默认数据存储路径

daletepolicy=delete,168
#数据删除策略,默认超过7天则删除,这里是168是小时,10s表示秒,10m表示分钟,10h表示小时,默认为小时

deleteWhen=0 0 6,18 * * ?
#何时执行删除策略的cron表达式,默认是每天6:00和18:00执行

flushTxLogAtCommit=1
#事务日志的同步设置,0表示os决定,1表示每次commit都同步,2表示每隔1s同步一次;此参数严重影响性能,可根据需要的性能和可靠性权衡作出合理选择,建议设为2,有问题最多丢失1s内运行的事务,这个级别对大多数服务是可靠的;最安全的是设为1这将严重影响事务性能;而0的安全级别最低,在安全级别上1>=2>0,而在性能上0>=2>1

unflushthreshold=0
#每隔多少条消息做一次disk sync,强制将更改的数据刷入disk,默认0表示强制每次写入都sync,当为0时服务器会自动启用groupcommit技术, 将多个消息合并成一次sync来提升io性能, 经测试group commit情况下消息发送者的tps没有受到太大影响,但server负载会上升很多;若为1000表示在掉电时最多允许丢失1000条消息

unflushinterval=10000
#间隔多少ms做一次disk sync,默认10s,在掉电时最多丢失10s内发送过来的消息,不可设为0或<0

6、metaq的集群实现:

所有的broker注册到zookeeper;

客户端连接zookeeper并返回可用的broker列表,选择一个broker发送消息

7、metaq主要命令

1、启动
#./metaServer.sh start &

2、停止
#./metaServer.sh stop

3、重启
#./metaServer.sh restart &

4、重新加载
#./metaServer.sh reload &

5、查看状态
#./metaServer.sh stats

8、安装zookeeper

1、安装Java运行环境(即安装JDK)
rpm -ivh jdk-7u40-linux-x64.rpm

2、配置zookeeper环境
mkdir -p /app/zk1 /app/zk2 /app/zk3
cp zookeeper-3.4.6.tar.gz /app/zk1
cd /app/zk1
tar xf zookeeper-3.4.6.tar.gz
mv zookeeper-3.4.6 zookeeper

3、配置zookeeper
cd zookeeper
mkdir dataDir dataLogDir (dataDir用于存放数据文件,dataLogDir用于存放日志文件)
echo 1 > dataDir/myid
mv conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg
12 dataDir=/app/zk1/zookeeper/dataDir
13 dataLogDir=/app/zk1/zookeeper/dataLogDir
16 server.1=127.0.0.1:8880:7770
17 server.2=127.0.0.1:8881:7771
18 server.3=127.0.0.1:8882:7772
###############################
cd /app/zk1
cp -a zookeeper zk2
cp -a zookeeper zk3
echo 2 > /app/zk2/zookeeper/dataDir/myid
echo 3 > /app/zk3/zookeeper/dataDir/myid
vim /app/zk2/zookeeper/conf/zoo.cfg
12 dataDir=/app/zk2/zookeeper/dataDir
13 dataLogDir=/app/zk2/zookeeper/dataLogDir
15 clientPort=2182
vim /app/zk3/zookeeper/conf/zoo.cfg
12 dataDir=/app/zk3/zookeeper/dataDir
13 dataLogDir=/app/zk3/zookeeper/dataLogDir
15 clientPort=2183
##############################

3、启动zookeeper并验证
./app/zk1/zookeeper/bin/zkServer.sh start
./app/zk2/zookeeper/bin/zkServer.sh start
./app/zk3/zookeeper/bin/zkServer.sh start
验证
./app/zk1/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
./app/zk1/zookeeper/bin/zkCli.sh -server 127.0.0.1:2182
./app/zk1/zookeeper/bin/zkCli.sh -server 127.0.0.1:2183
进入ls 查看,quit退出

9、安装多实例 metaq

1、创建目录
mkdir -p /app/metaq1 /app/metaq2
tar xf metaq-server-1.4.6.2.tar.gz -C /app/metaq1
tar xf metaq-server-1.4.6.2.tar.gz -C /app/metaq1
#######################################

2、配置metaq1
cd /app/metaq1/taobao/metamorphosis-server-wrapper/
vim conf/server.ini #配置文件

[system]
brokerId=0
serverPort=8123
dashboardHttpPort=8120

[zookeeper]
zk.zkConnect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

[topic=xl] #自定义的消息xl的队列
##################################

3、配置metaq2
cd /app/metaq2/taobao/metamorphosis-server-wrapper/
vim conf/server.ini #配置文件

[system]
brokerId=1
serverPort=8124
dashboardHttpPort=8121

[zookeeper]
zk.zkConnect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

[topic=xl] #自定义的消息xl的队列

vim bin/metaServer.sh 启动脚本
PID_FILE="$PID_DIR/.run1.pid" 看环境,可以不做

vim bin/env.sh
export JMX_PORT=9124 默认是9123端口
#################################################

4、启动
./app/metaq1/taobao/metamorphosis-server-wrapper/bin/metaServer.sh start & #实例一
./app/metaq1/taobao/metamorphosis-server-wrapper/bin/metaServer.sh start & #实例二
###################################
5、查看状态
./app/zk1/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 #登陆
ls /meta/brokers/ids #有两个broker就表面集群部署成功
get /meta/brokers/ids/1

八、RabbitMQ

1、概述

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议

RabbitMQ是流行的开源消息队列系统,用erlang语言开发

RabbitMQ是AMQP(高级消息队列协议)的标准实现

官网:http://www.rabbitmq.com/

在AMQP协议标准基础上完整的,可复用的企业消息系统,遵循Mozilla Public License开源协议,采用Erlang实现的工业级的消息队列MQ服务器
AMQP高级消息队列协议,是一个一步消息传递所使用的应用层协议规范,作为线路层协议,而不是API,例如JMS,AMQP客户端能够无视消息的来源任意发送和接受信息。AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件MOM系统,例如发布/订阅队列,没有作为基本元素实现,反而通过发送简化的AMQ实体,用户被赋予 了构建这些实体的能力,这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级,AMQP模型,这个模型同意了消息模式,例如之前提到的发布订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由

2、RabbitMQ整体架构,如下图:

3、运行原理图

RabbitMQ核心组件分别是exchange和queue,如下图:

4、RabbitMQ术语

1)sever(broker):接受客户端连接,实现AMQP消息队列路由功能的进程

2)Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange 和Quece,但是权限控制的最小粒度是Virtual Host

3)Exchang:接受生产者发送的消息,并根据Binding规则江消息路由给服务器中的队列,Exchange Type 决定了Exchange路由消息的行为,例如:在Rabbitmq中,

Exchange Type有direct ,fanout和topic三种,不用类型的exchange路由的行为是不一样的

4)Message Queue:消息队列,用于存储还未被消费者消费的消息

5)Message:由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化,由那个Message Queue接受,优先级是多少等,而Body是真正需要传输的APP数据

6)Binding key:所谓绑定就是将一个特定的exchange 和一个特定的queue绑定起来,绑定关键字为bindinkey

5、Exchange分类

直接式交换器类型

Direct Exchange 直接交互式处理路由键,需要将一个队列绑定到交换机上,需要该消息与一个特定的路由键完全匹配,这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键“dog",则只有被标记为“dog"的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog,如下图:

广播式交换器类型

Fanout Exchange 广播式路由键,只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上,很像子网广播,每台子网内的主机都获得了一份复制的消息,fanout交换机转发消息是最快,如下图:

主题式交换器类型

Topic exchange,主题式交换器,通过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中,这种路由器类型可以被用来支持经典的发布,订阅消息传输模型,使用主题名字空间作为消息寻址模式,将消息传递给那些部分或者全部匹配主题模式的多个消费者,主题交换机类型的工资方式如下:绑定关键字用零个或多个标记构成,每一个标记之间用"."字符分隔,绑定关键字必须用这种形式明确说明,并支持通配符“”匹配一个词组,“#”零个或多个词组,因此绑定关键字“ *。stock.#" 匹配路由关键字”usd.stock“和”eur.stock.db",但是不匹配“stock.nasdaq”,如下图:

6、rabbitMQ常用配置介绍

一般情况下,rabbitMQ的默认配置就足够了,如果希望特殊配置的话,有2个途径

1)环境变量的配置文件rabbitmq-env.conf

2)配置信息的配置文件rabbitmq.config

注意,这2个文件默认是没有的,如果需要必须自己创建

1)rabbitmq-env.conf这个文件的位置是确定和不能改变的,位于:/etc/rabbitmq目录下,这个目录需要自己创建

rabbitmq_node_ip_address: #指定ip地址
rabbitmq_node_port: #指定端口号,more5672
rabbitmq_config_file: #配置文件的路径,注意配置文件后缀必须是.config
rabbitmq_log_base: #日志文件路径

2)rabbitmq.config这是一个标准的erlang配置文件,它必须符合erlang配置文件的标准。erlangtuple,结构为{key,value},key为atm类型,value为一个term,其中关键参数为:

tcp_listerners #设置rabbimq的监听端口,默认为5672
disk_free_limit #磁盘低水位线,若磁盘容量低于指定值则停止接收数据
vm_memory_high_watemark #设置内存低水位线,若低于该水位线,则开启流控机制,默认值是0.4,即内存总量的40%

7、RabbitMQ命令介绍

1、/etc/init.d/rabbitmq-server start | restart | stop | reload #服务启动、停止
2、rabbitmqctl add_vhost vhostname #创建虚拟主机
3、rabbitmqctl delete_vhost vhostname #删除虚拟主机
4、rabbitmqctl list_vhosts #遍历所有虚拟主机信息
5、rabbitmqctl add_user username password #添加用户
6、rabbitmqctl change_password username password #修改用户密码
7、rabbitmqctl set_user_tags username administrator #设置username的角色(administrator)
8、rabbitmqctl set_permissions -p vhost user ".*" ".*" ".*" #绑定权限,并且具备读写的权限
9、rabbitmqctl list_queues #显示所有队列

8、RabbitMQ安装

1、yum安装
yum install -y rabbitmq-server

2、安装插件
/usr/lib/rabbitmq/bin/rabbitmq-plugins list #列出所有的插件
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management #启用web监控插件

3、启动
/etc/init.d/rabbitmq-server restart

4、验证
netstat -lnt |grep 15672
ps -ef |grep rabbit

5、配置
默认不需要配置,如果有需要/etc/rabbitmq/ 下可以创建

6、实例
rabbitmqctl add_vhost test #新添加vhost;/为默认就有t
rabbitmqctl list_vhosts #查看所有虚拟主机信息
rabbutmqctl list_users #查看用户列表
rabbitmqctl add_user admin admin #添加用户及密码
rabbitmqctl set_permissions -p test admin ".*" ".*" ".*" #绑定权限,并且具备读写的权限
rabbitmqctl list_queues #显示队列
rabbitmqctl list_user_permissions admin #查看用户权限
/etc/init.d/rabbitmq-server restart #一定要restart

9、web管理连接

http://127.0.0.1:15672/
用户和密码:guest/guest

10、python程序模拟生产者

参考官网:http://www.rabbitmq.com/tutorials/tutorial-one-python.html

来自最简单的模型P-mq--C

cat send.py

#!/usr/bin/env python
#encoding==utf-8
#http://cuidehua.blog.51cto.com
#from http://www.rabbitmq.com/tutorials/tutorial-one-python.html
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

模拟发送100条消息到queue=hello的这个列队中

for i in {1..100};do python send.py ;done

命令查看队列:(不指定vhost 列队在默认的vhost "/"中)

rabbitmqctl list_queues -p "/"

web端查看

11、python程序模拟消费者进行消费

cat receive.py

#!/usr/bin/env python
#encoding=utf-8
#http://cuidehua.blog.51cto.com
#from http://www.rabbitmq.com/tutorials/tutorial-one-python.html

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

#定义回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)

channel.basic_consume(callback,
queue='hello',
no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

执行消费,并等待messages! 直到ctrl+c 终止

python receive.py

再次查看队列

rabbitmqctl list_queues -p "/"

12、常见问题:

服务起不来? 报错:Error: unable to connect to node  rabbit@stu:  nodedown

原因是不能解析 本主机,原因是 默认作为解析的节点和 本机的hostname -s 结果一致

只要把hostname -s的结果, 加入到/etc/hosts中即可