RabbitMQ的web页面介绍(三)
阅读原文时间:2021年10月14日阅读:1

一、Virtual Hosts

每一个 RabbitMQ 服务器都能创建虚拟的消息服务器,我们称之为虚拟主机 (virtual host) ,简称为vhost。每一个 vhost 本质上是一个独立的小型 RabbitMQ 服务器,拥有自己独立的队列、交换器及绑定关系等,井且它拥有自己独立的权限。vhost 就像是虚拟机与物理服务器一样,它们在各个实例间提供逻辑上的分离,为不同程序安全保密地运行数据,它既能将同一个RabbitMQ 中的众多客户区分开,又可以避免队列和交换器等命名冲突。vhost 之间是绝对隔离的,无法将 vhostl 中的交换器与 vhost2 中的队列进行绑定,这样既保证了安全性,又可以确保可移植性。如果在使用 RabbitMQ 达到一定规模的时候,建议用户对业务功能、场景进行归类区分,并为之分配独立的 vhost。

vhost可以限制最大连接数和最大队列数,并且可以设置vhost下的用户资源权限和Topic权限,具体权限见下方说明。

  • 在 Admin -> Limits 页面可以设置vhost的最大连接数和最大队列数,达到限制后,继续创建,将会报错。

  • 用户资源权限是指RabbitMQ 用户在客户端执行AMQP操作命令时,拥有对资源的操作和使用权限。权限分为三个部分: configure、write、read ,见下方表格说明。参考:http://www.rabbitmq.com/access-control.html#permissions

AMQP 0-9-1 Operation

 

configure

write

read

exchange.declare

(passive=false)

exchange

 

 

exchange.declare

(passive=true)

 

 

 

exchange.declare

(with [AE](ae.html))

exchange

exchange (AE)

exchange

exchange.delete

 

exchange

 

 

queue.declare

(passive=false)

queue

 

 

queue.declare

(passive=true)

 

 

 

queue.declare

(with [DLX](dlx.html))

queue

exchange (DLX)

queue

queue.delete

 

queue

 

 

exchange.bind

 

 

exchange (destination)

exchange (source)

exchange.unbind

 

 

exchange (destination)

exchange (source)

queue.bind

 

 

queue

exchange

queue.unbind

 

 

queue

exchange

basic.publish

 

 

exchange

 

basic.get

 

 

 

queue

basic.consume

 

 

 

queue

queue.purge

 

 

 

queue

举例说明:

  • * 比如创建队列时,会调用 queue.declare 方法,此时会使用到 configure 权限,会校验队列名是否与 configure 的表达式匹配。

    • 比如队列绑定交换器时,会调用 queue.bind 方法,此时会用到 write 和 read 权限,会检验队列名是否与 write 的表达式匹配,交换器名是否与 read 的表达式匹配。
  • Topic权限,参考:http://www.rabbitmq.com/access-control.html#topic-authorisation

  • * Topic权限是RabbitMQ 针对STOMP和MQTT等协议实现的一种权限。由于这类协议都是基于Topic消费的,而AMQP是基于Queue消费,所以AMQP的标准资源权限不适合用在这类协议中,而Topic权限也不适用于AMQP协议。所以,我们一般不会去使用它,只用在使用了MQTT这类的协议时才可能会用到。

1. 使用管理员用户登录Web管理界面。

2.页面添加一个名为 v1 的Virtual Hosts。(此时还需要为此vhost分配用户,添加一个新用户)

3.在 Admin -> Users 页面添加一个名为 order-user 的用户,并设置为 management 角色。

4. 从 Admin 进入 order-user 的用户设置界面,在 Permissions 中,为用户分配vhost为/v1,并为每种权限设置需要匹配的目标名称的正则表达式。

 

字段名

值 

说明 

Virtual Host 

/v1 

指定用户的vhost,以下权限都只限于 /v1 vhost中

Configure regexp

eq-.*

只能操作名称以eq-开头的exchange或queue;为空则不能操作任何exchange和queue 

Write regexp 

.*

能够发送消息到任意名称的exchange,并且能绑定到任意名称的队列和任意名称的目标交

换器(指交换器绑定到交换器),为空表示没有权限 

Read regexp 

^test$ 

只能消费名为test队列上的消息,并且只能绑定到名为test的交换器

5.代码演示

