一文了解 Java 的 IO 模型

我們都知道在 Java 當中有許許多多的使用上的問題,比如 Java 的鎖,Java 的安全性,以及 Java 的 IO 操作,Java 中各種設計模式的使用,今天我們就來說說關於這個 Java 的 IO。

Java 的 IO 是什麼?

Java IO(輸入輸出)是 Java 程序與外部進行數據交互的接口,包括文件讀寫、標準設備輸出等 1。

Java IO 是建立在流的基礎上進行輸入輸出的,所有數據被串行化寫入輸出流,或者從輸入流中讀入 1。

Java IO 有字節流和字符流兩種形式,其中字節流一次讀寫一個字節,而字符流一次讀寫一個字符。

Java 的 IO 模型

Java 中的 IO 模型主要有三種:

1.BIO(Blocking IO):同步阻塞式 IO,是比較常用的 IO 模型,特點是編寫相對簡單,分爲輸入流和輸出流,進行網絡通訊時,輸入流的讀操作會阻塞住線程,直到有輸出流執行寫操作。

2.NIO(Nonblocking IO):同步非阻塞式 IO,IO 操作不再阻塞線程,當數據準備好後,可以通過 Selector 選擇通道進行數據的發送和接收。

3.AIO(Asynchronous IO):異步非阻塞式 IO,是 Java 7 引入的新特性,基於 NIO 實現,提供了異步文件通道和異步通道的方式進行 IO 操作。

既然我們已經知道了 IO 模型是三種,那麼我們分開來說一下這些 IO 模型。

BIO 同步阻塞 IO

BIO(Blocking I/O):同步阻塞,服務器實現模式爲一個連接一個線程,即客戶端有連接請求時服務器端就需要啓動一個線程進行處理,如果這個連接不做任何事情會造成不必要的線程開銷,可以通過線程池機制改善(實現多個客戶連接服務器)

BIO 工作機制:服務端啓動一個 ServerSocket,客戶端發出請求後,先諮詢服務器是否有線程響應,如果沒有則會等待,或者被拒絕。如果有響應,客戶端線程會等待請求結束後,再繼續執行。使用線程池,當一個客戶端連接時就啓動一個線程進行通信

我們簡單來實現一下這個 BIO 客戶端和服務端之間的代碼:

服務端代碼

import java.io.*;
import java.net.*;

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(4444);
        } catch (IOException e) {
            System.err.println("Could not listen on port: 4444.");
            System.exit(1);
        }

        Socket clientSocket = null;
        try {
            clientSocket = serverSocket.accept();
        } catch (IOException e) {
            System.err.println("Accept failed.");
            System.exit(1);
        }

        PrintWriter out = new PrintWriter(clientSocket.getOutputStream()true);
        BufferedReader in = new BufferedReader(
                new InputStreamReader(
                        clientSocket.getInputStream()));
        String inputLine;

        while ((inputLine = in.readLine()) != null) {
            out.println(inputLine);
            out.flush();
            if (inputLine.equals("Bye.")) {
                break;
            }
        }

        out.close();
        in.close();
        clientSocket.close();
        serverSocket.close();
    }
}

客戶端代碼

import java.io.*;
import java.net.*;

public class Client {
    public static void main(String[] args) throws IOException {
        Socket socket = null;
        PrintWriter out = null;
        BufferedReader in = null;
        try {
            socket = new Socket("localhost", 4444);
            out = new PrintWriter(socket.getOutputStream()true);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host: localhost.");
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for the connection to: localhost.");
            System.exit(1);
        }
        BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
        String userInput;
        while ((userInput = stdIn.readLine()) != null) {
            out.println(userInput);
            System.out.println("echo: " + in.readLine());
            if (userInput.equals("Bye.")) {
                break;
            }
        } 
        out.close(); 
        in.close(); 
        stdIn.close(); 
        socket.close(); 
    } 
}

