rabbitmq安装部署和常用命令
阅读原文时间:2023年08月25日阅读:1

python操作rabbitmq

rabbitmq实现可以使用java或者springboot的封装方法,自己创建实现,也可以使用中间件实现,相对于自建,使用rabbitmq应用场景及使用更系统安全。本文具体介绍rabbitmq中间件部署。

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有 ActiveMQ(安全),RabbitMQ,ZeroMQ,Kafka(大数据),MetaMQ,RocketMQ

在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景

以下本人linux部署步骤:

配置相关依赖环境

##yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget
yum install -y make gcc gcc-c++ m4 openssl openssl-devel ncurses-devel unixODBC unixODBC-devel java java-devel socat

下载安装包(本人rabbitmq要求的版本是22以上)

cd /home/erlang
wget http://www.erlang.org/download/otp_src_23.3.tar.gz

解压并安装

tar -xzvf otp_src_23.3.tar.gz
cd otp_src_23.3
./configure --prefix=/usr/local/erlang
make && make install

配置环境

vim /etc/profile

###为文件添加值start:
ERL_PATH=/home/erlang/otp_src_23.3/bin
PATH=$ERL_PATH:$PATH
###end

source /etc/profile 使配置生效,在shell中使用

查看安装情况

erl -version 查看版本号
#启动erlang命令
./bin/erl

rabbitmq官网下载地址:https://www.rabbitmq.com/ 下载压缩包(注意下载linux版的,类似:rabbitmq-server-3.9.6-1.el7.noarch.rpm

)本人下载的 centos7相应版本:在window机中下载相关包上传到linux机中

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-3.8.16-1.el7.noarch.rpm

rpm包安装后没具象文件位置集,相关文件应该是融入主文件里其他位置中,使用模糊查询,查看安装位置,如: rpm -aq rabbitmq*

安装rabbitmq

##安装命令
rpm -Uvh rabbitmq-server-3.9.6-1.el7.noarch.rpm
##若报错:erlang >= 23.2 被 rabbitmq-server-3.9.6-1.el7.noarch 需要;可使用如下命令:(nodeps是忽视依赖关系,可不加)
rpm -ivh --nodeps rabbitmq-server-3.8.16-1.el7.noarch --force --nodeps

启动rabbitmq

service rabbitmq-server start

配置用户及WEBUI插件

##启用插件:
rabbitmq-plugins enable rabbitmq_management
##添加用户:
rabbitmqctl add_user admin admin
##设置用户tag:
rabbitmqctl set_user_tags admin administrator
##赋予用户默认vhost的全部操作权限:
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

##开机自动启动
chkconfig rabbitmq-server on

找不到。启动程序找不到erl命令,那么添加erlang安装的bin目录,我这里是编译安装的,目录如下。

vim /usr/lib/rabbitmq/bin/rabbitmq-server

在set -e 下面加两行

#erlang
export PATH=$PATH:/opt/otp_src_23.3/bin

无法访问页面,查看message日志,发现命令没有发现,rabbitmq失败

查看位置

在环境文件前面也加上他的环境变量

这下重启没有报错了

添加admin用户

http://localhost:15672 登录,无法直接使用远程ip登录访问,需要进行配置,在配置之前先创建一个admin账号,并进行授权:

查看用户 :rabbitmqctl list_users

创建一个admin用户:rabbitmqctl add_user admin admin

用户授权 :rabbitmqctl set_user_tags admin administrator

#  rabbitmqctl set_permissions -p / asdf ".*" ".*" ".*"  #这条命令好像不用执行就可以

创建用户之后需要进行配置,若使用rpm安装,则修改配置/etc/rabbitmq/rabbitmq.config,配置内容大致如下:

[
{rabbit,
[%%
%% Network Connectivity
%% ====================
%%
%% By default, RabbitMQ will listen on all interfaces, using
%% the standard (reserved) AMQP port.
%%
{tcp_listeners, [5672]},
{loopback_users, ["admin"]}
]}
].

最后面那个点必填的

配置之后,如果rabbitmq无法正常启动,出现错误:

init terminating in do_boot ()

Crash dump is being written to: erl_crash.dump…done

此时有可能是配置文件配置有问题,确保配置文件没问题之后,重启即可。

可以看到,没有那个配置文件,创建并写上面的配置,然后重启rabbitmq,可以看到rabbitmq服务状态中,显示使用这个新建的配置

再次查看用户,可以看到多了个admin用户

开启管理页面

需要开启15672 web管理页面端口,需要执行命令 ,不然页面无法访问,这个端口没有监听

rabbitmq-plugins enable rabbitmq_management

访问Rabbitmq http://xxx.xxx.xxx.xxx:15672/

默认账号密码:guest guest 只能在本地访问

使用新添用户:admin admin

访问即可,创建成功:

