前面两篇文章提到
NIO的server模式只有5个阶段,但是,NIO的selectionkey里确实有个accept事件,所以,为了区别,衍生出了主reactor和从reactor
并且,从reactor可以根据服务器的负荷,新增多个从reactor进行请求处理
服务器架构如下图
这个就是完整版的reactor模式的架构图了,目前使用到了reactor模式的框架(如netty),基本用的模式就是这个
代码实现:
1 // Reactor線程
2 package server;
3
4 import java.io.IOException;
5 import java.net.InetSocketAddress;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.Selector;
8 import java.nio.channels.ServerSocketChannel;
9 import java.util.Iterator;
10 import java.util.Set;
11
12 public class TCPReactor implements Runnable {
13
14 private final ServerSocketChannel ssc;
15 private final Selector selector; // mainReactor用的selector
16
17 public TCPReactor(int port) throws IOException {
18 selector = Selector.open();
19 ssc = ServerSocketChannel.open();
20 InetSocketAddress addr = new InetSocketAddress(port);
21 ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
22 ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
23 SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key
24 sk.attach(new Acceptor(ssc)); // 給定key一個附加的Acceptor對象
25 }
26
27 @Override
28 public void run() {
29 while (!Thread.interrupted()) { // 在線程被中斷前持續運行
30 System.out.println("mainReactor waiting for new event on port: "
31 + ssc.socket().getLocalPort() + "…");
32 try {
33 if (selector.select() == 0) // 若沒有事件就緒則不往下執行
34 continue;
35 } catch (IOException e) {
36 e.printStackTrace();
37 }
38 Set
39 Iterator
40 while (it.hasNext()) {
41 dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
42 it.remove();
43 }
44 }
45 }
46
47 /*
48 * name: dispatch(SelectionKey key)
49 * description: 調度方法,根據事件綁定的對象開新線程
50 */
51 private void dispatch(SelectionKey key) {
52 Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
53 if (r != null)
54 r.run();
55 }
56
57 }
1 // 接受連線請求線程
2 package server;
3
4 import java.io.IOException;
5 import java.nio.channels.SelectionKey;
6 import java.nio.channels.Selector;
7 import java.nio.channels.ServerSocketChannel;
8 import java.nio.channels.SocketChannel;
9
10 public class Acceptor implements Runnable {
11
12 private final ServerSocketChannel ssc; // mainReactor監聽的socket通道
13 private final int cores = Runtime.getRuntime().availableProcessors(); // 取得CPU核心數
14 private final Selector[] selectors = new Selector[cores]; // 創建核心數個selector給subReactor用
15 private int selIdx = 0; // 當前可使用的subReactor索引
16 private TCPSubReactor[] r = new TCPSubReactor[cores]; // subReactor線程
17 private Thread[] t = new Thread[cores]; // subReactor線程
18
19 public Acceptor(ServerSocketChannel ssc) throws IOException {
20 this.ssc = ssc;
21 // 創建多個selector以及多個subReactor線程
22 for (int i = 0; i < cores; i++) {
23 selectors[i] = Selector.open();
24 r[i] = new TCPSubReactor(selectors[i], ssc, i);
25 t[i] = new Thread(r[i]);
26 t[i].start();
27 }
28 }
29
30 @Override
31 public synchronized void run() {
32 try {
33 SocketChannel sc = ssc.accept(); // 接受client連線請求
34 System.out.println(sc.socket().getRemoteSocketAddress().toString()
35 + " is connected.");
36
37 if (sc != null) {
38 sc.configureBlocking(false); // 設置為非阻塞
39 r[selIdx].setRestart(true); // 暫停線程
40 selectors[selIdx].wakeup(); // 使一個阻塞住的selector操作立即返回
41 SelectionKey sk = sc.register(selectors[selIdx],
42 SelectionKey.OP_READ); // SocketChannel向selector[selIdx]註冊一個OP_READ事件,然後返回該通道的key
43 selectors[selIdx].wakeup(); // 使一個阻塞住的selector操作立即返回
44 r[selIdx].setRestart(false); // 重啟線程
45 sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象
46 if (++selIdx == selectors.length)
47 selIdx = 0;
48 }
49 } catch (IOException e) {
50 e.printStackTrace();
51 }
52 }
53
54 }
1 package server;
2
3 import java.io.IOException;
4 import java.nio.channels.SelectionKey;
5 import java.nio.channels.Selector;
6 import java.nio.channels.ServerSocketChannel;
7 import java.util.Iterator;
8 import java.util.Set;
9
10 public class TCPSubReactor implements Runnable {
11
12 private final ServerSocketChannel ssc;
13 private final Selector selector;
14 private boolean restart = false;
15 int num;
16
17 public TCPSubReactor(Selector selector, ServerSocketChannel ssc, int num) {
18 this.ssc = ssc;
19 this.selector = selector;
20 this.num = num;
21 }
22
23 @Override
24 public void run() {
25 while (!Thread.interrupted()) { // 在線程被中斷前持續運行
26 //System.out.println("ID:" + num
27 // + " subReactor waiting for new event on port: "
28 // + ssc.socket().getLocalPort() + "…");
29 System.out.println("waiting for restart");
30 while (!Thread.interrupted() && !restart) { // 在線程被中斷前以及被指定重啟前持續運行
31 try {
32 if (selector.select() == 0)
33 continue; // 若沒有事件就緒則不往下執行
34 } catch (IOException e) {
35 e.printStackTrace();
36 }
37 Set
38 Iterator
39 while (it.hasNext()) {
40 dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
41 it.remove();
42 }
43 }
44 }
45 }
46
47 /*
48 * name: dispatch(SelectionKey key) description: 調度方法,根據事件綁定的對象開新線程
49 */
50 private void dispatch(SelectionKey key) {
51 Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
52 if (r != null)
53 r.run();
54 }
55
56 public void setRestart(boolean restart) {
57 this.restart = restart;
58 }
59 }
1 // Handler線程
2 package server;
3
4 import java.io.IOException;
5 import java.nio.channels.SelectionKey;
6 import java.nio.channels.SocketChannel;
7 import java.util.concurrent.LinkedBlockingQueue;
8 import java.util.concurrent.ThreadPoolExecutor;
9 import java.util.concurrent.TimeUnit;
10
11 public class TCPHandler implements Runnable {
12
13 private final SelectionKey sk;
14 private final SocketChannel sc;
15 private static final int THREAD_COUNTING = 10;
16 private static ThreadPoolExecutor pool = new ThreadPoolExecutor(
17 THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,
18 new LinkedBlockingQueue
19
20 HandlerState state; // 以狀態模式實現Handler
21
22 public TCPHandler(SelectionKey sk, SocketChannel sc) {
23 this.sk = sk;
24 this.sc = sc;
25 state = new ReadState(); // 初始狀態設定為READING
26 pool.setMaximumPoolSize(32); // 設置線程池最大線程數
27 }
28
29 @Override
30 public void run() {
31 try {
32 state.handle(this, sk, sc, pool);
33
34 } catch (IOException e) {
35 System.out.println("[Warning!] A client has been closed.");
36 closeChannel();
37 }
38 }
39
40 public void closeChannel() {
41 try {
42 sk.cancel();
43 sc.close();
44 } catch (IOException e1) {
45 e1.printStackTrace();
46 }
47 }
48
49 public void setState(HandlerState state) {
50 this.state = state;
51 }
52 }
1 package server;
2
3 import java.io.IOException;
4 import java.nio.channels.SelectionKey;
5 import java.nio.channels.SocketChannel;
6 import java.util.concurrent.ThreadPoolExecutor;
7
8 public interface HandlerState {
9
10 public void changeState(TCPHandler h);
11
12 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
13 ThreadPoolExecutor pool) throws IOException ;
14 }
package server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ThreadPoolExecutor;
public class ReadState implements HandlerState{
private SelectionKey sk;
public ReadState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new WorkState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException { // read()
this.sk = sk;
// non-blocking下不可用Readers,因為Readers不支援non-blocking
byte\[\] arr = new byte\[1024\];
ByteBuffer buf = ByteBuffer.wrap(arr);
int numBytes = sc.read(buf); // 讀取字符串
if(numBytes == -1)
{
System.out.println("\[Warning!\] A client has been closed.");
h.closeChannel();
return;
}
String str = new String(arr); // 將讀取到的byte內容轉為字符串型態
if ((str != null) && !str.equals(" ")) {
h.setState(new WorkState()); // 改變狀態(READING->WORKING)
pool.execute(new WorkerThread(h, str)); // do process in worker thread
System.out.println(sc.socket().getRemoteSocketAddress().toString()
+ " > " + str);
}
}
/\*
\* 執行邏輯處理之函數
\*/
synchronized void process(TCPHandler h, String str) {
// do process(decode, logically process, encode)..
// ..
h.setState(new WriteState()); // 改變狀態(WORKING->SENDING)
this.sk.interestOps(SelectionKey.OP\_WRITE); // 通過key改變通道註冊的事件
this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
}
/\*
\* 工作者線程
\*/
class WorkerThread implements Runnable {
TCPHandler h;
String str;
public WorkerThread(TCPHandler h, String str) {
this.h = h;
this.str=str;
}
@Override
public void run() {
process(h, str);
}
}
}
1 package server;
2
3 import java.io.IOException;
4 import java.nio.channels.SelectionKey;
5 import java.nio.channels.SocketChannel;
6 import java.util.concurrent.ThreadPoolExecutor;
7
8 public class WorkState implements HandlerState {
9
10 public WorkState() {
11 }
12
13 @Override
14 public void changeState(TCPHandler h) {
15 // TODO Auto-generated method stub
16 h.setState(new WriteState());
17 }
18
19 @Override
20 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
21 ThreadPoolExecutor pool) throws IOException {
22 // TODO Auto-generated method stub
23
24 }
25
26 }
package server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ThreadPoolExecutor;
public class WriteState implements HandlerState{
public WriteState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new ReadState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException { // send()
// get message from message queue
String str = "Your message has sent to "
+ sc.socket().getLocalSocketAddress().toString() + "\\r\\n";
ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()
while (buf.hasRemaining()) {
sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
}
h.setState(new ReadState()); // 改變狀態(SENDING->READING)
sk.interestOps(SelectionKey.OP\_READ); // 通過key改變通道註冊的事件
sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
}
}
1 package server;
2
3 import java.io.IOException;
4
5 public class Main {
6
7
8 public static void main(String[] args) {
9 // TODO Auto-generated method stub
10 try {
11 TCPReactor reactor = new TCPReactor(1333);
12 new Thread(reactor).start();
13 } catch (IOException e) {
14 // TODO Auto-generated catch block
15 e.printStackTrace();
16 }
17 }
18
19 }
总的来说,主从式reactor比多线程的reactor先进的地方在于:
1.主reactor是一个线程,负责监听外部的连线请求,并派发给Acceptor处理。故Main Reactor中的selector只有注册OP_ACCEPT事件,也只能监听OP_ACCEPT事件。
而处理请求是其他N个不同的线程,即从reactor
2.可以根据请求的密集度来调控从reactor的个数
参考文章:
手机扫一扫
移动阅读更方便
你可能感兴趣的文章