Java NIO 选择器 Selector

选择器 Selector 是 I/O 多路复用的核心组件,它可以监控实现了 SelectableChannel 的通道的就绪情况。有了多路复用(multiplexing) I/O 模型,使得单线程的 Java 程序在极端情况下能够处理数万个连接,极大提高了程序的并发数。

1. 多路复用 I/O 模型

I/O 多路复用模型是操作系统提供给应用程序的一种进行 I/O 操作的模型。应用程序通过 select/poll 系统调用监控多个 I/O 设备,一旦某个或者多个 I/O 设备的处于就绪状态(例如:可读)则返回,应用程序随后可对就绪的设备进行操作。

应用程序 内核 select 数据未准备好 阻塞等待 等待数据 数据已准备好 复制完成 复制数据到 用户空间 read 处理数据 阻塞等待 系统调用 返回可读 系统调用 返回 OK

大致流程如下:

1)应用程序向内核发起 select 系统调用,该调用会阻塞应用程序。

2)内核等待数据到达。数据可能由 DMA 复制到内核缓冲区,也有可能是 CPU 进行复制。

3)数据准备完毕,select 调用返回。select 返回的是一个集和,可能有多个连接都已经就绪。

4)应用程序发起 read 系统调用。

5)操作系统将数据有内核缓冲区复制到用户缓冲区。

6)read 调用返回。

I/O 多路复用模型本质上是一种阻塞 I/O,进行读操作的 read 系统调用是阻塞的,select 的时候也是阻塞的。不过 I/O 多路复用模型的优势在于阻塞时可以等待多路 I/O 就绪,然后一并处理。与多线程处理多路 I/O 相比,它是单线程的,没有线程切换的开销,单位时间内能够处理多的连接数。

2. 选择器与通道关系

在 Java 中,通道 Channel 可以表示 I/O 连接,而选择器可以监控某些 I/O 事件就绪的通道,选择通道中就绪的 I/O 事件。这里的通道必须是实现了 SelectableChannel 接口的通道,例如:SocketChannel, DatagramChannel 等;而文件通道 FileChannel 没有实现该接口,所以不支持选择器。

Selector 选择器 SelectableChannel 可选择通道 SelectableChannel 可选择通道 SelectableChannel 可选择通道 SelectableChannel 可选择通道

3. 选择键 SelectionKey

选择器 Selector 监控通道时监控的是通道中的事件,选择键 SelectionKey 就代表着 I/O 事件。程序通过调用 Selector.select() 方法来选中选择器所监控的通道中的就绪的 I/O 事件的集合,然后遍历集合,对事件作出相应的处理。

选择键 SelectionKey 可以表示 4 种事件,这 4 种事件使用 int 类型的常量来表示。

1)SelectionKey.OP_ACCEPT 表示 accept 事件就绪。例如:对于 ServerSocketChannel 来说,该事件就绪表示可以调用 accept() 方法来获得与客户端连接的通道 SocketChannel。

2)SelectionKey.OP_CONNECT 表示客户端与服务端连接成功。

3)SelectionKey.OP_READ 表示通道中已经有了可读数据,可以调用 read() 方法从通道中读取数据。

4)SelectionKey.OP_WRITE 表示写事件就绪,可以调用 write() 方法往通道中写入数据。

不同的通道所能够支持的 I/O 事件不同,例如:ServerSocketChannel 只支持 accept 事件,而 DatagramChannel 只支持 read 和 write 事件。要查看通道所支持的事件,可以查看通道的 javadoc 文档,或者调用通道的 validOps() 方法来进行判断。例如:channel.validOps() & SelectionKey.OP_READ > 0 表示 channel 支持读事件。

4. 选择器使用步骤

4.1 获取选择器

与通道和缓冲区的获取类似,选择器的获取也是通过静态工厂方法 open() 来得到的。

Selector selector = Selector.open();    // 获取一个选择器实例

4.2 获取可选择通道

能够被选择器监控的通道必须实现了 SelectableChannel 接口,并且需要将通道配置成非阻塞模式,否则后续的注册步骤会抛出 IllegalBlockingModeException。

SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); // 打开 SocketChannel 并连接到本机 9090 端口
socketChannel.configureBlocking(false); // 配置通道为非阻塞模式

