reactor模式:主从式reactor
阅读原文时间:2023年07月09日阅读:2

前面两篇文章提到

reactor模式:单线程的reactor模式

reactor模式:多线程的reactor模式

NIO的server模式只有5个阶段,但是NIO的SelectionKey里还有一个accept事件,所以,为了把接受连线和处理请求区分开,衍生出了主reactor和从reactor:主reactor只负责监听连线请求,从reactor负责处理已建立连线上的请求

并且,可以根据服务器的负荷增加从reactor的数量,由多个从reactor分担请求处理

服务器架构如下图

这个就是完整版的reactor模式的架构图了,目前使用到了reactor模式的框架(如netty),基本用的模式就是这个

代码实现:

1 // Reactor線程
2 package server;
3
4 import java.io.IOException;
5 import java.net.InetSocketAddress;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.Selector;
8 import java.nio.channels.ServerSocketChannel;
9 import java.util.Iterator;
10 import java.util.Set;
11
12 public class TCPReactor implements Runnable {
13
14 private final ServerSocketChannel ssc;
15 private final Selector selector; // mainReactor用的selector
16
17 public TCPReactor(int port) throws IOException {
18 selector = Selector.open();
19 ssc = ServerSocketChannel.open();
20 InetSocketAddress addr = new InetSocketAddress(port);
21 ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
22 ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
23 SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key
24 sk.attach(new Acceptor(ssc)); // 給定key一個附加的Acceptor對象
25 }
26
27 @Override
28 public void run() {
29 while (!Thread.interrupted()) { // 在線程被中斷前持續運行
30 System.out.println("mainReactor waiting for new event on port: "
31 + ssc.socket().getLocalPort() + "…");
32 try {
33 if (selector.select() == 0) // 若沒有事件就緒則不往下執行
34 continue;
35 } catch (IOException e) {
36 e.printStackTrace();
37 }
38 Set selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合
39 Iterator it = selectedKeys.iterator();
40 while (it.hasNext()) {
41 dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
42 it.remove();
43 }
44 }
45 }
46
47 /*
48 * name: dispatch(SelectionKey key)
49 * description: 調度方法,根據事件綁定的對象開新線程
50 */
51 private void dispatch(SelectionKey key) {
52 Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
53 if (r != null)
54 r.run();
55 }
56
57 }

1 // 接受連線請求線程
2 package server;
3
4 import java.io.IOException;
5 import java.nio.channels.SelectionKey;
6 import java.nio.channels.Selector;
7 import java.nio.channels.ServerSocketChannel;
8 import java.nio.channels.SocketChannel;
9
10 public class Acceptor implements Runnable {
11
12 private final ServerSocketChannel ssc; // mainReactor監聽的socket通道
13 private final int cores = Runtime.getRuntime().availableProcessors(); // 取得CPU核心數
14 private final Selector[] selectors = new Selector[cores]; // 創建核心數個selector給subReactor用
15 private int selIdx = 0; // 當前可使用的subReactor索引
16 private TCPSubReactor[] r = new TCPSubReactor[cores]; // subReactor線程
17 private Thread[] t = new Thread[cores]; // subReactor線程
18
19 public Acceptor(ServerSocketChannel ssc) throws IOException {
20 this.ssc = ssc;
21 // 創建多個selector以及多個subReactor線程
22 for (int i = 0; i < cores; i++) {
23 selectors[i] = Selector.open();
24 r[i] = new TCPSubReactor(selectors[i], ssc, i);
25 t[i] = new Thread(r[i]);
26 t[i].start();
27 }
28 }
29
30 @Override
31 public synchronized void run() {
32 try {
33 SocketChannel sc = ssc.accept(); // 接受client連線請求
34 System.out.println(sc.socket().getRemoteSocketAddress().toString()
35 + " is connected.");
36
37 if (sc != null) {
38 sc.configureBlocking(false); // 設置為非阻塞
39 r[selIdx].setRestart(true); // 暫停線程
40 selectors[selIdx].wakeup(); // 使一個阻塞住的selector操作立即返回
41 SelectionKey sk = sc.register(selectors[selIdx],
42 SelectionKey.OP_READ); // SocketChannel向selector[selIdx]註冊一個OP_READ事件,然後返回該通道的key
43 selectors[selIdx].wakeup(); // 使一個阻塞住的selector操作立即返回
44 r[selIdx].setRestart(false); // 重啟線程
45 sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象
46 if (++selIdx == selectors.length)
47 selIdx = 0;
48 }
49 } catch (IOException e) {
50 e.printStackTrace();
51 }
52 }
53
54 }