但是 BIO 的缺點也很顯著:

  1. 線程數量限制:每來一個請求就需要一個線程來處理,線程太多容易造成系統不可用。

  2. 無法處理大量併發請求:當發生大量併發請求時,超過最大數量的線程就只能等待,直到線程池中的有空閒的線程可以被複用。

  3. 對 Socket 的輸入流讀取時,會一直阻塞。

既然 BIO 的缺點這麼明顯,那麼是不是其他的能把這些缺點給避免掉呢?

NIO 同步非阻塞的 I/O 模型

NIO(Non-blocking I/O,在 Java 領域,也稱爲 New I/O),是一種同步非阻塞的 I/O 模型,也是 I/O 多路複用的基礎,已經被越來越多地應用到大型應用服務器,成爲解決高併發與大量連接、I/O 處理問題的有效方式。

NIO 主要有三大核心部分:Channel(通道)、Buffer(緩衝區)、Selector(選擇區)。傳統 IO 基於字節流和字符流進行操作,而 NIO 基於 Channel 和 Buffer(緩衝區)進行操作,數據總是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。

我們說到了這個 NIO 的三大核心部分,那麼我們應該怎麼去理解這個通道,緩衝區,選擇區呢?

我就來說說了不起對於這幾個關鍵核心的理解吧:

Channel(通道):就像水管一樣,負責傳輸數據,可以進行數據的讀取和寫入。

Buffer(緩衝區):是一個內存塊,底層是一個數組。數據的讀取和寫入都是通過 Buffer 進行的。

Selector(選擇器):用於監聽多個通道的事件,比如連接請求、數據到達等。一個線程可以監聽多個通道。

而 NIO 的代碼實現,其實就是依賴於 Java 的 nio 包中的類實現的,我們來實現一下看看。

NIO 代碼實例:

服務端

public class NioServer {
 
    public static void main(String[] args) throws IOException {
        //創建一個選擇器selector
        Selector selector= Selector.open();
        //創建serverSocketChannel
        ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
        //綁定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(8888));
        //必須得設置成非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //將channel註冊到selector並設置監聽事件爲ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("===========NIO服務端啓動============");
        while(true){
            //超時等待
            if(selector.select(1000)==0){
                System.out.println("===========NIO服務端超時等待============");
                continue;
            }
            // 有客戶端請求被輪詢監聽到,獲取返回的SelectionKey集合
            Iterator<SelectionKey> iterator=selector.selectedKeys().iterator();
            //迭代器遍歷SelectionKey集合
            while (iterator.hasNext()){
                SelectionKey key=iterator.next();
                // 判斷是否爲ACCEPT事件
                if (key.isAcceptable()){
                    // 處理接收請求事件
                    SocketChannel socketChannel=((ServerSocketChannel) key.channel()).accept();
                    //非阻塞模式
                    socketChannel.configureBlocking(false);
                    // 註冊到Selector並設置監聽事件爲READ
                    socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                    System.out.println("成功連接客戶端");
                }
                //判斷是否爲READ事件
                if (key.isReadable()){
                    SocketChannel socketChannel = (SocketChannel) key.channel();
 
                    try {
                        // 獲取以前設置的附件對象,如果沒有則新建一個
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        if (buffer == null) {
                            buffer = ByteBuffer.allocate(1024);
                            key.attach(buffer);
                        }
                        // 清空緩衝區
                        buffer.clear();
                        // 將通道中的數據讀到緩衝區
                        int len = socketChannel.read(buffer);
                        if (len > 0) {
                            buffer.flip();
                            String message = new String(buffer.array(), 0, len);
                            System.out.println("收到客戶端消息:" + message);
                        } else if (len < 0) {
                            // 接收到-1,表示連接已關閉
                            key.cancel();
                            socketChannel.close();
                            continue;
                        }
                        // 註冊寫事件,下次向客戶端發送消息
                        socketChannel.register(selector, SelectionKey.OP_WRITE, buffer);
                    } catch (IOException e) {
                        // 取消SelectionKey並關閉對應的SocketChannel
                        key.cancel();
                        socketChannel.close();
                    }
                }
                //判斷是否爲WRITE事件
                if (key.isWritable()){
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    //獲取buffer
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    String hello = "你好,坤坤!";
                    //清空buffer
                    buffer.clear();
                    //buffer中寫入消息
                    buffer.put(hello.getBytes());
                    buffer.flip();
                    //向channel中寫入消息
                    socketChannel.write(buffer);
                    buffer.clear();
                    System.out.println("向客戶端發送消息:" + hello);
                    // 設置下次讀寫操作,向 Selector 進行註冊
                    socketChannel.register(selector, SelectionKey.OP_READ, buffer);
                }
                // 移除本次處理的SelectionKey,防止重複處理
                iterator.remove();
            }
        }
 
    }
}