admin admin 进入的

1、可以查看服务状态:

[root@localhost ~]# rabbitmqctl status

[root@mcw14 opt]# rabbitmqctl status
Status of node rabbit@mcw14 …
Runtime

OS PID: 114466
OS: Linux
Uptime (seconds): 1512
Is under maintenance?: false
RabbitMQ version: 3.8.16
Node name: rabbit@mcw14
Erlang configuration: Erlang/OTP 23 [erts-11.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
Erlang processes: 371 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60

Plugins

Enabled plugin file: /etc/rabbitmq/enabled_plugins
Enabled plugins:

* rabbitmq_management
* amqp_client
* rabbitmq_web_dispatch
* cowboy
* cowlib
* rabbitmq_management_agent

Data directory

Node data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14
Raft data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14/quorum/rabbit@mcw14

Config files

* /etc/rabbitmq/rabbitmq.config

Log file(s)

* /var/log/rabbitmq/rabbit@mcw14.log
* /var/log/rabbitmq/rabbit@mcw14_upgrade.log

Alarms

(none)

Memory

Total memory used: 0.0906 gb
Calculation strategy: rss
Memory high watermark setting: 0.4 of available memory, computed to: 1.4708 gb

code: 0.0282 gb (28.15 %)
other_proc: 0.0273 gb (27.19 %)
allocated_unused: 0.0202 gb (20.11 %)
other_system: 0.0124 gb (12.36 %)
plugins: 0.006 gb (5.97 %)
other_ets: 0.0033 gb (3.26 %)
atom: 0.0015 gb (1.45 %)
binary: 0.0011 gb (1.1 %)
mgmt_db: 0.0002 gb (0.21 %)
mnesia: 0.0001 gb (0.09 %)
metrics: 0.0001 gb (0.07 %)
msg_index: 0.0 gb (0.03 %)
quorum_ets: 0.0 gb (0.01 %)
connection_other: 0.0 gb (0.0 %)
connection_channels: 0.0 gb (0.0 %)
connection_readers: 0.0 gb (0.0 %)
connection_writers: 0.0 gb (0.0 %)
queue_procs: 0.0 gb (0.0 %)
queue_slave_procs: 0.0 gb (0.0 %)
quorum_queue_procs: 0.0 gb (0.0 %)
reserved_unallocated: 0.0 gb (0.0 %)

File Descriptors

Total: 2, limit: 32671
Sockets: 0, limit: 29401

Free Disk Space

Low free disk space watermark: 0.05 gb
Free disk space: 16.7942 gb

Totals

Connection count: 0
Queue count: 0
Virtual host count: 1

Listeners

Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Interface: [::], port: 15672, protocol: http, purpose: HTTP API
[root@mcw14 opt]#

命令执行结果

2、重启服务:

[root@localhost ~]# service rabbitmq-server restart

Restarting rabbitmq-server (via systemctl):

[ 确定 ]

3、关闭服务

[root@localhost ~]# service rabbitmq-server stop

4、重置服务

[root@localhost ~]# service rabbitmq-server reset

force_reset

强制RabbitMQ node还原到最初状态.

不同于reset , force_reset 命令会无条件地重设node,不论当前管理数据库的状态和集群配置是什么。它只能在数据库或集群配置已损坏的情况下才可使用。

执行reset和force_reset之前,必须停止RabbitMQ application

将RabbitMQ node还原到最初状态.包括从所在群集中删除此node,从管理数据库中删除所有配置数据,如已配置的用户和虚拟主机,以及删除所有持久化消息.

角色及权限

1.RabbitMQ的用户角色分类:

none、management、policymaker、monitoring、administrator

2.RabbitMQ各类角色描述:

none

不能访问 management plugin

management

用户可以通过AMQP做的任何事外加:

列出自己可以通过AMQP登入的virtual hosts

查看自己的virtual hosts中的queues, exchanges 和 bindings

查看和关闭自己的channels 和 connections

查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。

policymaker

management可以做的任何事外加:

查看、创建和删除自己的virtual hosts所属的policies和parameters

monitoring

management可以做的任何事外加:

列出所有virtual hosts,包括他们不能登录的virtual hosts

查看其他用户的connections和channels

查看节点级别的数据如clustering和memory使用情况

查看真正的关于所有virtual hosts的全局的统计信息

administrator

policymaker和monitoring可以做的任何事外加:

创建和删除virtual hosts

查看、创建和删除users

查看创建和删除permissions

关闭其他用户的connections

3.创建用户并设置角色

可以创建管理员用户,负责整个MQ的运维;可以创建RabbitMQ监控用户,负责整个MQ的监控;可以创建某个项目的专用用户,只能访问项目自己的virtual hosts

服务管理

启动: service rabbitmq-server start 或 rabbitmq-service start
关闭: service rabbitmq-server stop 或 rabbitmq-service stop
重启: service rabbitmq-server restart
状态: rabbitmqctl status

[root@mcw14 ~]# rabbitmq-service stop
-bash: rabbitmq-service: command not found
[root@mcw14 ~]# service rabbitmq-server restart
Redirecting to /bin/systemctl restart rabbitmq-server.service
[root@mcw14 ~]# rabbitmqctl status
Status of node rabbit@mcw14 …
Runtime

OS PID: 81996
OS: Linux
Uptime (seconds): 28
Is under maintenance?: false
RabbitMQ version: 3.8.16
Node name: rabbit@mcw14
Erlang configuration: Erlang/OTP 23 [erts-11.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
Erlang processes: 367 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60

Plugins

Enabled plugin file: /etc/rabbitmq/enabled_plugins
Enabled plugins:

* rabbitmq_management
* amqp_client
* rabbitmq_web_dispatch
* cowboy
* cowlib
* rabbitmq_management_agent

Data directory

Node data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14
Raft data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14/quorum/rabbit@mcw14

Config files

* /etc/rabbitmq/rabbitmq.config

Log file(s)

* /var/log/rabbitmq/rabbit@mcw14.log
* /var/log/rabbitmq/rabbit@mcw14_upgrade.log

Alarms

(none)

Memory

Total memory used: 0.1046 gb
Calculation strategy: rss
Memory high watermark setting: 0.4 of available memory, computed to: 1.4708 gb

other_proc: 0.0357 gb (34.15 %)
code: 0.0282 gb (27.01 %)
other_system: 0.0124 gb (11.84 %)
allocated_unused: 0.0105 gb (10.05 %)
reserved_unallocated: 0.0069 gb (6.62 %)
plugins: 0.0055 gb (5.27 %)
other_ets: 0.0033 gb (3.12 %)
atom: 0.0015 gb (1.4 %)
binary: 0.0002 gb (0.22 %)
mgmt_db: 0.0002 gb (0.15 %)
mnesia: 0.0001 gb (0.09 %)
metrics: 0.0001 gb (0.06 %)
msg_index: 0.0 gb (0.03 %)
quorum_ets: 0.0 gb (0.01 %)
connection_other: 0.0 gb (0.0 %)
connection_channels: 0.0 gb (0.0 %)
connection_readers: 0.0 gb (0.0 %)
connection_writers: 0.0 gb (0.0 %)
queue_procs: 0.0 gb (0.0 %)
queue_slave_procs: 0.0 gb (0.0 %)
quorum_queue_procs: 0.0 gb (0.0 %)

File Descriptors

Total: 2, limit: 32671
Sockets: 0, limit: 29401

Free Disk Space

Low free disk space watermark: 0.05 gb
Free disk space: 16.5811 gb

Totals

Connection count: 0
Queue count: 0
Virtual host count: 1

Listeners

Interface: [::], port: 15672, protocol: http, purpose: HTTP API
Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
[root@mcw14 ~]#

rabbitmqctl status

或者systemctl管理

新增账号: rabbitmqctl add_user username password
删除用户: rabbitmqctl delete_user username
所有用户: rabbitmqctl list_users
修改密码: rabbitmqctl change_password username newpassword
清除密码: rabbitmqctl clear_password {userName}

admin登录

创建用户和查看用户:

rabbitmqctl add_user machangwei  123456

页面上面也可以查看到这个用户

刚刚创建的用户没有授权,不是管理用户,上面两个账号是管理用户,所以可以登录,好像是授权管理用户才能登录管理页面

根据下面的角色管理,给账号赋予administrator角色后,就i你登录到管理页面了

修改machangwei密码:

rabbitmqctl change_password machangwei 12

再看已经登录的页面,提示登录失败

刷新页面就这样了

使用新密码登录

接下来演示清除密码

清除密码之后

页面又退出登录了

admin用户登录查看,machangwei账号是没有密码了的

重新加上密码,直接修改密码就可以

清除了密码,跟你查看用户没有关系,照样是查到的

清除密码必须接账号

角色管理

用户角色分为5中类型:
  none:无任何角色。新创建的用户的角色默认为 none。
  management:可以访问web管理页面。
  policymaker: 包含managerment所有权限,并且可以管理策略(Policy)和参数(Parameter)
  monitoring: 包含management所有权限,并且可以看到所有链接、信道及节点相关的信息
  administartor:包含monitoring所有权限,并且可以管理用户、虚拟机、权限、策略、参数等。(最高权限)
设置用户角色: rabbitmqctl set_user_tags zhaojigang administrator
设置多个角色: rabbitmqctl set_user_tags hncscwc monitoring policymaker
查看用户角色: rabbitmqctl list_users

设置上面创建的machangwei为管理用户

页面上查看machangwei账号拥有的角色。

继续执行命令,设置machangwei账号有多个角色

页面上查看,可以看到,它这个是设置标签,每次都是重新设置,会将之前的角色清空,换成本次设置的角色,这里不是新增,这点需要注意

我们再加上管理角色,

这下machangwei账号有登录页面的权限了,查看页面,显示是machangwei登录。

命令行查看用户,也可以看到用户有哪些角色

Vhost管理

所有虚拟主机: rabbitmqctl list_vhosts
添加虚拟主机: rabbitmqctl add_vhost vhostname
删除虚拟主机: rabbitmqctl delete_vhost vhostname

查看虚拟主机

页面上面查看下虚拟机主机

添加一个虚拟主机

可以看到刚刚创建的虚拟主机跟账号没有绑定,名字也能看到页面和命令行看到的一样

权限管理

命令格式如下:rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}
查询所有权限:rabbitmqctl list_permissions  [-p  VHostPath]
查看用户权限:rabbitmqctl list_user_permissions username
清除用户权限:rabbitmqctl clear_permissions [-p VHostPath] username

rabbitmqctl set_permissions   -p mcw_vhost1  machangwei   /etc/rabbitmq/rabbitmq.config  read     [-p vhost] {user} {conf} {write} {read}

rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}

