一篇文章帶你瞭解 Netty

Netty

傳統的 IO 模型的 web 容器,比如老版本的 Tomcat,爲了增加系統的吞吐量,需要不斷增加系統核心線程數量,或者通過水平擴展服務器數量,來增加系統處理請求的能力。有了 NIO 之後,一個線程即可處理多個連接事件,其中基於多路複用模型的 Netty 框架,不僅降低了使用 NIO 的複雜度,

優點

Netty 是一款以 java NIO 爲基礎,基於事件驅動模型支持異步、高併發的網絡應用框架。

Reactor 模型

Reactor 模型也叫做反應器設計模式,是一種爲處理服務請求併發提交到一個或者多個服務處理器的事件設計模式。

而 Netty 是基於 Reactor 模型設計的一套高性能網絡應用框架,下面來看下常見的 Reactor 模型有哪些:

一、單 Reactor 單線程

1)可以實現通過一個阻塞對象監聽多個鏈接請求

2)Reactor 對象通過 select 監聽客戶端請求事件,通過 dispatch 進行分發

3)如果是建立鏈接請求,則由 Acceptor 通過 accept 處理鏈接請求,然後創建一個 Handler 對象處理完成鏈接後的各種事件

4)如果不是鏈接請求,則由 Reactor 分發調用鏈接對應的 Handler 來處理

5)Handler 會完成 Read-> 業務處理 ->send 的完整業務流程

二、單 Reactor 多線程

1)Reactor 對象通過 select 監聽客戶端請求事件,收到事件後,通過 dispatch 分發

2)如果是建立鏈接請求,則由 Acceptor 通過 accept 處理鏈接請求,然後創建一個 Handler 對象處理完成鏈接後的各種事件

3)如果不是鏈接請求,則由 Reactor 分發調用鏈接對應的 Handler 來處理

4)Handler 只負責事件響應不做具體業務處理

5)通過 read 讀取數據後,分發到 worker 線程池處理,處理完成後返回給 Handler,Handler 收到後,通過 send 將結果返回給 client

三、主從 Reactor 多線程

1)Reactor 主線程 MainReactor 對象通過 select 監聽鏈接事件,通過 Acceptor 處理

2)當 Acceptor 處理鏈接事件後,MainReactor 將鏈接分配給 SubReactor

3)SubReactor 將鏈接加入到隊列進行監聽,並創建 Handler 進行事件處理

4)當有新事件發生時,SubReactor 就會調用對應的 Handler 處理

5)Handler 通過 read 讀取數據,分發到 worker 線程池處理,處理完成後返回給 Handler,Handler 收到後,通過 send 將結果返回給 client

6)Reactor 主線程可以對應多個 Reactor 子線程

三種模式用生活案例來理解:

1)單 Reactor 單線程,前臺接待員和服務員是同一個人,全程爲顧客服務

2)單 Reactor 多線程,1 個前臺接待員,多個服務員,接待員只負責接待

3)主從 Reactor 多線程,多個前臺接待員,多個服務員

Reactor 模型具有如下優點:

1)響應快,不必爲單個同步事件所阻塞,雖然 Reactor 本身依然是同步的

2)可以最大程度的避免複雜的多線程及同步問題,並且避免了多線程 / 進程的切換開銷

3)擴展性好,可以方便的通過增加 Reactor 實例個數來充分利用 CPU 資源

4)複用性好,Reactor 模型本身與具體事件處理邏輯無關,具有很高的複用性

核心組件

Bootstrap
一個 Netty 應用通常由一個 Bootstrap 開始,它主要作用是配置整個 Netty 程序,串聯起各個組件。

Handler
爲了支持各種協議和處理數據的方式,便誕生了 Handler 組件。Handler 主要用來處理各種事件,這裏的事件很廣泛,比如可以是連接、數據接收、異常、數據轉換等。

ChannelInboundHandler
一個最常用的 Handler。這個 Handler 的作用就是處理接收到數據時的事件,也就是說,我們的業務邏輯一般就是寫在這個 Handler 裏面的,ChannelInboundHandler 就是用來處理我們的核心業務邏輯。

ChannelInitializer
當一個鏈接建立時,我們需要知道怎麼來接收或者發送數據,當然,我們有各種各樣的 Handler 實現來處理它,那麼 ChannelInitializer 便是用來配置這些 Handler,它會提供一個 ChannelPipeline,並把 Handler 加入到 ChannelPipeline。

ChannelPipeline
一個 Netty 應用基於 ChannelPipeline 機制,這種機制需要依賴於 EventLoop 和 EventLoopGroup,因爲它們三個都和事件或者事件處理相關。

EventLoops
目的是爲 Channel 處理 IO 操作,一個 EventLoop 可以爲多個 Channel 服務。EventLoopGroup 由多個 EventLoop 組成。

Channel
代表了一個 Socket 鏈接,或者其它和 IO 操作相關的組件,它和 EventLoop 一起用來參與 IO 處理。

Future
在 Netty 中所有的 IO 操作都是異步的,因此,你不能立刻得知消息是否被正確處理,但是我們可以過一會等它執行完成或者直接註冊一個監聽,具體的實現就是通過 Future 和 ChannelFutures, 他們可以註冊一個監聽,當操作執行成功或失敗時監聽會自動觸發。

示例