客戶端

public class NioClient {
 
    public static void main(String[] args) throws IOException {
        // 創建SocketChannel並指定ip地址和端口號
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));
        System.out.println("==============NIO客戶端啓動================");
        // 非阻塞模式
        socketChannel.configureBlocking(false);
        String hello="你好,靚仔!";
        ByteBuffer buffer = ByteBuffer.wrap(hello.getBytes());
        // 向通道中寫入數據
        socketChannel.write(buffer);
        System.out.println("發送消息:" + hello);
        buffer.clear();
        // 將channel註冊到Selector並監聽READ事件
        socketChannel.register(Selector.open(), SelectionKey.OP_READ, buffer);
        while (true) {
            // 讀取服務端數據
            if (socketChannel.read(buffer) > 0) {
                buffer.flip();
                String msg = new String(buffer.array(), 0, buffer.limit());
                System.out.println("收到服務端消息:" + msg);
                break;
            }
        }
        // 關閉輸入流
        socketChannel.shutdownInput();
        // 關閉SocketChannel連接
        socketChannel.close();
    }
}

Java NIO 的缺點:

  1. 系統穩定度不夠:服務端響應隨着客戶端的增加延時增加,每一個客戶端需要建立一個線程,當到達一定的限制時,會使系統無法響應。

  2. 編程模型較複雜:NIO 的編程模型相對比較複雜,需要處理多線程和異步 I/O 操作,對開發者的要求較高。

  3. 不支持阻塞模式:NIO 在處理 I/O 請求時,默認情況下是不支持阻塞模式的,需要手動設置。

  4. 內存分配問題:使用 NIO 進行文件操作時,需要手動管理內存,如果處理不當可能會導致內存泄漏。

說完了 NIO 那最後就得來看看這個 AIO 了

AIO 異步非阻塞式 IO

Java 的 AIO(Asynchronous I/O)是 Java NIO 的下一代版本,也稱爲 NIO.2。AIO 在 Java 7 中被引入,提供了一種基於事件驅動的非阻塞 I/O 模型,用於簡化異步 I/O 操作的開發。

AIO 的核心思想是使用異步 I/O 模型,而不是傳統的同步或阻塞 I/O 模型。在 AIO 中,應用程序發出 I/O 請求後,不需要等待操作完成就可以繼續執行其他任務。當操作完成後,應用程序會收到通知,然後可以處理結果。這種模型可以顯著提高應用程序的吞吐量和響應能力。

它們的主要區別就在於這個異步通道,見名知意:使用異步通道去進行 IO 操作時,所有操作都爲異步非阻塞的,當調用 read()/write()/accept()/connect() 方法時,本質上都會交由操作系統去完成,比如要接收一個客戶端的數據時,操作系統會先將通道中可讀的數據先傳入 read() 回調方法指定的緩衝區中,然後再主動通知 Java 程序去處理。

我們直接來看看代碼:

服務端代碼

public class AioServer {
 
    public static void main(String[] args) throws Exception {
        // 創建異步通道組,處理IO事件
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
        //創建異步服務器Socket通道,並綁定端口
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress(8888));
        System.out.println("=============AIO服務端啓動=========");
 