表示设置用户权限。 {vhost} 表示待授权用户访问的vhost名称,默认为 "/"; {user} 表示待授权反问特定vhost的用户名称; {conf}表示待授权用户的配置权限,是一个匹配资源名称的正则表达式; {write} 表示待授权用户的写权限,是一个匹配资源名称的正则表达式; {read}表示待授权用户的读权限,是一个资源名称的正则表达式。

rabbitmqctl set_permissions -p / admin "^mip-.*" ".*" ".*"

例如上面例子,表示授权给用户 "admin" 具有所有资源名称以 "mip-" 开头的 配置权限;所有资源的写权限和读权限。

设置相关 vhost 权限案例

rabbitmqctl add_vhost /myhost # 添加 vhost
rabbitmqctl add_user me me123 # 设置用户和密码
rabbitmqctl set_permissions -p /myhost me ".*" ".*" ".*" # vhost 设置权限

下面查看所有用户权限,查看用户权限:

查看,me这个没有添加角色,列出权限的时候没有看到它,指定vhost的列出权限,就看到me了。也就是/myhost目前只有这个用户。而权限,是分为对vhost的配置权限,写权限,读权限。配置权限那里,可以用以什么开头的权限,这种来通配,就像之前在redis作为celery的bakend一样,写入的数据是以指定字符串开头,这样来区分消息,这里,不清楚是不是这样的作用。