4.3 将通道注册到选择器

通道在被指定的选择器监控之前,应该先告诉选择器,并且告知监控的事件,即:将通道注册到选择器。

通道的注册通过 SelectableChannel.register(Selector selector, int ops) 来完成,ops 表示关注的事件,如果需要关注该通道的多个 I/O 事件,可以传入这些事件类型或运算之后的结果。这些事件必须是通道所支持的,否则抛出 IllegalArgumentException。

socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);   // 将套接字通过到注册到选择器,关注 read 和 write 事件

4.4 轮询 select 就绪事件

通过调用选择器的 Selector.select() 方法可以获取就绪事件,该方法会将就绪事件放到一个 SelectionKey 集合中,然后返回就绪的事件的个数。这个方法映射多路复用 I/O 模型中的 select 系统调用,它是一个阻塞方法。正常情况下,直到至少有一个就绪事件,或者其它线程调用了当前 Selector 对象的 wakeup() 方法,或者当前线程被中断时返回。

while (selector.select() > 0){ // 轮询,且返回时有就绪事件
    Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪事件集合
    .......
}

有 3 种方式可以 select 就绪事件:

1)select() 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup() 或者当前线程被中断时返回。

2)select(long timeout) 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup(),或者当前线程被中断,或者阻塞时长达到了 timeout 时返回。不抛出超时异常。

3)selectNode() 不阻塞,如果无就绪事件,则返回 0;如果有就绪事件,则将就绪事件放到一个集合,返回就绪事件的数量。

4.5 处理就绪事件

每次可以 select 出一批就绪的事件,所以需要对这些事件进行迭代。从一个 SelectionKey 对象可以得到:1)就绪事件的对应的通道;2)就绪的事件。通过这些信息,就可以很方便地进行 I/O 操作。

for(SelectionKey key : keys){
    if(key.isWritable()){ // 可写事件
        if("Bye".equals( (line = scanner.nextLine()) )){
            socketChannel.shutdownOutput();
            socketChannel.close();
            break;
        }
        buf.put(line.getBytes());
        buf.flip();
        socketChannel.write(buf);
        buf.compact();
    }
}
keys.clear(); // 清除选择键(事件)集,避免下次循环的时候重复处理。

需要注意的是,处理完 I/O 事件之后,需要清除选择键集和,避免下一轮循环的时候对同一事件重复处理。

5. 完整示例

下面给出一个完整的实例,实例中包含 TCP 客户端 TcpClient, UDP 客户端 UdpClient 和服务端 EchoServer。服务端 EchoServer 可以同时处理 UDP 请求和 TCP 请求,用户可以在客户端控制台输入内容,按回车发送给服务端,服务端打印客户端发送过来的内容。完整代码:https://github.com/Robothy/java-experiments/tree/main/nio/Selector

5.1 服务端