public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setUsername("order-user");
factory.setPassword("order-user");
factory.setVirtualHost("v1");

    Connection connection = null;  
    Channel channel = null;

    // 3、设置每个节点的链接地址和端口  
    Address\[\] addresses = new Address\[\]{  
            new Address("192.168.0.1", 5672),  
            new Address("192.168.0.2", 5672)  
    };

    try {  
        // 开启/关闭连接自动恢复,默认是开启状态  
        factory.setAutomaticRecoveryEnabled(true);

        // 设置每100毫秒尝试恢复一次,默认是5秒:com.rabbitmq.client.ConnectionFactory.DEFAULT\_NETWORK\_RECOVERY\_INTERVAL  
        factory.setNetworkRecoveryInterval(100);

        factory.setTopologyRecoveryEnabled(false);

        // 4、使用连接集合里面的地址获取连接  
        connection = factory.newConnection(addresses, "生产者");

        // 添加重连监听器  
        ((Recoverable) connection).addRecoveryListener(new RecoveryListener() {  
            /\*\*  
             \* 重连成功后的回调  
             \* @param recoverable  
             \*/  
            public void handleRecovery(Recoverable recoverable) {  
                System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 已重新建立连接!");  
            }

            /\*\*  
             \* 开始重连时的回调  
             \* @param recoverable  
             \*/  
            public void handleRecoveryStarted(Recoverable recoverable) {  
                System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 开始尝试重连!");  
            }  
        });

        // 5、从链接中创建通道  
        channel = connection.createChannel();

        /\*\*  
         \* 6、声明(创建)队列  
         \* 如果队列不存在,才会创建  
         \* RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错  
         \*  
         \* queueDeclare参数说明:  
         \* @param queue 队列名称  
         \* @param durable 队列是否持久化  
         \* @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制  
         \* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除  
         \* @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等  
         \*/  
        channel.exchangeDeclare("test-exchange", "fanout");  
        channel.queueDeclare("queue1", false, false, false, null);  
        channel.queueBind("queue1", "test-exchange", "xxoo");  
        for (int i = 0; i < 100; i++) {  
            // 消息内容  
            String message = "Hello World " + i;  
            try {  
                // 7、发送消息  
                channel.basicPublish("test-exchange", "queue1", null, message.getBytes());  
            } catch (AlreadyClosedException e) {  
                // 可能连接已关闭,等待重连  
                System.out.println("消息 " + message + " 发送失败!");  
                i--;  
                TimeUnit.SECONDS.sleep(2);  
                continue;  
            }  
            System.out.println("消息 " + i + " 已发送!");  
            TimeUnit.SECONDS.sleep(2);  
        }

    } catch (IOException e) {  
        e.printStackTrace();  
    } catch (TimeoutException e) {  
        e.printStackTrace();  
    } catch (InterruptedException e) {  
        e.printStackTrace();  
    } finally {  
        // 8、关闭通道  
        if (channel != null && channel.isOpen()) {  
            try {  
                channel.close();  
            } catch (IOException e) {  
                e.printStackTrace();  
            } catch (TimeoutException e) {  
                e.printStackTrace();  
            }  
        }

        // 9、关闭连接  
        if (connection != null && connection.isOpen()) {  
            try {  
                connection.close();  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}  

}

public class VirtualHosts {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setUsername("order-user");
factory.setPassword("order-user");
factory.setVirtualHost("v1");

    Connection connection = null;  
    Channel prducerChannel = null;  
    Channel consumerChannel = null;

    // 3、设置每个节点的链接地址和端口  
    Address\[\] addresses = new Address\[\]{  
            new Address("192.168.0.1", 5672),  
            new Address("192.168.0.2", 5672)  
    };

    try {  
        // 4、从连接工厂获取连接  
        connection = factory.newConnection(addresses, "消费者");

        // 5、从链接中创建通道  
        prducerChannel = connection.createChannel();

        prducerChannel.exchangeDeclare("test-exchange", "fanout");  
        prducerChannel.queueDeclare("queue1", false, false, true, null);  
        prducerChannel.queueBind("queue1", "test-exchange", "xxoo");  
        // 消息内容  
        String message = "Hello A";  
        prducerChannel.basicPublish("test-exchange", "c1", null, message.getBytes());

        consumerChannel = connection.createChannel();  
        // 创建一个消费者对象  
        Consumer consumer = new DefaultConsumer(consumerChannel) {  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte\[\] body) throws IOException {  
                System.out.println("收到消息:" + new String(body, "UTF-8"));  
            }  
        };  
        consumerChannel.basicConsume("queue1", true, consumer);

        System.out.println("等待接收消息");  
        System.in.read();  
    } catch (IOException e) {  
        e.printStackTrace();  
    } catch (TimeoutException e) {  
        e.printStackTrace();  
    } finally {  
        // 9、关闭通道  
        if (prducerChannel != null && prducerChannel.isOpen()) {  
            try {  
                prducerChannel.close();  
            } catch (IOException e) {  
                e.printStackTrace();  
            } catch (TimeoutException e) {  
                e.printStackTrace();  
            }  
        }

        // 10、关闭连接  
        if (connection != null && connection.isOpen()) {  
            try {  
                connection.close();  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}  

}

官方资料:https://www.rabbitmq.com/api-guide.html#connection-recovery;根据官方文档说明可知:

  • 通过 factory.setAutomaticRecoveryEnabled(true); 可以设置连接自动恢复的开关,默认已开启
  • 通过 factory.setNetworkRecoveryInterval(10000); 可以设置间隔多长时间尝试恢复一次,默认是5秒: com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL

什么时候会触发连接恢复?https://www.rabbitmq.com/api-guide.html#recovery-triggers

  • 如果启用了自动连接恢复,将由以下事件触发:

  • * 连接的I/O循环中抛出IOExceiption

    • 读取Socket套接字超时
    • 检测不到服务器心跳
    • 在连接的I/O循环中引发任何其他异常
  • 如果客户端第一次连接失败,不会自动恢复连接。需要我们自己负责重试连接、记录失败的尝试、实现重试次数的限制等等。

ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings

try {
Connection conn = factory.newConnection();
} catch (java.net.ConnectException e) {
Thread.sleep(5000);
// apply retry logic
}

  • * 如果程序中调用了 Connection.Close ,也不会自动恢复连接。

    • 如果是 Channel-level 的异常,也不会自动恢复连接,因为这些异常通常是应用程序中存在语义问题(例如试图从不存在的队列消费)。
  • 在Connection和Channel上,可以设置重新连接的监听器,开始重连和重连成功时,会触发监听器。添加和移除监听,需要将Connection或Channel强制转换成Recoverable接口。

((Recoverable) connection).addRecoveryListener()
((Recoverable) connection).removeRecoveryListener()

git源码:https://gitee.com/TongHuaShuShuoWoDeJieJu/rabbit.git