1 package server;
2
3 import java.io.IOException;
4 import java.nio.channels.SelectionKey;
5 import java.nio.channels.Selector;
6 import java.nio.channels.ServerSocketChannel;
7 import java.util.Iterator;
8 import java.util.Set;
9
/**
 * Sub reactor: runs a select loop over its own selector and dispatches every
 * ready key to the {@link Runnable} attached to it.
 *
 * <p>The loop can be paused from another thread with {@link #setRestart(boolean)}
 * and ends when the running thread is interrupted.
 */
public class TCPSubReactor implements Runnable {

    private final ServerSocketChannel ssc;
    private final Selector selector;
    // Written by the thread calling setRestart() and read by the reactor
    // thread, so it must be volatile for the change to become visible.
    private volatile boolean restart = false;
    int num;

    /**
     * @param selector selector this sub reactor waits on
     * @param ssc      the listening channel (stored, not otherwise used by this class)
     * @param num      identifier of this sub reactor
     */
    public TCPSubReactor(Selector selector, ServerSocketChannel ssc, int num) {
        this.ssc = ssc;
        this.selector = selector;
        this.num = num;
    }

    /**
     * Event loop. The inner loop selects and dispatches while {@code restart}
     * is false; while {@code restart} is true the outer loop spins until the
     * flag is cleared again. Both loops end once the thread is interrupted.
     */
    @Override
    public void run() {
        final Thread self = Thread.currentThread();
        // isInterrupted() does not clear the interrupt status. With the
        // clearing Thread.interrupted() the inner loop would swallow the
        // interrupt and the outer loop would never see it.
        while (!self.isInterrupted()) {
            System.out.println("waiting for restart");
            while (!self.isInterrupted() && !restart) {
                try {
                    if (selector.select() == 0) {
                        continue; // nothing ready: woken up, interrupted or paused
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    continue; // select failed, nothing valid to dispatch this round
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // The selector never clears the selected-key set itself.
                    it.remove();
                    dispatch(key);
                }
            }
        }
    }

    /**
     * Runs the {@link Runnable} attached to {@code key} synchronously on this
     * reactor's thread; keys without an attachment are ignored.
     *
     * @param key a key taken from the selected-key set
     */
    private void dispatch(SelectionKey key) {
        Runnable r = (Runnable) key.attachment();
        if (r != null) {
            r.run();
        }
    }

    /**
     * Pauses ({@code true}) or resumes ({@code false}) the select loop. The
     * caller must also wake the selector for a blocked {@code select()} to
     * notice the change.
     *
     * @param restart {@code true} to make the select loop exit, {@code false} to let it run
     */
    public void setRestart(boolean restart) {
        this.restart = restart;
    }
}

1 // Handler線程
2 package server;
3
4 import java.io.IOException;
5 import java.nio.channels.SelectionKey;
6 import java.nio.channels.SocketChannel;
7 import java.util.concurrent.LinkedBlockingQueue;
8 import java.util.concurrent.ThreadPoolExecutor;
9 import java.util.concurrent.TimeUnit;
10
11 public class TCPHandler implements Runnable {
12
13 private final SelectionKey sk;
14 private final SocketChannel sc;
15 private static final int THREAD_COUNTING = 10;
16 private static ThreadPoolExecutor pool = new ThreadPoolExecutor(
17 THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,
18 new LinkedBlockingQueue()); // 線程池
19
20 HandlerState state; // 以狀態模式實現Handler
21
22 public TCPHandler(SelectionKey sk, SocketChannel sc) {
23 this.sk = sk;
24 this.sc = sc;
25 state = new ReadState(); // 初始狀態設定為READING
26 pool.setMaximumPoolSize(32); // 設置線程池最大線程數
27 }
28
29 @Override
30 public void run() {
31 try {
32 state.handle(this, sk, sc, pool);
33
34 } catch (IOException e) {
35 System.out.println("[Warning!] A client has been closed.");
36 closeChannel();
37 }
38 }
39
40 public void closeChannel() {
41 try {
42 sk.cancel();
43 sc.close();
44 } catch (IOException e1) {
45 e1.printStackTrace();
46 }
47 }
48
49 public void setState(HandlerState state) {
50 this.state = state;
51 }
52 }

1 package server;
2
3 import java.io.IOException;
4 import java.nio.channels.SelectionKey;
5 import java.nio.channels.SocketChannel;
6 import java.util.concurrent.ThreadPoolExecutor;
7
8 public interface HandlerState {
9
10 public void changeState(TCPHandler h);
11
12 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
13 ThreadPoolExecutor pool) throws IOException ;
14 }

