redis subscribe/publish(发布订阅)
阅读原文时间:2023年07月09日阅读:1

redis的发布端

package dubbo.wangbiao.project.pubsub;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.JedisPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PublishClient {
public static void main(String[] args) throws InterruptedException {
//创建连接池
GenericObjectPoolConfig poolConfig=new GenericObjectPoolConfig();
poolConfig.setMaxIdle(5);
JedisPool jedisPool = new JedisPool(poolConfig,"127.0.0.1",6379,1000,"123456");
//创建线程池,并设定线程数量
ExecutorService executorService = Executors.newFixedThreadPool(5);
//创建一个发布者
Publisher publisher = new Publisher(jedisPool,"发布者1");
executorService.submit(publisher);
executorService.shutdown();
executorService.awaitTermination(600, TimeUnit.SECONDS);
}
}

redis订阅端

package dubbo.wangbiao.project.pubsub;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.JedisPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SubscriberClient {

public static void main(String\[\] args) throws InterruptedException {  
    //创建redis连接池

    GenericObjectPoolConfig poolConfig=new GenericObjectPoolConfig();  
    poolConfig.setMaxIdle(5);  
    JedisPool jedisPool = new JedisPool(poolConfig,"127.0.0.1",6379,1000,"123456");

    //创建线程池  
    ExecutorService executorService = Executors.newFixedThreadPool(5);  
    //创建订阅者  
    final SubscriberListener subscriberListener = new SubscriberListener("订阅者一号");  
    //订阅频道  
    Subscriber subscriber = new Subscriber(jedisPool, subscriberListener, "发布者1");  
    executorService.submit(subscriber);  
    executorService.shutdown();  
    executorService.awaitTermination(60, TimeUnit.SECONDS);

    //30s后取消订阅  
    Thread.sleep(3000);  
    subscriberListener.onUnsubscribe("发布者1", 0);  
}  

}

redis的发布功能

package dubbo.wangbiao.project.pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
* redis的发布
*/
public class Publisher extends Thread{

    private final JedisPool jedisPool;  
    private String chanelName;

    public Publisher(JedisPool jedisPool, String chanelName) {  
        this.jedisPool = jedisPool;  
        this.chanelName = chanelName;  
        System.out.println("【发布者\\""+chanelName+"\\"初始化成功】");  
        System.out.println("请输入要发布的消息:");  
    }

    @Override  
    public void run() {  
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));  
        Jedis jedis = jedisPool.getResource();  
        while (true) {  
            String line = null;  
            try {  
                line = reader.readLine();  
                if (!"quit".equals(line)) {  
                    System.out.println(chanelName+"发布消息成功");  
                    jedis.publish(chanelName, line);  
                } else {  
                    break;  
                }  
            } catch (IOException e) {  
                e.printStackTrace();  
            }

        }  
    }  
}

redis的订阅功能

package dubbo.wangbiao.project.pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
* 订阅者
*/
public class Subscriber extends Thread{
//jedis连接池
private final JedisPool jedisPool;
private final SubscriberListener subscriberListener;

private String channelName;

public Subscriber(JedisPool jedisPool, SubscriberListener subscriberListener, String channelName) {  
    super();  
    this.jedisPool = jedisPool;  
    this.subscriberListener = subscriberListener;  
    this.channelName = channelName;  
}

@Override  
public void run() {  
    Jedis jedis = null;  
    try {  
        jedis = jedisPool.getResource();  
        jedis.subscribe(subscriberListener,channelName);// 通过jedis.subscribe()方法去订阅,入参是1.订阅者、2.频道名称  
    } catch (Exception e) {  
        e.printStackTrace();  
        System.out.println(String.format("频道订阅失败:%s",e));  
    } finally {  
        if (null != jedis) {  
            jedis.close();  
        }  
    }  
}  

}

发布订阅监听端

package dubbo.wangbiao.project.pubsub;

import redis.clients.jedis.JedisPubSub;

/**
* 订阅的监听
*/
public class SubscriberListener extends JedisPubSub {

private String subName;  
public SubscriberListener(String subName) {  
    this.subName = subName;  
}

// 取得订阅的消息后的处理  
@Override  
public void onMessage(String channel, String message) {  
    System.out.println(String.format("【"+subName + "接收到消息】频道:%s;消息:%s。" , channel , message));  
}

// 初始化订阅时候的处理  
@Override  
public void onSubscribe(String channel, int subscribedChannels) {  
    System.out.println(String.format( "【"+subName + "订阅频道成功】频道:%s;频道数:%d。" , channel  , subscribedChannels));  
}

// 取消订阅时候的处理  
@Override  
public void onUnsubscribe(String channelName, int subscribedChannels) {  
    System.out.println(String.format( "【"+subName + "取消订阅】频道:%s;频道数:%d。",channelName , subscribedChannels));  
}  

}

客户端命令演示:

  • * publish/subscribe是一对多的关系,
    • Redis Psubscribe 命令订阅一个或多个符合给定模式的频道。
    • 每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等)。 news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。