        // 異步等待接收客戶端連接
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            // 創建ByteBuffer
            final ByteBuffer buffer = ByteBuffer.allocate(1024);
 
            @Override
            public void completed(AsynchronousSocketChannel channel, Object attachment) {
                System.out.println("客戶端連接成功");
                try {
                    buffer.clear();
                    // 異步讀取客戶端發送的消息
                    channel.read(buffer, null, new CompletionHandler<Integer, Object>() {
                        @Override
                        public void completed(Integer len, Object attachment) {
                            buffer.flip();
                            String message = new String(buffer.array(), 0, len);
                            System.out.println("收到客戶端消息:" + message);
 
                            // 異步發送消息給客戶端
                            channel.write(ByteBuffer.wrap(("你好,阿坤!").getBytes()), null, new CompletionHandler<Integer, Object>() {
                                @Override
                                public void completed(Integer result, Object attachment) {
                                    // 關閉輸出流
                                    try {
                                        channel.shutdownOutput();
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                }
 
                                @Override
                                public void failed(Throwable exc, Object attachment) {
                                    exc.printStackTrace();
                                    try {
                                        channel.close();
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                }
                            });
                        }
 
                        @Override
                        public void failed(Throwable exc, Object attachment) {
                            exc.printStackTrace();
                            try {
                                channel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // 繼續異步等待接收客戶端連接
                server.accept(null, this);
            }
 
            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
                // 繼續異步等待接收客戶端連接
                server.accept(null, this);
            }
        });
        // 等待所有連接都處理完畢
        group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }
 
}

客戶端代碼

public class AioClient {
 
    public static void main(String[] args) throws Exception {
        // 創建異步Socket通道
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
        // 異步連接服務器
        client.connect(new InetSocketAddress("127.0.0.1", 8888), null, new CompletionHandler<Void, Object>() {
            // 創建ByteBuffer
            final ByteBuffer buffer = ByteBuffer.wrap(("你好,靚仔!").getBytes());
 
            @Override
            public void completed(Void result, Object attachment) {
                // 異步發送消息給服務器
                client.write(buffer, null, new CompletionHandler<Integer, Object>() {
                    // 創建ByteBuffer
                    final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
 
                    @Override
                    public void completed(Integer result, Object attachment) {
                        readBuffer.clear();
                        // 異步讀取服務器發送的消息
                        client.read(readBuffer, null, new CompletionHandler<Integer, Object>() {
                            @Override
                            public void completed(Integer result, Object attachment) {
                                readBuffer.flip();
                                String msg = new String(readBuffer.array(), 0, result);
                                System.out.println("收到服務端消息:" + msg);
                            }
 
                            @Override
                            public void failed(Throwable exc, Object attachment) {
                                exc.printStackTrace();
                                try {
                                    client.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    }
 
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        exc.printStackTrace();
                        try {
                            client.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
 
            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
        // 等待連接處理完畢
        Thread.sleep(1000);
        // 關閉輸入流和Socket通道
        client.shutdownInput();
        client.close();
    }
}

那麼 AIO 有什麼缺點麼?

實現複雜:AIO 的實現相對複雜,需要處理異步 I/O 操作和多線程編程,對開發者的要求較高。

需要額外的技能:使用 AIO 需要具備 Java 多線程編程的知識,否則很難寫出高質量的代碼。

存在著名的 Selector 空輪詢 bug:這個 bug 可能導致 CPU 佔用率過高,影響系統性能。

可靠性差:網絡狀態複雜多樣,可能會出現各種各樣的問題,如網絡斷開重連、緩存失效、半包讀寫等,導致 AIO 的可靠性較差。

了不起還是勸大家,如果你不是很精通,可以慢慢學習,不能強制的去在各自的系統中使用,量力而行,畢竟總比搗鼓出 bug 自己解決不了強很多。

關於 IO 模型,你瞭解了麼?

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/pDjce2O0HBqnZ3eUpY71oQ