package server;

import java.io.IOException;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.SocketChannel;  
import java.util.concurrent.ThreadPoolExecutor;  

public class ReadState implements HandlerState{  

    private SelectionKey sk;  

    public ReadState() {  
    }  

    @Override  
    public void changeState(TCPHandler h) {  
        // TODO Auto-generated method stub  
        h.setState(new WorkState());  
    }  

    @Override  
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,  
            ThreadPoolExecutor pool) throws IOException { // read()  
        this.sk = sk;  
        // non-blocking下不可用Readers,因為Readers不支援non-blocking  
        byte\[\] arr = new byte\[1024\];  
        ByteBuffer buf = ByteBuffer.wrap(arr);  

        int numBytes = sc.read(buf); // 讀取字符串  
        if(numBytes == -1)  
        {  
            System.out.println("\[Warning!\] A client has been closed.");  
            h.closeChannel();  
            return;  
        }  
        String str = new String(arr); // 將讀取到的byte內容轉為字符串型態  
        if ((str != null) && !str.equals(" ")) {  
            h.setState(new WorkState()); // 改變狀態(READING->WORKING)  
            pool.execute(new WorkerThread(h, str)); // do process in worker thread  
            System.out.println(sc.socket().getRemoteSocketAddress().toString()  
                    + " > " + str);  
        }  

    }  

    /\*  
     \* 執行邏輯處理之函數  
     \*/  
    synchronized void process(TCPHandler h, String str) {  
        // do process(decode, logically process, encode)..  
        // ..  
        h.setState(new WriteState()); // 改變狀態(WORKING->SENDING)  
        this.sk.interestOps(SelectionKey.OP\_WRITE); // 通過key改變通道註冊的事件  
        this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回  
    }  

    /\*  
     \* 工作者線程  
     \*/  
    class WorkerThread implements Runnable {  

        TCPHandler h;  
        String str;  

        public WorkerThread(TCPHandler h, String str) {  
            this.h = h;  
            this.str=str;  
        }  

        @Override  
        public void run() {  
            process(h, str);  
        }  

    }  
}  

1 package server;
2
3 import java.io.IOException;
4 import java.nio.channels.SelectionKey;
5 import java.nio.channels.SocketChannel;
6 import java.util.concurrent.ThreadPoolExecutor;
7
8 public class WorkState implements HandlerState {
9
10 public WorkState() {
11 }
12
13 @Override
14 public void changeState(TCPHandler h) {
15 // TODO Auto-generated method stub
16 h.setState(new WriteState());
17 }
18
19 @Override
20 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
21 ThreadPoolExecutor pool) throws IOException {
22 // TODO Auto-generated method stub
23
24 }
25
26 }

package server;

import java.io.IOException;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.SocketChannel;  
import java.util.concurrent.ThreadPoolExecutor;  

public class WriteState implements HandlerState{  

    public WriteState() {  
    }  

    @Override  
    public void changeState(TCPHandler h) {  
        // TODO Auto-generated method stub  
        h.setState(new ReadState());  
    }  

    @Override  
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,  
            ThreadPoolExecutor pool) throws IOException { // send()  
        // get message from message queue  

        String str = "Your message has sent to "  
                + sc.socket().getLocalSocketAddress().toString() + "\\r\\n";  
        ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()  

        while (buf.hasRemaining()) {  
            sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容  
        }  

        h.setState(new ReadState()); // 改變狀態(SENDING->READING)  
        sk.interestOps(SelectionKey.OP\_READ); // 通過key改變通道註冊的事件  
        sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回  
    }  
}  

1 package server;
2
3 import java.io.IOException;
4
5 public class Main {
6
7
8 public static void main(String[] args) {
9 // TODO Auto-generated method stub
10 try {
11 TCPReactor reactor = new TCPReactor(1333);
12 new Thread(reactor).start();
13 } catch (IOException e) {
14 // TODO Auto-generated catch block
15 e.printStackTrace();
16 }
17 }
18
19 }

总的来说,主从式reactor比多线程的reactor先进的地方在于:

1.主reactor是一个线程,负责监听外部的连线请求,并派发给Acceptor处理。故Main Reactor中的selector只有注册OP_ACCEPT事件,也只能监听OP_ACCEPT事件。

而处理请求是其他N个不同的线程,即从reactor

2.可以根据请求的密集度来调控从reactor的个数

参考文章:

https://blog.csdn.net/yehjordan/article/details/51026045