Rabbit-用户上线接收消息
阅读原文时间:2023年07月08日阅读:2

application-dev.yml

spring:
  rabbitmq:
    username: admin
    password: admin
    host: 192.168.0.45
    port: 5672

消费者实现类

MyConsumer.java

package com.meeno.inner.oa.common.rabbit.test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * @description:
 * @author: Wzq
 * @create: 2020-05-20 15:07
 */
public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }

}

动态发送到队列,接收demo

RabbitProduce.java

package com.meeno.inner.oa.common.rabbit.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @description:
 * @author: Wzq
 * @create: 2020-05-20 14:43
 */
public class RabbitProduce {

    public static void produce() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // "guest"/"guest" by default, limited to localhost connections
        String virtualHost = "/";
        String queueName = "queue1";
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost(virtualHost);
        factory.setHost("192.168.0.45");
        factory.setPort(5672);

        Connection conn = factory.newConnection();
        //创建通道
        Channel channel = conn.createChannel();
        channel.queueDeclare(queueName,true,false,false,null);

        //声明交换机
        channel.exchangeDeclare("direct","direct");

        //队列绑定到交换机并指定rouing_key
        channel.queueBind(queueName,"direct","user1");

        String msg = "吴志奇好帅Two!";
        // 基本发布消息
        // 第一个参数为交换机名称、
        // 第二个参数为队列映射的路由key、
        // 第三个参数为消息的其他属性、
        // 第四个参数为发送信息的主体
        channel.basicPublish("direct","user2",null,msg.getBytes());

        channel.close();
        conn.close();
    }

    public static void consumer() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // "guest"/"guest" by default, limited to localhost connections
        String virtualHost = "/";
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost(virtualHost);
        factory.setHost("192.168.0.45");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "direct";
        String routingKey = "user2";
        String queueName = "queue1";

        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queueDeclare(queueName,true,false,false,null);
        //声明交换机
        channel.exchangeDeclare("direct","direct");
        //队列绑定到交换机并指定rouing_key
        channel.queueBind(queueName, exchangeName, routingKey);

        channel.basicConsume(queueName, true, new MyConsumer(channel));
        /*// 创建一个消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消费收到消息的时候调用的回调
                System.out.println("C3接收到:" + new String(body));
            }
        };*/
    }

    public static void main(String[] args) throws IOException, TimeoutException {

//        produce();

        consumer();

    }

}

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章