下面通過一個簡單的示例,瞭解怎麼基於 Netty 如何開發一個簡單的網絡通信程序。和之前一樣,包含服務端與客戶端:

Server:

@Slf4j
public class Server {

    private EventLoopGroup boosGroup;

    private EventLoopGroup workGroup;

    public Server(int port){
        try {
            init(port);
            log.info("----- 服務啓動成功 -----");
        } catch (InterruptedException e) {
            log.error("啓動服務出錯:{}", e.getCause());
        }
    }

    private void init(int port) throws InterruptedException {
        // 處理連接
        this.boosGroup = new NioEventLoopGroup();
        // 處理業務
        this.workGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        // 綁定
        bootstrap.group(boosGroup, workGroup)
                .channel(NioServerSocketChannel.class) //配置服務端
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_RCVBUF, 1024)
                .childOption(ChannelOption.SO_SNDBUF, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.bind(port).sync();
        channelFuture.channel().closeFuture().sync();
    }

    public void close(){
        this.boosGroup.shutdownGracefully();
        this.workGroup.shutdownGracefully();
    }

}

@Slf4j
class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>> server active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //1. 讀取客戶端的數據(緩存中去取並打印到控制檯)
        ByteBuf buf = (ByteBuf) msg;
        byte[] request = new byte[buf.readableBytes()];
        buf.readBytes(request);
        String requestBody = new String(request, "utf-8");
        log.info(">>>>>>>>> receive message: {}", requestBody);

        //2. 返回響應數據
        ctx.writeAndFlush(Unpooled.copiedBuffer((requestBody+" too").getBytes()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

Client:

@Slf4j
public class Client {

    private EventLoopGroup workGroup;
    private ChannelFuture channelFuture;

    public Client(int port){
        init(port);
    }

    private void init(int port){
        this.workGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024)
                .option(ChannelOption.SO_SNDBUF, 1024)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });

        this.channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();
    }

    /**
     *
     * @param message
     */
    public void send(String message){
        this.channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(message.getBytes()));
    }

    /**
     *
     */
    public void close(){
        try {
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        workGroup.shutdownGracefully();
    }
}

@Slf4j
class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>> client active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);

            String body = new String(req, "utf-8");
            log.info(">>>>>>>>> receive message: {}", body);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

測試:

public class StarterTests {

    static int port = 9011;

    @Test
    public void startServer(){
        Server server = new Server(9011);
    }

    @Test
    public void startClient(){
        Client client = new Client(port);
        client.send("Hello Netty!");
        while (true){}
    }

}

主要實現步驟:

1、創建兩個 NIO 線程組,一個專門用來網絡事件處理(接受客戶端連接),另一個則進行網絡通訊讀寫

2、創建一個 ServerBootstrap 對象,配置 Netty 的一系列參數,例如接收傳入數據的緩存大小等。

3、創建一個實際處理數據的類 ChannelInitializer, 進行初始化的準備工作,比如設置傳入數據的字符集格式,實現具體處理數據的接口。

4、綁定端口,執行同步阻塞方法等待服務器啓動即可

類似技術

Mina、Netty、Grizzly

1、Mina

Mina(Multipurpose Infrastructure for Network Applications) 是 Apache 組織一個較新的項目,它爲開發高性能和高可用性的網絡應用程序提供了非常便利的框架。當前發行的 Mina 版本 2.04 支持基於 Java NIO 技術的 TCP/UDP 應用程序開發、串口通訊程序,Mina 所支持的功能也在進一步的擴展中。

目前,正在使用 Mina 的應用包括:Apache Directory Project、AsyncWeb、AMQP(Advanced Message Queuing Protocol)、RED5 Server(Macromedia Flash Media RTMP)、ObjectRADIUS、 Openfire 等等。

2、Netty

Netty 是一款異步的事件驅動的網絡應用框架和工具,用於快速開發可維護的高性能、高擴展性協議服務器和客戶端。也就是說,Netty 是一個 NIO 客戶端 / 服務器框架,支持快速、簡單地開發網絡應用,如協議服務器和客戶端。它極大簡化了網絡編程,如 TCP 和 UDP 套接字服務器。

3、Grizzly

Grizzly 是一種應用程序框架,專門解決編寫成千上萬用戶訪問服務器時候產生的各種問題。使用 JAVA NIO 作爲基礎,並隱藏其編程的複雜性。容易使用的高性能的 API。帶來非阻塞 socketd 到協議處理層。利用高性能的緩衝和緩衝管理使用高性能的線程池。

從設計的理念上來看,Mina 的設計理念是最爲優雅的。當然,由於 Netty 的主導作者與 Mina 的主導作者是同一人,出自同一人之手的 Netty 在設計理念上與 Mina 基本上是一致的。而 Grizzly 在設計理念上就較差了點,幾乎是 JavaNIO 的簡單封裝。

結束語

前面我們說到了各種 IO 模型,基於 Java NIO 開發的網絡應用程序,能夠更好的基於服務器操作系統底層支持,更好的利用服務器資源提供高性能、可靠易擴展的應用。而 Netty 在 Java NIO 的基礎上,爲開發這提供便捷提升開發的效率與穩定性,在很多主流的框架中都有被使用,比如 Dubbo、Spring Webflux 等

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/uEQBgzRAB7eCjqu0k_28aw