页面显示如下

清除用户的vhost权限

没有了

其它几个页面

这里有vhost

也可以页面上添加用户

也可以页面上添加vhost

查看插件

rabbitmq-plugins list

如下,之前开启了管理页面的插件,才能通过页面访问rabbitmq,现在可以看到,前面是有标记的,也就是没有标记的应该就是没有开通,而且标记不同,大写的标记,应该是插件的服务端程序吧。

[root@mcw14 ~]# rabbitmq-plugins list
Listing plugins with pattern ".*" …
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@mcw14
|/
[ ] rabbitmq_amqp1_0 3.8.16
[ ] rabbitmq_auth_backend_cache 3.8.16
[ ] rabbitmq_auth_backend_http 3.8.16
[ ] rabbitmq_auth_backend_ldap 3.8.16
[ ] rabbitmq_auth_backend_oauth2 3.8.16
[ ] rabbitmq_auth_mechanism_ssl 3.8.16
[ ] rabbitmq_consistent_hash_exchange 3.8.16
[ ] rabbitmq_event_exchange 3.8.16
[ ] rabbitmq_federation 3.8.16
[ ] rabbitmq_federation_management 3.8.16
[ ] rabbitmq_jms_topic_exchange 3.8.16
[E*] rabbitmq_management 3.8.16
[e*] rabbitmq_management_agent 3.8.16
[ ] rabbitmq_mqtt 3.8.16
[ ] rabbitmq_peer_discovery_aws 3.8.16
[ ] rabbitmq_peer_discovery_common 3.8.16
[ ] rabbitmq_peer_discovery_consul 3.8.16
[ ] rabbitmq_peer_discovery_etcd 3.8.16
[ ] rabbitmq_peer_discovery_k8s 3.8.16
[ ] rabbitmq_prometheus 3.8.16
[ ] rabbitmq_random_exchange 3.8.16
[ ] rabbitmq_recent_history_exchange 3.8.16
[ ] rabbitmq_sharding 3.8.16
[ ] rabbitmq_shovel 3.8.16
[ ] rabbitmq_shovel_management 3.8.16
[ ] rabbitmq_stomp 3.8.16
[ ] rabbitmq_top 3.8.16
[ ] rabbitmq_tracing 3.8.16
[ ] rabbitmq_trust_store 3.8.16
[e*] rabbitmq_web_dispatch 3.8.16
[ ] rabbitmq_web_mqtt 3.8.16
[ ] rabbitmq_web_mqtt_examples 3.8.16
[ ] rabbitmq_web_stomp 3.8.16
[ ] rabbitmq_web_stomp_examples 3.8.16
[root@mcw14 ~]#