public class EchoServer {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();    // 获取选择器

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开服务器通道
        serverSocketChannel.configureBlocking(false);                         // 服务器通道配置为非阻塞模式
        serverSocketChannel.bind(new InetSocketAddress(9090));           // 绑定 TCP 端口 9090
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);       // 将服务器通道注册到选择器 selector 中,注册事件为 ACCEPT

        DatagramChannel datagramChannel = DatagramChannel.open();             // 打开套接字通道
        datagramChannel.configureBlocking(false);                             // 配置通道为非阻塞模式
        datagramChannel.bind(new InetSocketAddress(9090));               // 绑定 UDP 端口 9090
        datagramChannel.register(selector, SelectionKey.OP_READ);             // 将通道注册到选择器 selector 中,注册事件为读取数据

        ByteBuffer buf = ByteBuffer.allocate(1024);                           // 分配一个 1024 字节的堆字节缓冲区

        while (selector.select() > 0){                                        // 轮询已经就绪的注册的通道的 I/O 事件
            Set<SelectionKey> keys = selector.selectedKeys();                 // 获取就绪的 I/O 事件,即选择器键集合
            for (SelectionKey key : keys){                                    // 遍历选择键,处理就绪事件
                if(key.isAcceptable()){                                       // 选择键的事件的是 I/O 连接事件
                    SocketChannel socketChannel = serverSocketChannel.accept(); // 执行 I/O 操作,获取套接字连接通道
                    socketChannel.configureBlocking(false);                   // 配置为套接字通道为非阻塞模式
                    socketChannel.register(selector, SelectionKey.OP_READ);   // 将套接字通过到注册到选择器,关注 READ 事件
                }else if(key.isReadable()){                        // 选择键的事件是 READ
                    StringBuilder sb = new StringBuilder();
                    if(key.channel() instanceof DatagramChannel){  // 选择的通道为数据报通道,客户端是通过 UDP 连接过来的
                        sb.append("UDP Client: ");
                        datagramChannel.receive(buf);              // 最多读取 1024 字节,数据报多出的部分自动丢弃
                        buf.flip();
                        while(buf.position() < buf.limit()) {
                            sb.append((char)buf.get());
                        }
                        buf.clear();
                    }else{                                          // 选择的通道为套接字通道,客户端时通过 TCP 连接过来的
                        sb.append("TCP Client: ");
                        ReadableByteChannel channel = (ReadableByteChannel) key.channel(); // 获取通道
                        int size;
                        while ( (size = channel.read(buf))>0){
                            buf.flip();
                            while (buf.position() < buf.limit()) {
                                sb.append((char)buf.get());
                            }
                            buf.clear();
                        }

                        if (size == -1) {
                            sb.append("Exit");
                            channel.close();
                        }
                    }
                    System.out.println(sb);
                }
            }
            keys.clear();  // 将选择键清空,防止下次循环时被重复处理
        }
    }
}

5.2 TCP 客户端

public class TcpClient {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();

        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_WRITE);

        Scanner scanner = new Scanner(System.in);
        String line;
        ByteBuffer buf = ByteBuffer.allocate(1024);

        while (selector.select() > 0){
            Set<SelectionKey> keys = selector.selectedKeys();
            for(SelectionKey key : keys){
                if(key.isWritable()){
                    if("Bye".equals( (line = scanner.nextLine()) )){
                        socketChannel.shutdownOutput();
                        socketChannel.close();
                        break;
                    }
                    buf.put(line.getBytes());
                    buf.flip();
                    socketChannel.write(buf);
                    buf.compact();
                }
            }
            keys.clear();
            if(!socketChannel.isOpen()) break;
        }
    }
}

5.3 UDP 客户端

public class UdpClient {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();                        // 获取选择器
        DatagramChannel datagramChannel = DatagramChannel.open();   // 打开一个数据报通道
        datagramChannel.configureBlocking(false);                   // 配置通道为非阻塞模式
        datagramChannel.register(selector, SelectionKey.OP_WRITE);  // 将通道的写事件注册到选择器
        ByteBuffer buff = ByteBuffer.allocate(1024);                // 分配字节缓冲区
        Scanner scanner = new Scanner(System.in);                   // 创建扫描器,扫描控制台输入流
        InetSocketAddress server = new InetSocketAddress("localhost", 9090);
        while (selector.select() > 0){                              // 有就绪事件
            Set<SelectionKey> keys = selector.selectedKeys();       // 获取选择键,即就绪的事件
            for(SelectionKey key : keys){                           // 遍历选择键
                if(key.isWritable()){                               // 如果当前选择键是读就绪
                    String line;
                    if("Bye".equals( line = scanner.nextLine() )) { // 从控制台获取 1 行输入,并检查输入的是不是 Bye
                        System.exit(0);                 // 正常退出
                    }
                    buff.put(line.getBytes());          // 放入缓冲区
                    buff.flip();                        // 将缓冲区置为读状态
                    datagramChannel.send(buff, server); // 往 I/O 写数据
                    buff.compact();                     // 压缩缓冲区,保留没发送完的数据
                }
            }
            keys.clear();
        }
    }
}

6. 小结

Selector 作为多路复用 I/O 模型的核心组件,能够同时监控多路 I/O 通道。选择器在 select 就绪事件地时候会阻塞,在处理 I/O 事件的时候也会阻塞,它的优势在于在阻塞的时候可以等待多路 I/O 就绪,是一种异步阻塞 I/O 模型。与多线程处理多路 I/O 相比,多路复用模型只需要单个线程即可处理万级连接,没有线程切换的开销。

发表评论

评论已关闭。

相关文章