RabbitAdmin
阅读原文时间:2023年07月09日阅读:2

RabbitAdmin底层实现就是从Spring容器中获取以@Bean方式声明的Exchange、Binding、RoutingKey以及Queue
然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作
例如添加一个交换机、删除一个绑定、清空一个队列里的消息等

注意:autoStartup必须设置为true,否则Spring容器不会加载RabbitAdmin类

需导入的依赖


<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

创建RabbitAdmin,使用@Bean将其注入到spring容器中

package com.dwz.spring;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.dwz.spring.*")
public class RabbitMQConfig {

@Bean  
public ConnectionFactory connectionFactory() {  
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
    connectionFactory.setAddresses("127.0.0.1:5672");  
    connectionFactory.setVirtualHost("/vhost\_dwz");  
    connectionFactory.setUsername("root\_dwz");  
    connectionFactory.setPassword("123456");  
    return connectionFactory;  
}

@Bean  
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {  
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);  
    System.err.println("RabbitAdmin启动了。。。");  
    //设置启动spring容器时自动加载这个类(这个参数现在默认已经是true,可以不用设置)  
    rabbitAdmin.setAutoStartup(true);  
    return rabbitAdmin;  
}  

}

在test演示其相关方法

package com.dwz.spring;
import java.util.HashMap;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Demonstrates the basic {@link RabbitAdmin} operations: declaring exchanges and
 * queues, binding them together, and purging a queue.
 *
 * <p>Nothing is asserted; the test only issues the calls through the injected admin.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestDwz {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /** Declares three exchange/queue pairs, binds each pair, then purges the topic queue. */
    @Test
    public void test() {
        final boolean durable = false;
        final boolean autoDelete = false;
        final boolean noWait = false;

        // One exchange per exchange type.
        DirectExchange directExchange = new DirectExchange("test.direct", durable, autoDelete);
        rabbitAdmin.declareExchange(directExchange);
        TopicExchange topicExchange = new TopicExchange("test.topic", durable, autoDelete);
        rabbitAdmin.declareExchange(topicExchange);
        FanoutExchange fanoutExchange = new FanoutExchange("test.fanout", durable, autoDelete);
        rabbitAdmin.declareExchange(fanoutExchange);

        // One queue per exchange.
        Queue directQueue = new Queue("test.direct.queue", durable);
        rabbitAdmin.declareQueue(directQueue);
        Queue topicQueue = new Queue("test.topic.queue", durable);
        rabbitAdmin.declareQueue(topicQueue);
        Queue fanoutQueue = new Queue("test.fanout.queue", durable);
        rabbitAdmin.declareQueue(fanoutQueue);

        // Style 1: bind by name, after the queue and exchange were declared above.
        Binding directBinding = new Binding(
                "test.direct.queue",
                Binding.DestinationType.QUEUE,
                "test.direct",
                "direct",
                new HashMap<>());
        rabbitAdmin.declareBinding(directBinding);

        // Style 2: build the binding fluently from freshly constructed queue and
        // exchange objects (same names as declared above), plus a routing key.
        Queue topicBindingQueue = new Queue("test.topic.queue", durable);
        TopicExchange topicBindingExchange = new TopicExchange("test.topic", durable, autoDelete);
        Binding topicBinding = BindingBuilder
                .bind(topicBindingQueue)
                .to(topicBindingExchange)
                .with("user.#");
        rabbitAdmin.declareBinding(topicBinding);

        // Fanout binding: built without a routing key.
        Queue fanoutBindingQueue = new Queue("test.fanout.queue", durable);
        FanoutExchange fanoutBindingExchange = new FanoutExchange("test.fanout", durable, autoDelete);
        Binding fanoutBinding = BindingBuilder.bind(fanoutBindingQueue).to(fanoutBindingExchange);
        rabbitAdmin.declareBinding(fanoutBinding);

        // Remove all messages currently sitting in the topic queue.
        rabbitAdmin.purgeQueue("test.topic.queue", noWait);
    }

}

使用SpringAMQP的@Bean方式去声明

/**
 * Consumer-side configuration:
 * 1. choose the exchange type;
 * 2. bind the queue to the exchange.
 *
 * <p>Exchange types:
 * FanoutExchange delivers a message to every bound queue and has no notion of a routing key;
 * TopicExchange matches on multi-keyword routing patterns;
 * HeadersExchange matches on key-value attributes added to the message;
 * DirectExchange delivers to the designated queue according to the routing key.
 *
 * @return the topic exchange named {@code topic001}
 */
@Bean
public TopicExchange exchange001() {
    final boolean durable = true;
    final boolean autoDelete = false;
    return new TopicExchange("topic001", durable, autoDelete);
}

/**
 * Declares the queue named {@code queue001}.
 *
 * @return the durable queue
 */
@Bean
public Queue queue001() {
    // Second constructor argument: the queue is durable.
    final boolean durable = true;
    return new Queue("queue001", durable);
}

@Bean  
public Binding binding001() {  
    return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.\*");  
}

/**
 * Declares the topic exchange named {@code topic002}.
 *
 * @return the durable, non-auto-delete topic exchange
 */
@Bean
public TopicExchange exchange002() {
    final boolean durable = true;
    final boolean autoDelete = false;
    return new TopicExchange("topic002", durable, autoDelete);
}

/**
 * Declares the queue named {@code queue002}.
 *
 * @return the durable queue
 */
@Bean
public Queue queue002() {
    // Second constructor argument: the queue is durable.
    final boolean durable = true;
    return new Queue("queue002", durable);
}

@Bean  
public Binding binding002() {  
    return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.\*");  
}

/**
 * Declares the topic exchange named {@code topic003}.
 *
 * @return the durable, non-auto-delete topic exchange
 */
@Bean
public TopicExchange exchange003() {
    final boolean durable = true;
    final boolean autoDelete = false;
    return new TopicExchange("topic003", durable, autoDelete);
}

/**
 * Declares the queue named {@code queue003}.
 *
 * @return the durable queue
 */
@Bean
public Queue queue003() {
    // Second constructor argument: the queue is durable.
    final boolean durable = true;
    return new Queue("queue003", durable);
}

@Bean  
public Binding binding003() {  
    return BindingBuilder.bind(queue003()).to(exchange003()).with("mq.\*");  
}

@Bean  
public Queue queue\_image() {  
    return new Queue("image\_queue", true);//队列持久化  
}

@Bean  
public Queue queue\_pdf() {  
    return new Queue("pdf\_queue", true);//队列持久化  
}