监控管理器

这个就是前面开启和关闭rabbitmq的管理页面的命令

rabbitmq-plugins enable rabbitmq_management #启动
rabbitmq-plugins disable rabbitmq_management #关闭

应用管理

慎用,好像是开启和停止节点用的

关闭应用:rabbitmqctl stop_app
启动应用:rabbitmqctl start_app

stop之后,就不能访问页面了,根据命令返回打印信息,好像是停止的这个节点

stop之后,程序进程还是在的,再次start一下,又能正常访问页面了。我这里只有一个node,如果有多个node,停止一个,应该是可以在页面访问看到情况的

队列管理

查看所有队列:rabbitmqctl list_queues
清除所有队列:rabbitmqctl reset #需要先执行rabbitmqctl stop_app
强制清除队列:rabbitmqctl force_reset

删除单个队列:rabbitmqctl delete_queue queue_name

目前没有队列

添加之后,访问拒绝

参考:https://blog.csdn.net/hefeng_aspnet/article/details/125865990

查看权限

有正则匹配的配置

点击设置权限,把以mip-开头的配置,给重新设置了,为有所有权限

再次添加队列

成功添加队列,点击进入队列页面

这个消息下面,我们看到了之前添加的三个键值对。

再次查询消息队列,可以看到我们刚刚创建的队列,但是消息,显示的是0,不知道是为什么

这里有个push消息,我们试一试

消息被推送了

之前添加的参数,不是消息,我们上面点击发布消息,才是真的往这个队列里面添加了消息,可以看到多了条消息,persistent应该是是否持久化的意思吧,而memory是指内存里面有条消息,但是没有持久化存储呢吧,持久化存储为0,总共一条消息

此时再去Linux上查看,可以看到消息队列mcwcountage已经多了一条消息了。

再次发布一条消息

可以看到,两条消息了

再次查看,两条消息了

点击get消息,可以看到我们刚刚推送的消息

两条消息都get一下。如果填的消息数据,大于实际 有的消息数量,那么只显示已存在的消息

点击之后,消息被清空了

再次push两个消息

移除消息,没有安装插件,那么安装一下

显示开启了两个插件在这个节点

可以看到,这两个插件已经开启了

刚刚的操作没有截图,我们将多出来的队列1删除掉

看上面,可以知道此时只有一个队列。我们进入之后,点击移动,这时候会新建mcwqueue2队列,并把mcwcountage下的所有消息移动到新的队列,有点像是备份似的,不清楚,如果下面移动时,填写的是已有的队列名称,是不是只是新增到已有的队列里面,大概率是这样子吧。

点击移动消息,跳转到展示所有消息队列的页面,可以看到新增的队列,并且两条 消息已经从旧的mcwcountage队列移走,移动到新的队列mcwqueue2里面去了。

我们再测试一下删除,这里的删除好像是删除队列

可以看到,进入队列的详情页面,点击删除,的确把该队列删除掉了,如下,已经看不到mcwqueue2队列了

测试重启消息队列服务,对没有持久化消息的影响

我们push两个消息进入消息队列

我们重启之后,可以看到消息丢失。那么重启之后是否需要将消息持久化呢,怎么做持久化呢,持久化后是否上图中persistent就是2了呢

消息如何持久化呢?

下面看下有两条消息

