Reactor模型

2020-03-27     sandag

要無障礙閱讀本文,需要對NIO有一個大概的了解,起碼要可以寫一個NIO的Hello World。

說到NIO、Netty,Reactor模型一定是繞不開的,因為這種模式架構太經典了,但是好多人在學習的時候,往往會忽視基礎的學習,一上來就是Netty,各種高大上,但是卻沒有靜下心來好好看看Netty的基石——Reactor模型。本文就帶著大家看看Reactor模型,讓大家對Reactor模型有個淺顯而又感性的認識。

說到Reactor,不得不提到一篇文章,文章作者是大名鼎鼎的Doug Lea,Java中的並發包就是出自他之手,下面我試著從文章中挑出一些重要的內容,結合我的理解,來說說Reactor模型,看看Doug Lea大神的腦迴路是多麼的與眾不同。

經典的服務設計

這是最為傳統的Socket服務設計,有多個客戶端連接服務端,服務端會開啟很多線程,一個線程為一個客戶端服務。

在絕大多數場景下,處理一個網絡請求有如下幾個步驟:

  1. read:從socket讀取數據。
  2. decode:解碼,因為網絡上的數據都是以byte的形式進行傳輸的,要想獲取真正的請求,必定需要解碼。
  3. compute:計算,也就是業務處理,你想幹啥就幹啥。
  4. encode:編碼,同理,因為網絡上的數據都是以byte的形式進行傳輸的,也就是socket只接收byte,所以必定需要編碼。

下面我們來看看傳統的BIO代碼:

