Redis 的发布端
package dubbo.wangbiao.project.pubsub;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.JedisPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Command-line entry point for the Redis publishing demo.
 *
 * <p>Builds a Jedis connection pool, runs one {@link Publisher} on a worker thread and waits up
 * to 600 seconds for it to finish. The connection pool is closed before {@code main} returns.
 */
public class PublishClient {
    /**
     * Starts the publisher.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for the worker
     */
    public static void main(String[] args) throws InterruptedException {
        // Create the connection pool.
        // NOTE(review): host, port, timeout and password are hard-coded demo values.
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxIdle(5);
        JedisPool jedisPool = new JedisPool(poolConfig, "127.0.0.1", 6379, 1000, "123456");
        try {
            // Create the thread pool with a fixed number of threads.
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            // Create a publisher; the second argument is used as the channel name.
            Publisher publisher = new Publisher(jedisPool, "发布者1");
            executorService.submit(publisher);
            // No further tasks are submitted; wait for the publisher to finish.
            executorService.shutdown();
            executorService.awaitTermination(600, TimeUnit.SECONDS);
        } finally {
            // Release the pool; the original never closed it.
            jedisPool.close();
        }
    }
}
Redis 的订阅端
package dubbo.wangbiao.project.pubsub;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.JedisPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class SubscriberClient {
public static void main(String\[\] args) throws InterruptedException {
//创建redis连接池
GenericObjectPoolConfig poolConfig=new GenericObjectPoolConfig();
poolConfig.setMaxIdle(5);
JedisPool jedisPool = new JedisPool(poolConfig,"127.0.0.1",6379,1000,"123456");
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
//创建订阅者
final SubscriberListener subscriberListener = new SubscriberListener("订阅者一号");
//订阅频道
Subscriber subscriber = new Subscriber(jedisPool, subscriberListener, "发布者1");
executorService.submit(subscriber);
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
//30s后取消订阅
Thread.sleep(3000);
subscriberListener.onUnsubscribe("发布者1", 0);
}
}
Redis 的发布功能
package dubbo.wangbiao.project.pubsub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* redis的发布
*/
public class Publisher extends Thread{
private final JedisPool jedisPool;
private String chanelName;
public Publisher(JedisPool jedisPool, String chanelName) {
this.jedisPool = jedisPool;
this.chanelName = chanelName;
System.out.println("【发布者\\""+chanelName+"\\"初始化成功】");
System.out.println("请输入要发布的消息:");
}
@Override
public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
Jedis jedis = jedisPool.getResource();
while (true) {
String line = null;
try {
line = reader.readLine();
if (!"quit".equals(line)) {
System.out.println(chanelName+"发布消息成功");
jedis.publish(chanelName, line);
} else {
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Redis 的订阅功能
package dubbo.wangbiao.project.pubsub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* 订阅者
*/
/**
 * Subscriber task: subscribes a listener to a single Redis channel.
 */
public class Subscriber extends Thread {
    // Jedis connection pool
    private final JedisPool jedisPool;
    private final SubscriberListener subscriberListener;
    private String channelName;

    /**
     * Creates a subscriber.
     *
     * @param jedisPool pool from which the subscribing connection is borrowed
     * @param subscriberListener listener passed to the subscribe call
     * @param channelName name of the channel to subscribe to
     */
    public Subscriber(JedisPool jedisPool, SubscriberListener subscriberListener, String channelName) {
        super();
        this.channelName = channelName;
        this.subscriberListener = subscriberListener;
        this.jedisPool = jedisPool;
    }

    /**
     * Borrows a connection from the pool and subscribes the listener to the channel.
     *
     * <p>Any exception is printed to the console; the connection, if one was obtained, is closed
     * afterwards in every case.
     */
    @Override
    public void run() {
        Jedis connection = null;
        try {
            connection = jedisPool.getResource();
            // subscribe() takes the listener first, then the channel name.
            connection.subscribe(subscriberListener, channelName);
        } catch (Exception failure) {
            failure.printStackTrace();
            String report = String.format("频道订阅失败:%s", failure);
            System.out.println(report);
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
发布订阅监听端
package dubbo.wangbiao.project.pubsub;
import redis.clients.jedis.JedisPubSub;
/**
* 订阅的监听
*/
/**
 * Subscription listener that logs subscription events to the console.
 */
public class SubscriberListener extends JedisPubSub {
    private final String subName;

    /**
     * Creates a listener.
     *
     * @param subName display name of this subscriber, included in every log line
     */
    public SubscriberListener(String subName) {
        this.subName = subName;
    }

    /**
     * Handles a message received on a subscribed channel by printing it.
     *
     * @param channel channel the message arrived on
     * @param message message payload
     */
    @Override
    public void onMessage(String channel, String message) {
        // subName is passed as an argument rather than concatenated into the format string, so
        // a '%' in the name cannot be misread as a format specifier.
        System.out.println(String.format("【%s接收到消息】频道:%s;消息:%s。", subName, channel, message));
    }

    /**
     * Handles the start of a subscription by printing the channel and channel count.
     *
     * @param channel channel that was subscribed
     * @param subscribedChannels channel count reported with the event
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("【%s订阅频道成功】频道:%s;频道数:%d。", subName, channel, subscribedChannels));
    }

    /**
     * Handles the cancellation of a subscription by printing the channel and channel count.
     *
     * @param channelName channel that was unsubscribed
     * @param subscribedChannels channel count reported with the event
     */
    @Override
    public void onUnsubscribe(String channelName, int subscribedChannels) {
        System.out.println(String.format("【%s取消订阅】频道:%s;频道数:%d。", subName, channelName, subscribedChannels));
    }
}
客户端命令演示:
手机扫一扫
移动阅读更方便
你可能感兴趣的文章