[root@controller ~]# rabbitmqctl list_queues
Listing queues …
q-agent-notifier-security_group-update_fanout_0134e1dc992d4f3291571f4f1150f033 0
q-l3-plugin_fanout_67ad4da1e291429b9a561a0b03734552 0
q-plugin 0
conductor_fanout_e2d9de48537d4ec09edb946f2fdb5008 0
q-agent-notifier-l2population-update_fanout_48739ab147b04dc895fe2fd340aa7eae 0
q-agent-notifier-network-delete_fanout_ed0e5abc181c4fe0bb4a802ea928a2e8 0
reply_20ac7e9ae5a349aea2ea680a5aec7d79 0
consoleauth.controller 0
q-agent-notifier-network-delete.controller 0
q-plugin.controller 0
compute_fanout_9f8cf7456e73451e8ce7d65460849817 0
reply_2455b8e5c5dc40da8650f56ae4e45174 0
q-agent-notifier-l2population-update.controller 0
cinder-volume_fanout_8dc268f1cf7d46d8bbfe4a521da8b202 0
cinder-volume.controller@lvm_fanout_b606cbf8a67c4600879b2a64a5320db4 0
q-agent-notifier-network-update 0
q-agent-notifier-port-update.controller 0
conductor_fanout_2b33be46a0d448868f6dc32678a21500 0
q-agent-notifier-port-update_fanout_2b5d1a5c039e4e2caddea9ed34185fe8 0
q-agent-notifier-port-update_fanout_ebc99b8c158c48529e653e9ceb2f7760 0
consoleauth_fanout_4964cf77ca7944f8a8d85fe555703a0c 0
q-server-resource-versions_fanout_753834a40abd430998a0ba30955f1122 0
cinder-volume.compute2@nfs_fanout_2868e267441e4e528f4d300f33282d19 0
conductor.controller 0
q-agent-notifier-l2population-update_fanout_9cae8c6abdc84202bc35ebbee373b422 0
reply_c9910c029ba1432c98ae4c3576ef7df6 0
conductor 0
q-agent-notifier-network-update.controller 0
scheduler_fanout_5eea12b3d0114d58ac0d10fc3799f0d1 0
q-agent-notifier-security_group-update_fanout_1452a407fcd54a5ab47285254b86a8d3 0
q-agent-notifier-security_group-update.compute1 0
reply_52a5090b02f5436c9e7aa541e732ef52 0
cinder-volume 0
q-agent-notifier-network-update_fanout_a4a9e02adfc748fd9a8835996e802bfa 0
cinder-volume_fanout_75d1c7f92a394bd088b7f65e121d6f9a 0
q-server-resource-versions 0
neutron-vo-Trunk-1.1_fanout_138759917974485faad26891827e7447 0
compute.compute2 0
neutron-vo-Trunk-1.1_fanout_676122d00eb34a59be324d0604c9111c 0
scheduler.controller 0
neutron-vo-SubPort-1.0_fanout_e49ddc715b1943389e7e0ddd6dfb5032 0
cinder-volume.compute2@nfs.compute2 0
dhcp_agent.controller 0
neutron-vo-Trunk-1.1.controller 0
q-agent-notifier-port-update.compute1 0
neutron-vo-SubPort-1.0.compute2 0
dhcp_agent_fanout_54b2d45e5d0d490f8d62283a9abcb0b8 0
cinder-volume.compute2@nfs 0
q-reports-plugin.controller 0
neutron-vo-SubPort-1.0_fanout_496e5f4cee6445af8a1edcb5c312abe6 0
q-reports-plugin_fanout_d4e460c745f445c8a22c39cbc15b9f92 0
q-agent-notifier-network-update.compute1 0
cinder-scheduler 0
consoleauth 0
q-reports-plugin_fanout_89d519d055464103a1ec5029a36390a8 0
dhcp_agent 0
q-agent-notifier-security_group-update.compute2 0
q-agent-notifier-l2population-update.compute1 0
q-agent-notifier-network-delete_fanout_d518f4f111774991971c7cdc5302fb0d 0
q-reports-plugin 0
q-agent-notifier-network-delete_fanout_4368adffe18e4742a12e2c4559b07985 0
neutron-vo-SubPort-1.0_fanout_2a542add26d8456396efe5ef3cb4c23f 0
cinder-volume.controller@lvm.controller 0
q-agent-notifier-l2population-update_fanout_5e726a5d09f942e1afb8b02d89cdf9fe 0
q-agent-notifier-network-update.compute2 0
reply_f3414e08cc93430a914663877436085d 0
cinder-volume.controller@lvm 0
scheduler 0
neutron-vo-SubPort-1.0 0
q-l3-plugin.controller 0
neutron-vo-Trunk-1.1.compute1 0
q-agent-notifier-l2population-update.compute2 0
neutron-vo-SubPort-1.0.controller 0
q-agent-notifier-port-update.compute2 0
reply_4961a61a307a428fb0b03043c7654f7d 0
q-agent-notifier-security_group-update.controller 0
q-agent-notifier-security_group-update 0
compute_fanout_b980ee35b47c47688d4fda1a1eb9fe34 0
cinder-scheduler_fanout_3b662fb1565b4a9f8c1224380cf8c8da 0
neutron-vo-Trunk-1.1 0
compute 0
q-agent-notifier-network-delete.compute1 0
cinder-scheduler.controller 0
compute.compute1 0
q-l3-plugin_fanout_31c332c953be4c9194085590c844f934 0
neutron-vo-Trunk-1.1_fanout_a8dca45b75d8436a9ba44f8832d04d58 0
q-agent-notifier-l2population-update 0
q-agent-notifier-network-update_fanout_84a4aa333c7e435482fc64b04d120051 0
q-server-resource-versions.controller 0
neutron-vo-SubPort-1.0.compute1 0
q-agent-notifier-port-update 0
q-agent-notifier-security_group-update_fanout_49d673aeec024d8fa36a49523cf335fd 0
q-agent-notifier-network-delete.compute2 0
q-agent-notifier-port-update_fanout_e3f806abc57744b290540a21aba6626f 0
q-l3-plugin 0
reply_80e9590155d64a2480314f86613ea532 0
neutron-vo-Trunk-1.1.compute2 0
q-agent-notifier-network-delete 0
q-agent-notifier-network-update_fanout_c60c9bcfa3eb40c7b5c025980bece3f5 0
q-plugin_fanout_a07db20790814d52ba25d914e01fe82f 0
[root@controller ~]#