public static void main(String[] args) {
try {
ServerSocket serverSocket = new ServerSocket(9696);
Socket socket = serverSocket.accept();
new Thread(() -> {
try {
byte[] byteRead = new byte[1024];
socket.getInputStream().read(byteRead);

String req = new String(byteRead, StandardCharsets.UTF_8);//encode
// do something

byte[] byteWrite = "Hello".getBytes(StandardCharsets.UTF_8);//decode
socket.getOutputStream().write(byteWrite);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
} catch (IOException e) {
e.printStackTrace();
}
}

這段代碼應該不需要解釋了,應該都看得懂,不然是什麼支撐者你看到這裡的。。。

這種處理方式有什麼弊端呢,一眼就可以知道答案:需要開啟大量的線程。

所以我們需要改進它,改進一個東西,肯定需要有目標,我們的目標是什麼?沒有蛀牙。小夥計,你走錯片場了。

我們的目標是:

  1. 隨著負載的增加可以優雅降級;
  2. 能夠隨著資源的改進,性能可以持續提升;
  3. 同時還要滿足可用性和性能指標:
    3.1 低延遲
    3.2 滿足高峰需求
    3.3 可調節的服務質量

讓我們想想為什麼傳統的Socket會有如此的弊端:

  1. 阻塞
    不管是等待客戶端的連接,還是等待客戶的數據,都是阻塞的,一夫當關,萬夫莫開,不管你什麼時候連接我,不管你什麼時候給我數據,我都依然等著你。
    讓我們試想下:如果accept()、read()這兩個方法都是不阻塞的,是不是傳統的Socket問題就解決一半了?
  2. 同步
    服務端是死死的盯著客戶端,看客戶端有沒有連接我,有沒有給我發數據。
    如果我可以喝著茶,打著農藥,而你發了數據,連接了我,系統通知我一下,我再去處理,那該多好,這樣傳統的Socket問題又解決了一半。

所以神說要有NIO,便有了NIO。

NIO

NIO是什麼意思?是什麼的簡寫?Non-blocking,非阻塞的IO模型,這是主流的說法,但是我覺得理解成New IO——新一代的IO模型或許會更好,起碼在Java領域會更好。到底如何理解,就看各位看官的了。

NIO就很好的解決了傳統Socket問題:

  1. 一個線程可以監聽多個Socket,不再是一夫當關,萬夫莫開;
  2. 基於事件驅動:等發生了各種事件,系統可以通知我,我再去處理。

關於NIO的更多概念就不在這裡闡述了,上面寫的只是為了引入今天的主角:Reactor。

Reactor

在講Rector模型之前,我先把客戶端代碼放出來,後面實現Reactor模型會用到:

public class Client {
public static void main(String[] args) {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost", 9090));
new Thread(() -> {
while (true) {
try {
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[1024];
inputStream.read(bytes);
System.out.println(new String(bytes, StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

while (true) {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
socket.getOutputStream().write(s.getBytes());
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

單Reactor單線程模型

這是最簡單的Reactor模型,可以看到有多個客戶端連接到Reactor,Reactor內部有一個dispatch(分發器)。

有連接請求後,Reactor會通過dispatch把請求交給Acceptor進行處理,有IO讀寫事件之後,又會通過dispatch交給具體的Handler進行處理。

此時一個Reactor既然負責處理連接請求,又要負責處理讀寫請求,一般來說處理連接請求是很快的,但是處理具體的讀寫請求就要涉及到業務邏輯處理了,相對慢太多了。Reactor正在處理讀寫請求的時候,其他請求只能等著,只有等處理完了,才可以處理下一個請求。

畫外音:弱弱的說下,菜的摳腳的我在學習NIO和Reactor的時候,有一個問題是百思不得其解:不是說NIO很強大嗎,在不開啟的線程的時候,一個服務端可以同時處理多個客戶端嗎?為什麼這裡又說只有處理完一個請求,才能處理下一個請求。不知道是否有人和我一個想法,希望我不是唯一一個。。。NIO在不開啟線程的時候,一個服務端可以同時處理多個客戶端,是指的一個客戶端可以監聽多個客戶端的連接、讀寫事件,真正做業務處理還是「一夫當關,萬夫莫開」的效果。

單線程Reactor模型編程簡單,比較適用於每個請求都可以快速完成的場景,但是不能發揮出多核CPU的優勢,在一般情況下,不會使用單Reactor單線程模型。

萬年不變的道理,有很多東西只有真正實踐過了,才能記住,就像Reactor模型,如果僅僅看看圖,哪怕當時自認為理解的非常透徹了,相信用不了半個月也會全部忘記,所以還是要自己敲敲鍵盤,實現一個單Reactor單線程模型。

public class Reactor implements Runnable {
ServerSocketChannel serverSocketChannel;
Selector selector;

public Reactor(int port) {
try {
serverSocketChannel = ServerSocketChannel.open();
selector = Selector.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor(selector, serverSocketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
while (true) {
try {
selector.select();
Set selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
dispatcher(selectionKey);
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

private void dispatcher(SelectionKey selectionKey) {
Runnable runnable = (Runnable) selectionKey.attachment();
runnable.run();
}
}

定義了一個Reactor類。

在構造方法中,註冊了連接事件,並且在selectionKey對象附加了一個Acceptor對象,這是用來處理連接請求的類。

Reactor類實現了Runnable接口,並且實現了run方法,在run方法中,

監聽各種事件,有了事件後,調用dispatcher方法,在dispatcher方法中,拿到了selectionKey附加的對象,隨後調用run方法,注意此時是調用run方法,並沒有開啟線程,只是一個普通的調用而已。

public class Acceptor implements Runnable {
private Selector selector;

private ServerSocketChannel serverSocketChannel;

public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}

@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("有客戶端連接上來了," + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
selectionKey.attach(new WorkHandler(socketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}

目前如果有事件發生,那一定是連接事件,因為在Reactor類的構造方法中只註冊了連接事件,還沒有註冊讀寫事件。

發生了連接事件後,Reactor類的dispatcher方法拿到了Acceptor附加對象,調用了Acceptor的run方法,在run方法中又註冊了讀事件,然後在selectionKey附加了一個WorkHandler對象。

Acceptor的run方法執行完畢後,就會繼續回到Reactor類中的run方法,負責監聽事件。

此時,Reactor監聽了兩個事件,一個是連接事件,一個是讀事件。

當客戶端寫事件發生後,Reactor又會調用dispatcher方法,此時拿到的附加對象是WorkHandler,所以又跑到了WorkHandler中的run方法。

public class WorkHandler implements Runnable {
private SocketChannel socketChannel;

public WorkHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}

@Override
public void run() {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socketChannel.read(byteBuffer);
String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
System.out.println(socketChannel.getRemoteAddress() + "發來的消息是:" + message);
socketChannel.write(ByteBuffer.wrap("你的消息我收到了".getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
e.printStackTrace();
}
}
}

WorkHandler就是真正負責處理客戶端寫事件的了。

public class Main {
public static void main(String[] args) {
Reactor reactor = new Reactor(9090);
reactor.run();
}
}

下面我們可以進行測試了:

有客戶端連接上來了,/127.0.0.1:63912
/127.0.0.1:63912發來的消息是:你好
有客戶端連接上來了,/127.0.0.1:49290
有客戶端連接上來了,/127.0.0.1:49428
/127.0.0.1:49290發來的消息是:我不好
/127.0.0.1:49428發來的消息是:嘻嘻嘻嘻

畫外音:本文的目的只是為了讓大家更方便、更輕鬆的了解Reactor模型,所以去除了很多東西,比如註冊寫事件、讀寫切換、喚醒等等,如果加上這些瑣碎的東西,很可能讓大家誤入歧途,糾結為什麼要註冊寫事件,不註冊不是照樣可以寫嗎,為什麼要喚醒,不喚醒不是照樣可以監聽到新加的事件嗎,而這些和Reactor模型關係不是很大。

單Reactor多線程模型

我們知道了單Reactor單線程模型有那麼多缺點,就可以有針對性的去解決了。讓我們再回顧下單Reactor單線程模型有什麼缺點:在處理一個客戶端的請求的時候,其他請求只能等著。

那麼我們只要+上多線程的概念不就可以了嗎?沒錯,這就是單Reactor多線程模型。

可以看到,Reactor還是既要負責處理連接事件,又要負責處理客戶端的寫事件,不同的是,多了一個線程池的概念。

當客戶端發起連接請求後,Reactor會把任務交給acceptor處理,如果客戶端發起了寫請求,Reactor會把任務交給線程池進行處理,這樣一個服務端就可以同時為N個客戶端服務了。

讓我們繼續敲敲鍵盤,實現一個單Reactor多線程模型把:

public class Reactor implements Runnable {

ServerSocketChannel serverSocketChannel;

Selector selector;

public Reactor(int port) {
try {
serverSocketChannel = ServerSocketChannel.open();
selector = Selector.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9090));
serverSocketChannel.configureBlocking(false);
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor(serverSocketChannel, selector));
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
while (true) {
try {
selector.select();
Set selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
dispatcher(selectionKey);
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

private void dispatcher(SelectionKey selectionKey) {
Runnable runnable = (Runnable) selectionKey.attachment();
runnable.run();
}
}
public class Acceptor implements Runnable {
ServerSocketChannel serverSocketChannel;

Selector selector;

public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
this.serverSocketChannel = serverSocketChannel;
this.selector = selector;
}


@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("有客戶端連接上來了," + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("acceptor thread:" + Thread.currentThread().getName());
selectionKey.attach(new WorkHandler(socketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class WorkHandler implements Runnable {

static ExecutorService pool = Executors.newFixedThreadPool(2);

private SocketChannel socketChannel;

public WorkHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}

@Override
public void run() {
try {
System.out.println("workHandler thread:" + Thread.currentThread().getName());
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer);
pool.execute(new Process(socketChannel, buffer));
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class Process implements Runnable {

private SocketChannel socketChannel;

private ByteBuffer byteBuffer;

public Process(SocketChannel socketChannel, ByteBuffer byteBuffer) {
this.byteBuffer = byteBuffer;
this.socketChannel = socketChannel;
}

@Override
public void run() {
try {
System.out.println("process thread:" + Thread.currentThread().getName());
String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
System.out.println(socketChannel.getRemoteAddress() + "發來的消息是:" + message);
socketChannel.write(ByteBuffer.wrap("你的消息我收到了".getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
Reactor reactor = new Reactor(9100);
reactor.run();
}
}

單Reactor單線程和單Reactor多線程代碼區別不大,只是有了一個多線程的概念而已。

讓我們再測試一下:

有客戶端連接上來了,/127.0.0.1:55789
acceptor thread:main
有客戶端連接上來了,/127.0.0.1:56681
acceptor thread:main
有客戶端連接上來了,/127.0.0.1:56850
acceptor thread:main
workHandler thread:main
process thread:pool-1-thread-1
/127.0.0.1:55789發來的消息是:我是客戶端1
workHandler thread:main
process thread:pool-1-thread-2
/127.0.0.1:56681發來的消息是:我是客戶端2
workHandler thread:main
process thread:pool-1-thread-1
/127.0.0.1:56850發來的消息是:我是客戶端3

可以很清楚的看到acceptor、workHandler還是主線程,但是到了process就開啟多線程了。

單Reactor多線程模型看起來是很不錯了,但是還是有缺點:一個Reactor還是既然負責連接請求,又要負責讀寫請求,連接請求是很快的,而且一個客戶端一般只要連接一次就可以了,但是會發生很多次寫請求,如果可以有多個Reactor,其中一個Reactor負責處理連接事件,多個Reactor負責處理客戶端的寫事件就好了,這樣更符合單一職責,所以主從Reactor模型誕生了。

主從Reactor模型

這就是主從Reactor模型了,可以看到mainReactor只負責連接請求,而subReactor

只負責處理客戶端的寫事件。

下面來實現一個主從Reactor模型,需要注意的是,我實現的主從Reactor模型和圖片上有區別。圖片上是一主一從,而我實現的是一主八從,圖片上一個subReactor下面開了一個線程池,而我實現的subReactor之下沒有線程池,雖然有所不同,但是核心思路是一樣的。

public class Reactor implements Runnable {
private ServerSocketChannel serverSocketChannel;

private Selector selector;

public Reactor(int port) {
try {
serverSocketChannel = ServerSocketChannel.open();
selector = Selector.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port));
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor(serverSocketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
try {
while (true) {
selector.select();
Set selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
dispatcher(selectionKey);
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

private void dispatcher(SelectionKey selectionKey) {
Runnable runnable = (Runnable) selectionKey.attachment();
runnable.run();
}
}
public class Acceptor implements Runnable {
private ServerSocketChannel serverSocketChannel;
private final int CORE = 8;

private int index;

private SubReactor[] subReactors = new SubReactor[CORE];
private Thread[] threads = new Thread[CORE];
private final Selector[] selectors = new Selector[CORE];

public Acceptor(ServerSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
for (int i = 0; i < CORE; i++) {
try {
selectors[i] = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
subReactors[i] = new SubReactor(selectors[i]);
threads[i] = new Thread(subReactors[i]);
threads[i].start();
}
}

@Override
public void run() {
try {
System.out.println("acceptor thread:" + Thread.currentThread().getName());
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("有客戶端連接上來了," + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);
selectors[index].wakeup();
SelectionKey selectionKey = socketChannel.register(selectors[index], SelectionKey.OP_READ);
selectionKey.attach(new WorkHandler(socketChannel));
if (++index == threads.length) {
index = 0;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class SubReactor implements Runnable {
private Selector selector;

public SubReactor(Selector selector) {
this.selector = selector;
}


@Override
public void run() {
while (true) {
try {
selector.select();
System.out.println("selector:" + selector.toString() + "thread:" + Thread.currentThread().getName());
Set selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
dispatcher(selectionKey);
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

private void dispatcher(SelectionKey selectionKey) {
Runnable runnable = (Runnable) selectionKey.attachment();
runnable.run();
}
}
public class Main {
public static void main(String[] args) {
Reactor reactor = new Reactor(9090);
reactor.run();
}
}

最大的不同在於Acceptor類的構造方法,我開了8個線程,8個subReactor,8個selector,程序一啟動,8個線程就會執行,執行的就是subReactor中定義的run方法,監聽事件。在Acceptor中的run方法中,又註冊了讀事件,所以ubReactor中定義的run方法監聽的就是讀事件了。

下面我們來測試下:

acceptor thread:main
有客戶端連接上來了,/127.0.0.1:57986
selector:sun.nio.ch.WindowsSelectorImpl@94f1d6thread:Thread-0
acceptor thread:main
有客戶端連接上來了,/127.0.0.1:58142
selector:sun.nio.ch.WindowsSelectorImpl@1819b93thread:Thread-1
acceptor thread:main
有客戶端連接上來了,/127.0.0.1:58183
selector:sun.nio.ch.WindowsSelectorImpl@1d04799thread:Thread-2
selector:sun.nio.ch.WindowsSelectorImpl@94f1d6thread:Thread-0
/127.0.0.1:57986發來的消息是:1
selector:sun.nio.ch.WindowsSelectorImpl@1819b93thread:Thread-1
/127.0.0.1:58142發來的消息是:2
selector:sun.nio.ch.WindowsSelectorImpl@1d04799thread:Thread-2
/127.0.0.1:58183發來的消息是:3
acceptor thread:main
有客戶端連接上來了,/127.0.0.1:59462
selector:sun.nio.ch.WindowsSelectorImpl@11d3ebfthread:Thread-3
selector:sun.nio.ch.WindowsSelectorImpl@11d3ebfthread:Thread-3
/127.0.0.1:59462發來的消息是:1111

可以很清楚的看到,從始至終,acceptor都只有一個main線程,而負責處理客戶端寫請求的是不同的線程,而且還是不同的reactor、selector。

Reactor模型結構圖

看完了三種Reactor模型,我們還要看下Reactor模型的結構圖,圖片來自在業內的公認講Reactor模型最好的論文,沒有之一。

看起來有點複雜,我們一個個來看。

  • Synchronous Event Demultiplexer:同步事件分離器,用於監聽各種事件,調用方調用監聽方法的時候會被阻塞,直到有事件發生,才會返回。對於Linux來說,同步事件分離器指的就是IO多路復用模型,比如epoll,poll 等, 對於Java NIO來說, 同步事件分離器對應的組件就是selector,對應的阻塞方法就是select。
  • Handler:本質上是文件描述符,是一個抽象的概念,可以簡單的理解為一個一個事件,該事件可以來自於外部,比如客戶端連接事件,客戶端的寫事件等等,也可以是內部的事件,比如作業系統產生的定時器事件等等。
  • Event Handler:事件處理器,本質上是回調方法,當有事件發生後,框架會根據Handler調用對應的回調方法,在大多數情況下,是虛函數,需要用戶自己實現接口,實現具體的方法。
  • Concrete Event Handler: 具體的事件處理器,是Event Handler的具體實現。
  • Initiation Dispatcher:初始分發器,實際上就是Reactor角色,提供了一系列方法,對Event Handler進行註冊和移除;還會調用Synchronous Event Demultiplexer監聽各種事件;當有事件發生後,還要調用對應的Event Handler。
文章來源: https://twgreatdaily.com/jUp2HnEBnkjnB-0zL9km.html