查看openstack中有的消息队列

集群管理

查看集群状态: rabbitmqctl cluster_status
摘除节点: rabbitmqctl forget_cluster_node [--offline]
组成集群命令: rabbitmqctl join_cluster [--ram]
修改节点存储形式: rabbitmqctl change_cluster_node_type disc | ram
修改节点名称: rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2 newnode2] [oldnode3 newnode3…]

[root@mcw14 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@mcw14 …
Basics

Cluster name: rabbit@mcw14

Disk Nodes

rabbit@mcw14

Running Nodes

rabbit@mcw14

Versions

rabbit@mcw14: RabbitMQ 3.8.16 on Erlang 23

Maintenance status

Node: rabbit@mcw14, status: not under maintenance

Alarms

(none)

Network Partitions

(none)

Listeners

Node: rabbit@mcw14, interface: [::], port: 15672, protocol: http, purpose: HTTP API
Node: rabbit@mcw14, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mcw14, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0

Feature flags

Flag: drop_unroutable_metric, state: disabled
Flag: empty_basic_get_metric, state: disabled
Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled
[root@mcw14 ~]#

rabbitmqctl cluster_status

看下我们部署的openstack,它的集群状态是什么样的,也就一个节点,跟上面命令访问结果不一样,那么上面可能是没有形成集群,只是一个单节点。

加入集群失败

信息查看

rabbitmqadmin list connections #查看所有连接
rabbitmqadmin show overview #概览 Overview
rabbitmqadmin list nodes #查看所有节点 Node
rabbitmqadmin list channels #查看所有通道 Channel
rabbitmqadmin list consumers #查看所有消费者 Consumer
rabbitmqadmin list exchanges #查看所有路由 Exchange
rabbitmqadmin list bindings #查看所有路由与队列的关系绑定 Binding

找到这个命令,然后加个软连接或者复制一份。软连接的话,也是要将源文件添加执行权限的,不然无法执行,没有权限。

这样就可以了,之前节点被关闭了。开启节点后,然后查看有哪些节点,可以正常看到

查看所有连接

概览

查看所有节点

查看所有通道

查看所有消费者

查看所有路由

查看所有路由与队列的关系绑定

RabbitMq生产者消费者模型

生产者(producter) 队列消息的产生者,复制生产消息,并将消息传入队列
生产者代码:

import pika
import json

credentials = pika.PlainCredentials('admin','admin')#mq用户名和密码,用于认证
#虚拟队列需要指定参数virtual_host,如果是默认的可以不填
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.24',port=5672,virtual_host='/',credentials=credentials))
channel = connection.channel()# 创建一个AMQP信道

#声明队列,并设置durable为True,为了避免rabbitMq-server挂掉数据丢失,将durable设为True
channel.queue_declare(queue='1',durable=True)
for i in range(10): # 创建10个q
message = json.dumps({'OrderId':"1000%s"%i})
# exchange表示交换器,可以精确的指定消息应该发到哪个队列中,route_key设置队列的名称,body表示发送的内容
channel.basic_publish(exchange='',routing_key='1',body=message)
print(message)
connection.close()

操作前

通过pika生命一个认证用的凭证,然后用pika创建rabbitmq的块连接,再用上面的连接创建一个AMQP信道 。创建消息队列的连接时,需要指定ip,断开,虚拟主机,凭证。

然后根据上面的信道,声明一个队列,

我们可以看到,下面信道点队列声明里的queue参数值就队列的名字。这里是遍历0到9,然后打印了下消息,这里的生成的消息,是json序列化后的数据。然后将数据作为i,信道点基础发布的body参数的值。上面信道点队列声明是创建一个队列,队列名字是’1‘,下面我们用信道点基本发布,是将我们创建的消息体发送到队列中,路由_key就是指定队列名称,指定发布消息到哪个队列,消息是作为body的参数,

最后,需要将这个消息队列的连接关闭。

我们通过页面可以看到,已经创建好了这个队列,队列名字为1,并且已经通过遍历生成的10个消息,调用十次信道点基础发布方法,将这十个产生的消息发布到消息队列中

我们可以再看下,可以看到我们创建的消息的具体内容。

消费者(consumer):队列消息的接收者,扶着接收并处理消息队列中的消息

import pika
credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.24',
port=5672,
virtual_host='/',
credentials=credentials
))
channel = connection.channel()
#声明消息队列,消息在这个队列中传递,如果不存在,则创建队列
channel.queue_declare(queue='1',durable=True)

定义一个回调函数来处理消息队列中消息,这里是打印出来

def callback(ch,method,properties,body):
ch.basic_ack(delivery_tag=method.delivery_tag)
print(body.decode())
#告诉rabbitmq,用callback来接收消息
channel.basic_consume('1',callback)
#开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()

获取消息,创建凭证,连接,信道,然后什么一下队列。指定我们要获取哪个队列中的消息,如果没有这个队列,就会创建这个队列,存在,那么后面使用这个信道,就会从这个队列中获取数据。信道是通过rabbitmq的连接对象来生成的,连接对象中放了连接用的凭证。所以,信道点基础消费方法,指定是哪个消息队列,那么就会从这个队列中获取消息。然后传参回调函数。而回调函数中,

我们可以看到,基础消费方法里面有消息回调,就是上面我们自定义的回调函数

这个方法定义了回调函数的写法。第一个参数是信道

第二个参数是方法,第三个参数是属性,第四个是body,这些不用管,只需要按如下格式,就可以从body,做个解码,就将信道点基础消费中指定的队列中的消息,取出来了,我们是用回调函数来接收消息,当需要获取消息的时候,就需要执行信道点开始消费的方法。这里好像是遍历队列一个一个的将消息获取出来。那么怎样实现,实时监听消息,实时消费呢

RabbitMq持久化

RabbitMq持久化
MQ默认建立的临时的queue和exchange,如果不声明持久化,一旦rabbitmq挂掉,queue,exchange将会全部丢失,所以我们一般在创建queue或者exchange的时候会声明持久化
1.queue声明持久化

# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储
result = channel.queue_declare(queue = 'python-test',durable = True)

使用True

重启消息队列服务

消息队列还在,但是消息被清空了

当我改为false的时候,因为队列1已经存在,并且是Tue声明的,所以这里就报错了

我们设置为false,然后声明一个不存在的队列2

创建好了队列,并且10个消息

重启一下消息队列服务

刚刚上面创建的队列2已经不存在,这已经不是消息被清空了,而是队列直接被清除了

也就是这个Ture,是保留队列用的,持久化队列的。

channel.queue_declare(queue='2',durable=True)

2、exchange声明持久化

# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-test', durable = True)

注意:如果已存在一个非持久化的queue或exchange,执行上述代码会报错,因为当前状态不能更该queue 或 exchange存储属性,需要删除重建,如果queue和exchange中一个声明了持久化,另一个没有声明持久化,则不允许绑定

我们在1处改了,但是在2处没有修改。结果有问题。

队列2不存在,所以没有将消息放进去

而exchange这里,没有写将消息推送到声明的python-test里面,所以里面也没有消息

这次是声明的exchange,并且将消息推送到python-test里面

还是没有看到有东西呀

我们这里发布个消息,可以看到,是需要路由的

加上路由,再次执行程序

由于队列2 不存在,好像还是不行

我在这里给它bind一个路由

感觉还是没有弄明白,先放弃了

原来是如下方式呀。

首先,在python-test2里面,

给exchange绑定队列1和2

1和2目前的消息数量

我往路由1里面push一个消息

push成功

然后再看队列1里面,可以看到多了一条刚刚push的消息

接下来用程序实现,声明exchange,然后发布方法不变,发布到exchage中,因为已经绑定了两个路由了,这里指定路由key,根据路由key,可以将消息push到对应的队列中去

我们可以看到,之前是页面点击push了一条,上面程序push了十条到exchange,现在这个队列就有11条数据。可是这个exchange和队列的绑定,是我自己在页面上绑定的,这个应该不合理。以后有时间看下,怎么用程序绑定。

我们可以看到,应该是程序中缺少使用这个绑定方法吧

3、消息持久化
虽然exchange和queue都声明了持久化,但如果消息只存在内存里,rabbitmq重启后,内存里的东西还是会丢失,所以必须声明消息也是持久化,从内存转存到到硬盘

# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))

4、acknowledgement消息不丢失
消费者(consume)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息会丢失,但是也可以选择消费者处理失败时,将消息回退给rabbitmq,重新再被消费者消费,这个时候需要设置确认标识。

channel.basic_consume(callback,queue = 'python-test',
# no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
             no_ack = False)

参考链接:https://www.jianshu.com/p/cc322adf060c
https://blog.csdn.net/weixin_45144837/article/details/104335115

https://blog.csdn.net/weixin_45144837/article/details/104335115

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章