学习笔记-I/O模型详解

bg.png

相关背景

理解IO的各种模型需要先了解计算机的运作流程,下面将对其一些专业的名称做一些简单介绍

什么是I/O?

在计算机系统中I/O就是输入(Input)和输出(Output)的意思,针对不同的操作对象,可以划分为磁盘I/O模型,网络I/O模型,内存映射I/O, Direct I/O、数据库I/O等,只要具有输入输出类型的交互系统都可以认为是I/O系统。I/O可以简单的认为就是“读写”。

在Linux系统中,计算机里面的程序(代码程序)都是作为文件存在硬盘里面的。开机之后,这些程序就会从硬盘加载到内存并经过处理,变成CPU可执行的格式,此时这些程序就变成了进程。
kernel(内核)是开机是第一个加载到内存的程序。其他的程序都是有内核帮忙加载进内存的。内核进程和其他进程都会按照时间片占有CPU并发的运行

内核kernel

内核的主要功能是屏蔽硬件的底层操作,封装系统调用(system call)函数,用户进程可以通过请求内核调用内核中的函数完成工作(注意用户进程无法直接调用系统函数,只有内核才可以执行系统调用,系统调用的代码存在内核中)。

系统调用的作用就是,让其他进程调用内核提供的接口函数,让内核程序帮这些进程去完成这个操作,而不是用户进程自己去做这些事。如果是用户进程自己去做这些事,就需要他们自己重新实现这些方法,冗余且复杂,所以内核提供的这些系统调用大大减少了用户进程的工作量。

为了防止其他程序知道内核所在的内存地址修改内核kernel中的指令,Linux提供了一种保护模式,内核进程是处于这种保护模式之下(这种保护模式的作用就是不让其他程序知道内核所在的内存地址,这样其他进程就无法访问内核和修改内核):当cpu执行内核进程中的指令时,内核可以访问其他进程的内存,但是cpu执行用户进程中的指令时,用户进程不能访问内核和其他用户进程的内存地址,只能访问自己这个进程的内存(也就是说,kernel可以访问和修改其他进程的内容,其他的进程只能访问和修改自己这个进程的内容)。
image.png
用户进程不能访问到内核又怎么调用到内核提供的系统调用(函数)呢?这个时候就提出了系统中断的概念。

系统中断

所谓的中断其实就是告诉CPU停止运行当前的程序去做另一件事情,去做另一件事情有很多种情况中断发生的情况有很多种可能是时间片用完了,cpu就会接受到中断指令,于是cpu就停止当前程序的运行让下一个进程占用进行工作也可能是程序中运行到sleep,yield这样的代码,运行到这样的代码也会发送中断指令给cpu,cpu就也会中断当前程序的运行,让其他进程运行也可能是移动鼠标,鼠标这个硬件设备会发送一个中断指令给cpu,cpu就会中断当前进程的运行,然后发送一个io请求让鼠标移动。

中断是一个计算机指令,这个指令后面会跟一个数字,这些数字映射到一个存着回调函数的表中(这个表叫做中断向量表,是存在CPU的寄存器中的,只有当发起中断指令给cpu时,cpu才会往这个表里面查数字对应的回调函数),一个数字代表一个回调函数。所以中断指令后面跟着的数字代表cpu中断程序后,会做些什么操作,是去调度下一个程序还是去发起一个io请求之类的。

总结:中断就是告诉cpu停下手里的工作去干另一件事,干完这件事之后你可以继续运行刚刚的程序,也可以去运行其他程序。

如果没有系统中断机制,那么cpu就会跑完一个程序再跑另一个程序或者干其他事情。那计算机就变成串行运行程序而不是并发运行。因此系统中断是计算机可以并发运行多个程序的关键

现在回到其他进程怎么调用内核的系统调用。比如,我用OutputStream调用了一个write函数,这个write函数里面其实埋了一个中断指令(int 0x80,int就是中断指令,这是cpu才能识别的指令),当cpu运行到这个int 0x80的时候,就会找中断向量表对应0x80的回调函数并执行,回调函数会让cpu中断当前进程,并切换到内核进程(让内核进程占有cpu),内核再去调用系统调用中的write方法。这个过程中,就由用户态的程序切换到了内核态,进入到了内核态就自然可以调内核中的函数了。不是用户进程去调内核的函数,而是切换到内核态后内核自己调自己进程的方法,是内核自己访问自己的内存。

io操作都会需要进行系统调用(调用内核提供的函数),所以io操作都需要系统中断(中断会使用户进程让出cpu),都得经过一个用户态内核态的切换。所以io操作的成本比较高。

总结:系统调用需要进行系统中断,切换用户态和内核态,系统中断就要当前程序让出cpu停止工作,把cpu交给内核。IO操作都需要内核执行系统调用。

系统中断的过程和分类

系统中断分为两种:硬中断和软中断

硬中断是由计算机硬件发起的中断,如网卡,鼠标,键盘和打印机等。硬中断可能发生在任何时期。

以网卡为例:当网卡接收到一个网络报文,报文由网卡的DMA(直接存储器访问)写入到内存(网卡缓冲区),网卡再向CPU发起一个中断请求(IRQ,interrupt request),CPU收到中断信号会停下当前用户进程的运行,做好上下文环境的保存(保存到PCB的进程描述符中)。之后CPU从用户态切换到内核态(CPU所保存的堆栈地址从用户空间切换到内核空间的堆栈地址),执行网卡的中断程序。之后会切换回进程的用户态,CPU从进程描述符中读取上下文继续工作。

软中断是正在运行的用户态进程产生的,最常见的软中断就是用户程序要进行IO操作的时候,此时用户进程的上下文环境会从CPU的寄存器写入到内存中(写入到进程的进程描述符中,不是写入到用户空间中)以保存上下文。之后CPU由用户态切换到内核态进行系统调用,再之后CPU会切换会刚才的用户态,加载上下文环境到CPU的寄存器中,然后继续用户进程的运行。除了io操作外,像sleep,yield代码也会产生软中断使当前进程让出CPU。

无论是硬中断还是软中断,每种系统中断都由各自不同的中断处理程序(即中断之后要执行的函数,要做的事情),例如系统调用他的中断处理程序就是 0x80 。像网卡,鼠标,键盘,硬盘都有它对应的中断处理程序。

这些中断处理程序的编号会以数字的形式存在CPU的中断向量表中。而中断处理程序的内容存在内核中,CPU会拿着这个编号去找内核中对应的中断处理程序来执行。
所以无论是硬中断还是软中断的io操作都需要进行用户态切换到内核态,因为要执行中断处理程序。

用户空间和内核空间

系统分配给每个进程(注意是每个进程)**一个独立的、连续的、虚拟**的内存空间,该大小一般是4G(是所有进程都放在这里面共用)。其中将高地址值的内存空间分配给内核使用,一般是1G,其他空间给用户程序使用(即所有进程共用这3G)。linux下每个进程都被分配了用户空间和内核空间。可以理解为用户空间和内核空间是存储在不同的内存空间中。

进程控制块

进程控制块是存放进程的管理和控制信息的数据结构称为进程控制块。它是进程管理和控制的最重要的数据结构,每一个进程都有一个PCB,在创建进程时,建立PCB,伴随进程运行的全过程,直到进程撤销而撤销。

在不同的操作系统中对进程的管理和控制机制不同,PCB中的信息存在差异,通常PCB包含如下信息。

  1. 进程标识符:每个进程都必须有一个唯一的标识符,可以是字符串,也可以是数字,UNIX系统中就是一个整型数,在进程创建时由系统赋予。
  2. 进程当前状态:说明进程当前所处的状态,为了管理的方便,系统设计时会将相同状态的进程组成一个队列,如就绪进程队列,等待进程则要根据等待的事件组成多个等待队列,如等待打印机队列、等待磁盘I/O完成队列等等。
  3. 进程相应的程序和数据地址:将PCB与其程序和数据联系起来。
  4. 进程资源清单:列出除CPU以外的资源记录,如拥有的I/O设备,打开的文件列表等。
  5. 进程优先级:进程的优先级反映进程的紧迫程度,通常由用户指定和系统设置。UNIX系统采用用户设置和系统计算相结合的方式确定进程的优先级。
  6. CPU现场保护区:当进程因某种原因不能继续占用CPU时(等待打印机),释放CPU,这时就要将CPU的各种状态信息保护起来,为将来再次得到处理机恢复CPU的各种状态,继续进行。
  7. 进程同步与通信机制:用于实现进程间的互斥、同步和通信所需的信号量等。
  8. 进程所在队列PCB的链接字:根据进程所处的现行状态,进程相应的PCB参加到不同队列中。PCB链接字指出该进程所在队列中下一个进程PCB的首地址。
  9. 与进程相关的其它信息:如进程记账信息,进程占用CPU的时间等。

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。也就是说,进程的切换是由内核控制的。

为什么要划分用户空间和内核空间?为了保证用户进程不能直接操作内核,保证内核的安全

用户态切换内核态的过程:(用户态与内核态切换本质是CPU在用户空间的内存和内核空间内存的切换)

Linux创建进程的时候,会给该进程分配两块空间:用户空间(用户栈)和内核空间(内核栈)。PCB进程控制块中保存着该进程的用户栈空间的地址和内核空间的地址。

CPU的寄存器中存储着当前用户程序的运行信息和上下文以及用户栈的地址。当CPU从用户态切换到内核态的时候(比如因为硬中断,硬件设备向CPU发起IRQ),首先会将用户程序的运行 信息和上下文存到PCB的进程描述符(有点类似与游戏存档),然后CPU寄存器记录的堆栈地址从用户堆栈的地址指向为内核堆栈的地址(这就是用户空间切换到内核空间),CPU查询中断向量表在内核中找到对应的中断处理程序,并加载到CPU的寄存器,然后执行中断处理程序。此时可以说,CPU被内核进程给占用。


上面就是用户态切换为内核态的过程。内核态切换为用户态就是一个反过来的过程。

什么情况下会进行用户态切换内核态

  1. 系统调用(用户程序自己发起中断,软中断)
  2. 外部设备发起中断请求(硬中断)
  3. 用户程序异常

用户态切换内核态与进程间切换的区别

CPU在两个进程间的切换本质上是CPU在两块PCB内存间的切换,CPU会从读取某块PCB切换为读取另一块PCB的数据,然后进行运算。而用户态切换到内核态是CPU从用户空间这块内存切换到内核空间这块内存。所以二者都是CPU在不同内存间的切换。二者都需要进行用户程序的中断和上下文的保存。所以二者的耗时和成本基本相当。

缓存 I/O

缓存I/O又被称作标准I/O,目前大多数操作系统中的文件系统的默认I/O操作都是缓存I/O。在Linux的缓存I/O机制中,数据先从磁盘复制到内核空间的缓冲区,然后从内核空间缓冲区复制到应用程序的地址空间。缓存I/O使用操作系统内核缓冲区,在一定程度上分离了应用程序空间与实际的物理设备,它能够减少读取磁盘的次数,进而提高I/O效率。

  • 读操作:操作系统检查内核的缓冲区有没有需要的数据,如果已经缓存了,那么就直接从缓存中返回;否则从磁盘中读取,然后缓存在操作系统的缓存中。

读取: 硬盘 ->内核缓冲区 -> 用户缓冲区

  • 写操作:将数据从用户空间复制到内核空间的缓存中。这时对用户程序来说写操作就已经完成,至于什么时候再写到磁盘中由操作系统决定,除非显示地调用了sync同步命令。

写入: 用户缓冲区->内核缓冲区 ->硬盘

正常的系统调用read/write的流程如下:

read: 硬盘 ->内核缓冲区 -> 用户缓冲区
write: 数据会从用户地址空间拷贝到操作系统内核地址空间的page cache中,这时write就会直接返回,操作系统会在恰当的时候将其刷至磁盘。

缓存I/O的缺点:数据在传输过程中需要在应用程序地址空间和缓存之间进行多次数据拷贝操作,这些数据拷贝操作所带来的CPU以及内存开销是非常大的。

无论是磁盘IO还是网络IO,数据都要在内核空间的内存和用户空间内存之间拷贝传输。
以磁盘IO写入磁盘文件为例,数据不会直接从用户进程(用户空间)的内存直接写入磁盘,而是会先把数据从用户空间的内存拷贝到内核空间的缓冲区,再从内核缓冲区写入到磁盘。 而数据从内核缓冲区写入到磁盘的过程与用户进程是异步发生的,也就是说这个过程中用户进程完全可以干自己的事情而不用等待内核刷盘。网络IO同理,无论是读还是写,数据也都会经过内核的缓冲区。

那么综合以上的所有概念,我们简单的描述进程进行IO写操作的整体过程:
1.用户进程发起系统中断指令给CPU,用户进程暂停运行(即将让出CPU)
2.CPU根据系统中断指令查询中断向量表找到对应的系统调用
3.CPU保存好用户进程的上下文,从用户态切换到内核态(CPU的堆栈指针从指向用户空间的内存地址变为指向内核空间的内存地址)
4.数据从用户空间的内存拷贝到内核空间的内存(缓冲区)
5.内核执行相应的系统调用将数据从内核缓冲区写入磁盘文件(磁盘IO)或者发送给网络对端(网络IO)

那内核是如何进行IO交互的呢?

  1. 网卡收到经过网线传来的网络数据,并将网络数据写到内存中。
  2. 当网卡把数据写入到内存后,网卡向cpu发出一个中断信号,操作系统便能得知有新数据到来,再通过网卡中断程序去处理数据。
  3. 将内存中的网络数据写入到对应socket的接收缓冲区中。
  4. 当接收缓冲区的数据写好之后,应用程序开始进行数据处理。

Unix I/O 模型

有了上面介绍的铺垫,我们可以就可以来聊聊unix 的I/O模型了

在Linux(UNIX)操作系统中,共有五种IO模型,分别是:阻塞IO模型非阻塞IO模型IO复用模型信号驱动IO模型以及异步IO模型

阻塞I/O模型

套接字的默认状态是阻塞的,这就意味着当发出一个不能立即完成的套接字调用时,其进程被投入睡眠,等待相应操作完成才会进行唤醒。

image.png
如上图所示,应用程序通过系统调用(将recvfrom视为系统调用)recvfrom,其系统调用知道数据报大大且被复制到应用进程的缓冲区中或发生错误才返回。最常见的错误是系统调用被信号中断,我们说进程在从调用recvfrom开始到它返回的整段时间内是被阻塞的(进程无法做其他事情)。recvfrom成功返回后,应用程序进程开始处理数据报。

非阻塞I/O模型

image.png
进程把一个套接字设置成非阻塞再通知内核:当前所请求的I/O操作无需将本进程进行阻塞,而是返回一个错误。如上图前三次的调用recvfrom时没有数据可返回,因此内核转而立即返回一个EWOULDBLOCK错误。第四次调用recvfrom时已经有一个数据报准备好,它被复制到应用进程缓冲区,雨水recvfrom成功返回。接着进程继续处理数据。

当一个应用进程像这样对一个非阻寨描述符循环调用recvfrom时,我们称之为轮询(polling)。应用进程持续轮询内核,以查看某个操作是否就绪。这么做往往耗费大量CPU时间,不过这种模型偶尔也会遇到,通常是在专门提供菜一种功能的系统中才有。

I/O复用模型

Linux 2.6之前是select、poll,2.6之后是epoll,Windows是IOCP

image.png

I/O多路复用(I/O multiplexing)模型下可以调用selec、poll或epoll,阻塞在这两个系统调用中的某一个上,而不是阻塞真正的I/O系统调用上。我们阻塞在select调用,等待数据报套接字变为可读。当select返回套接字可读这一条件时,我们调用recvfrom把所读数据报复制到应用进程缓冲区。

I/O复用并不显得有什么优势,事实上由于使用select需要两个而不是单个系统调用,I/O复用还稍有劣势。实际上使用select的优势在于我们可以等待多个描述符就绪。

与I/O多路复用密切相关的另一种I/O模型是在多线程中使用阻塞式I/O,这种模型与上述模型极为相似,但它没有使用sclect阻寒在多个文件描述符上,而是使用多个线程(每个文件描迷符一个线程),这样每个线程都可以自由地调用诸如recvfrom之类的阻寒式I/O系统调用

Linux IO复用的底层实现参考:https://mikeygithub.github.io/2020/12/03/yuque/xzqx4t/

信号驱动I/O模型

我们也可以用信号,让内核在描述符就绪时发送SIGIO信号通知我们。我们称这种模型为信号驱动式I/O(signal-driven I/O),IO多路复用常用的方法有:select、poll以及epoll三种。IO多路复用的好处就在于单个process就可以同时处理多个网络连接的IO。

image.png
我们首先开启套接字的信号驱动式I/O功能,并通过sigaction系统调用安装一个型号处理函数。该系统调用将立即返回,我们的进程继续工作,也就是说它没有被阻塞。当数据准备好取读时,内核就为该进程产生一个SIGIO信号。我们随后既可以在型号处理函数中调用recvfrom读取数据报,并通知主循环数据已准备好带出来,也可以理解通知主循环,让它读取数据报。

无论如何处理SIGIO信号,这种模型的优势在于等待数据报到达期间进程不会被阻塞。主循环可以继续执行,只有等待来自信号处理函数的通知;既可以是数据已准备好被处理,也可以是数据报已准备好被读取。

异步I/O模型

异步I/O(asynchronous I/O)由POSIX规范定义。

这些函数的工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据冲内核复制到我们自己的缓冲区)完成后通知我们。

这种模型和信号驱动式模型的区别在于:信号驱动式I/O是由内核通知我们核实可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。

image.png

我们调用aio_read 函数(POSIX异步I/O函数以aio_或lio_开头),给内核传递描述符、缓冲区指针、缓冲区大小(与read相同的三个参数)和文件偏移(与lseek类似),并高手内核当整个操作完成时如何通知我们。该系统调用立即返回,并且在等待IO完成期间,我们的进程不被阻塞。

本例子中我们假设要求内核在操作完成时产生某个信号。该信号直到数据己复制到应用进程缓冲区才产生,这一点不同于信号驱动式VO模型,本书编写至此的时候,支持POSIX异步I0模型的系统仍较罕见。我们不能确定这样的系统是否支持套接宇上的这种模型。这儿我们只是用它作为一个与信号驱动式I/O模型相比照的例子

各种I/O模型的比较

image.png
根据上述定义,我们的前4种模型。阻塞式I/O模型、非阻塞式I/O模型、I/O复用模型和信号驱动式I/O模型都是同步I/O模型,因为其中真正的I/O操作 (recvfrom)将阻塞进程。只有异步I/O模型与POSIX定义的异步I/O相匹配。

I/O 模型概念

在了解I/O模型我们先要了解一下几个关键词,阻塞/非阻塞、同步/异步

同步/异步

同步 (synchronous communication)和异步(asynchronous communication)关注的是两端(调用端和接收端)之间的消息通信机制。(描述两个模块之间的关系)

同步

所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由调用者主动等待这个调用的结果。

异步

异步则与同步是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

阻塞/非阻塞

阻塞(Blocking)和非阻塞(Non Blocking)指的是调用者(程序)在等待返回结果(或输入)时的状态。(描述一个模块的情况)

阻塞

阻塞时,在调用结果返回前,当前线程会被挂起,并在得到结果之后返回。

非阻塞

非阻塞时,如果不能立刻得到结果,则该调用者不会阻塞当前线程。因此对应非阻塞的情况,调用者需要定时轮询查看处理状态。

两者的关系

先上结论

  • 同步/异步是描述两个模块之间的关系。
  • 阻塞/非阻塞是描述一个模块的情况。

看看知乎上的大佬这么说的

怎样理解阻塞非阻塞与同步异步的区别?

一个网络包从应用程序A发到另一台电脑上的应用程序B,需要经历:

  1. 从A的业务代码到A的软件框架
  2. 从A的软件框架到计算机的操作系统内核
  3. 从A所在计算机的内核到网卡
  4. 从网卡经过网线发到交换机等设备,层层转发,到达B所在计算机的网卡
  5. 从B所在计算机的网卡到B所在计算机的内核
  6. 从B所在计算机的内核到B的程序的用户空间
  7. 从B的软件框架到B的业务代码

这个层级关系就像是过程调用一样,前一级调用后一级的功能,后一级返回一个结果给前一级(比如:成功,或者失败)。只有在单独一级的调用上,可以说同步还是异步的问题。所谓同步,是指调用协议中结果在调用完成时返回,这样调用的过程中参与双方都处于一个状态同步的过程而异步,是指调用方发出请求就立即返回,请求甚至可能还没到达接收方,比如说放到了某个缓冲区中,等待对方取走或者第三方转交;而结果,则通过接收方主动推送,或调用方轮询来得到。

从这个定义中,我们看,首先1和7,这取決于软件框架的设计,如果软件框架可以beginxxX,然后立即返回,这就是一种异步调用,再比如javascript当中的异步HTTP调用,传入参数时提供一个回调函数,回调函数在完成时调用,再比如协程模型,调用接口后马上切换到其他协程继续执行,在完成时由框架切换回到协程中,这都是典型的异步接口设计。

而2和6,其实都需要调用方自己把数据在内核和用户空间里搬来搬去,其实都是同步接口,除非是10CP这样的专门的异步传输接口,所以这一级其实是同步的,阻塞与非阻塞的区别其实是影响调用接口的结果(在特定条件下是否提前返回结果),而不是调用方式。(就是说从接口形式上看其实都是同步的,都会在调用结束时返回结果,但是在暂时没有新数据的情况下,阻塞会等待到有新数据时再返回(接收端处理未完成情况),而非阻塞会立即返回“没有新数据”,其实是这个差别)。

3和5,内核一般通过缓冲区,使用DMI来传输数据,所以这一步又是异步的。

4,以太网是个同步时序逻辑,随信号传输时钟,必须两边设备同时就绪了才能开始传输数据,这叉是同步的。

总结来说,讨论究竟是异步还是同步,一定要严格说明说的是哪一部分。说非阻塞是同步而不是异步,这毫无疑问是正确的,然而说某个框架是异步I/O的框架,这也是正确的,因为说的其实是框架提供给业务代码的接口是异步的,不管是回调还是协程,比如说我们可以说某个库是异步的HTTPClient,并没有什么问题,因为说的是给业务代码的接口。由于通常异步的框架都需要在2中使用非阻塞的接口,的确会有很多人把非阻塞和异步混为一谈。

“同步/异步”、“阻塞/非阻塞”可以从字面上理解。所谓“同步”,是指“相同的步调”,既然是相同,必然涉及比较,那么就是关于两者(调用方与被调用方)的。也即是说,“同步/异步”描述的是两个模块之间的关系。所谓“阻塞”,是指“无法前进”了,“卡住了”,等待在那里而不能做其他事情了。也即是说,“阻塞/非阻塞”描述的是一个模块自身的运行状态。“同步/异步”、“阻塞/非阻塞”一个是描述两者的关系,另一个是描述一者的状态,所以其实是讲的不同的事情。

Java中对BIO/NIO/AIO的支持

在Linux 2.6以后,Java中NIO和AIO都是通过epoll来实现的,而在Windows上,AIO是通过IOCP来实现的。

可以把Java中的BIO、NIO和AIO理解为是Java语言对操作系统的各种IO模型的封装。程序员在使用这些API的时候,不需要关心操作系统层面的知识,也不需要根据不同操作系统编写不同的代码。只需要使用Java的API就可以了。

Java中的IO都是依赖操作系统内核进行的,我们程序中的IO读写其实调用的是操作系统内核中的read&write两大系统调用。

BIO

BIO(Blocking IO)阻塞式IO。同步阻塞I/O模式,数据的读取写入必须阻塞在一个线程内等待其完成。

采用 BIO 通信模型的服务端,通常有一个独立的 Acceptor 线程负责监听客户端的连接,它接收到客户端的连接请求之后,为每个客户端创建一个新的线程进行链路处理,处理完之后,通过输出流返回应答客户端,线程销毁。这就是典型的一请求一应答通信模型。这个是在多线程情况下执行的。当在单线程环境条件下时,在 while 循环中服务端会调用 accept 方法等待接收客户端的连接请求,一旦收到这个连接请求,就可以建立 socket,并在 socket 上进行读写操作,此时不能再接收其他客户端的连接请求,只能等待同当前服务端连接的客户端的操作完成或者连接断开。

该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈 1:1 的正比关系,由于线程是 Java 虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者僵死,不能对外提供服务。

适用场景

BIO 适用于连接数目比较小且固定的结构。它对服务器资源要求比较高,并发局限于应用中,JDK1.4之前唯一选择,但程序直观简单易理解,如之前在 Apache 中使用。

优点缺点

优点

  • 模型简单,编码效率高

缺点

  • 原始BIO负载能力低,因为读写方法都是阻塞,当前只能处理单一任务,即便是开了多线程或线程池,当海量任务时其可处理能力依然低下。
  • 每一个客户端建立连接后都需要创建独立的线程与客户端进行数据的读写,业务处理
  • 当并发数较大时,会创建大量的流程来处理连接,系统资源会出现很大的开销
  • 连接建立后,如果服务该客户端的线程没有数据可读时,线程则会阻塞在Read操作上,等待有数据后才读取,造成线程资源的浪费


采用 BIO 通信模型 的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接。我们一般通过在 while(true)循环中服务端会调用 accept() 方法等待接收客户端的连接的方式监听请求,请求一旦接收到一个连接请求,就可以建立通信套接字在这个通信套接字上进行读写操作,此时不能再接收其他客户端连接请求,只能等待同当前连接的客户端的操作执行完成,不过可以通过多线程来支持多个客户端的连接

如果要让 BIO 通信模型 能够同时处理多个客户端请求,就必须使用多线程(主要原因是 socket.accept()、 socket.read()、 socket.write() 涉及的三个主要函数都是同步阻塞的),也就是说它在接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的 一请求一应答通信模型 。我们可以设想一下如果这个连接不做任何事情的话就会造成不必要的线程开销,不过可以通过 线程池机制 改善,线程池还可以让线程的创建和回收成本相对较低。使用FixedThreadPool 可以有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了N(客户端请求数量):M(处理客户端请求的线程数量)的伪异步I/O模型(N 可以远远大于 M)

伪异步 IO,为了解决同步阻塞I/O面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化一一一后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N的比例关系,其中M可以远远大于N.通过线程池可以灵活地调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。

伪异步I/O通信框架采用了线程池实现,因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。不过因为它的底层任然是同步阻塞的BIO模型,因此无法从根本上解决问题。

实现案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package com.example.demo.io.bio;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;

import static java.nio.charset.StandardCharsets.UTF_8;

public class BioServer {
/**
* 原始BIO
*
* @param args
* @throws IOException
*/
public static void main0(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("0.0.0.0", 8888), 50);
Socket socket;
while ((socket = serverSocket.accept()) != null) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line;
try {
while ((line = bufferedReader.readLine()) != null) {
System.out.println(line);
OutputStream out = socket.getOutputStream();
out.write(line.getBytes(UTF_8));
}
} catch (Exception e) {
e.printStackTrace();
}
socket.close();
}
}

/**
* 同时处理多个客户端请求
* @param args
* @throws IOException
*/
public static void main1(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("0.0.0.0", 8888), 50);
Socket acceptSocket;
while ((acceptSocket = serverSocket.accept()) != null) {
Socket finalAcceptSocket = acceptSocket;
new Thread(() -> {
Socket socket = finalAcceptSocket;
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line;
while ((line = bufferedReader.readLine()) != null) {
System.out.println(line);
OutputStream out = socket.getOutputStream();
out.write(line.getBytes(UTF_8));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}

/**
* 伪异步 IO
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("0.0.0.0", 8888), 50);
Socket acceptSocket;
while ((acceptSocket = serverSocket.accept()) != null) {
Socket finalAcceptSocket = acceptSocket;
FutureTask<String> task = new FutureTask<>(() -> {
Socket socket = finalAcceptSocket;
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line;
while ((line = bufferedReader.readLine()) != null) {
System.out.println(line);
OutputStream out = socket.getOutputStream();
out.write(line.getBytes(UTF_8));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
});
executorService.submit(task);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class BioClient {
public static void main(String[] args) throws IOException, InterruptedException {
//模拟五个client
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
Socket socket = new Socket("127.0.0.1", 8888);
for (int j = 0; j < 10; j++) {
try {
socket.getOutputStream().write(String.format("%s %s \r\n",Thread.currentThread().getName(),DateUtils.getDate()).getBytes(UTF_8));
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
}

NIO

Not Blocking IO 非阻塞式IO,NIO采用基于channel(管道)面向Buffer(缓冲区),采用Selector(多路复用器)的。

Buffer(缓冲区)

Buffer是一个对象,它包含一些要写入或者要读取的数据。在NIO类库中加入Buffer对象,体现了新库与原IO的一个重要的区别。在面向流的IO中,可以将数据直接写入或读取到Stream对象中。在NIO库中,所有数据都是用缓冲区处理的(读写)。缓冲区实质上是一个数组,通常它是一个字节数组(ByteBuffer),也可以使用他类型的数组。这个数组为缓冲区提供了数据的访问读写等操作属性,如位置、容量、上限等概念,参考api文档。

Buffer类型:我们最常用的就是ByteBuffer,实际上每一种java基木类型都对于了一种缓存区(除了Boolean类型),如下所示:

  1. ByteBuffer:字节缓冲区
  2. CharBuffer:字符缓冲区
  3. ShortBuffer:短整型缓冲区
  4. IntBuffer:整形缓冲区
  5. LongBuffer:长整形缓冲区
  6. FloatBuffer:浮点型缓冲区
  7. DoubleBuffer:双精度浮点型缓冲区

缓冲区的类图继承关系如下所示:
Package nio.png

Channel (管道、通道)

通道(Channel),它就像自来水管道一样,网络数据通过Channel读取和写入,通道与流不同之处在于通道是双向的,而流只是一个方向上移动(一个流必须是Inputstream或OutputStream的子类),而通道可以用于读、写或者二者同时进行,最关键的是可以与多路复用器结合起来,有多种的状态位,方便多路复用器去识别。

事实上通道分为两大类,一类是网络读写的(SelectableChanneI),一类是用于文件操作的(FileChannel),我们使用MJSocketChanneI和ServerSockerChannel都是SelectableChannel的子类。

Selector(选择器、多路复用器)

多路复用器(Selector),他是NIO编程的基础,非常重要。多路复用器提供选择己经就绪的任务的能力。 简单说,就是Selctor会不断地轮询注册在其上的通道(Channel),如果某个通道发生了读写操作,这个通道就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以取得就绪的Channel集合,从而进行后续的IO操作。

一个多路复用器(Selector)可以负责成千上万Channel通道,没有上限,这也是JDK使用了epoll代了传统的select实现,获得连接句柄没有限制。这也就意味着我们只要一个线程负责轮询,就可以接入成千上万个客户端,这是JDK NIO库的巨大进步。

Selector线程就类似一个管理者(Master),管理了成千上万个管道,然后轮询那个管道的数据己经准备好,通知cpu执行IO的读取或写入操作。

Selector模式
当IO事件(管道)注册到选择器以后,Selector会分配给和个管道一个key值,相当于标签。Selector选择器是以轮询的方式进行查找注册的所有IO事件(管道),当我们的IO事件(管道)准备就绪后,select就会识别,会通过key值来找到相应的管道,进行相关的数据处理操作(从管道里读或写数据,写到我们的数据缓冲区中)。

每个管道都会对选择器进行注册不同的事件状态,以便选择器查找。

1
2
3
4
SelectionKey.OP_CONNECT//socket连接
SelectionKey.OP_ACCEPT//socket接收
SelectionKey.OP_READ//读取
SelectionKey.OP_WRlTE//写入

适用场景

NIO 适用于连接数目多且连接比较短的架构,比如聊天服务器,并发局限于应用中,变成比较复杂。JDK1.4开始支持,如在 Nginx、Netty 中使用。

优点缺点

优点

  • 客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞;
  • SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样IO通信线程就可以处理其它的链路,不需要同步等待这个链路可用;
  • 线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降,因此,它非常适合做高性能、高负载的网络服务器。

缺点

  • 编程复杂、编程模型难、维护成本高

为什么大家都不愿意用 JDK 原生 NIO 进行开发呢?从上面的代码中大家都可以看出来,是真的难用!除了编程复杂、编程模型难之外,它还有以下让人诟病的问题:

  • JDK 的 NIO 底层由 epoll 实现,该实现饱受诟病的空轮询 bug 会导致 cpu 飙升 100%
  • 项目庞大之后,自行实现的 NIO 很容易出现各类 bug,维护成本较高,Netty 的出现很大程度上改善了 JDK 原生 NIO 所存在的一些让人难以忍受的问题。

实现案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.example.demo.io.nio;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NioReadFile {
public static void main(String args[]) throws IOException {
RandomAccessFile file = new RandomAccessFile("/Users/biaoyang/IdeaProjects/demo/src/main/java/com/example/demo/io/nio/NioClient.java", "r");
FileChannel fileChannel = file.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
while (fileChannel.read(byteBuffer) > 0) {
// 翻转缓冲区以准备get操作
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
System.out.print((char) byteBuffer.get());
}
}
file.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package com.example.demo.io.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class NioServer {
//缓冲区大小
private int BLOCK = 4096;
//接收数据缓冲区
private ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
//发送数据缓冲区
private ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);
//io复用器
private Selector selector;

public NioServer(int port) throws IOException{
/**
* 以下的所有说明均已linux系统底层进行说明:
* nio 的底层实现是 epoll 模式,采用多路复用技术,对nio的代码进行深入分析,结合epoll的底层实现
* 进行详细的说明
* 1.linux网络编程是两个进程之间的通信,跨集群合网络
* 2.开启一个socket线程,在linux系统上任何操作均以文件句柄数表示,默认情况下
* 一个线程可以打开1024个句柄,也就说最多同时支持1024个网络连接请求。阿里云默认打开65535个文件
* 句柄,通常情况下,1G内存最多可以打开10w个句柄数
*/
//打开服务器套接字通道
//底层;在linux上面开启socket服务,启动一个线程,绑定ip地址和端口号
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//服务器配置为非阻塞
serverSocketChannel.configureBlocking(false);
//检索与此通道关联的服务器套接字
ServerSocket serverSocket = serverSocketChannel.socket();
//进行服务绑定
serverSocket.bind(new InetSocketAddress(port));
//通过open()方法找到Selector
//底层:开启epoll,为当前socket服务创建epoll服务,epoll_create
selector = Selector.open();
//注册到selector
/**
* 底层:
* 1.将当前的epoll,服务器地址,端口号绑定,如果有连接请求,直接添加到epoll中,epoll的底层是红黑树,
* 可以快速的实现连接的查找和状态更新。如果有新的连接过来,直接存放到epoll中。如果有连接过期,中断,
* 会从epoll中删除。
* 2.通过epoll_ctl添加到epoll的同时,会注册一个回调函数给内核,当网卡有数据来的时候,会通知内核,内核
* 调用回调函数,将当前内核数据的事件状态添加到list链表中
*/
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("服务器启动 端口:"+port);
}

//监听
private void listen() throws IOException{
while (true){
//选择一组键,并且相应得通道已经打开
/**
* epoll底层维护一个链表,rdlist,基于事件驱动模式,当网卡有数据请求过来,会发起硬件中断,通知内核已经有来了。内核调用
* 回调函数,将当前的事件添加到rdlist中,将当前可用的rdlist列表发送给用户态,用户去遍历rdlist中的事件,进行处理
*/
int readyChannels = selector.select();
if(readyChannels == 0) continue;
//返回此选择器得已选择键集
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
//获取当前epoll的rdlist复制到用户态,遍历,同时删除当前rdlist中的事件
iterator.remove();
handleKEY(selectionKey);
}
}
}

//处理请求
private void handleKEY(SelectionKey selectionKey) throws IOException {
//接受请求
ServerSocketChannel server = null;
SocketChannel client = null;
String receiveText;
String sendText;
int count = 0;
if(selectionKey.isAcceptable()){
System.out.println("当前通道已准备好接受新的套接字连接");
//返回为止创建此键的通道
server = (ServerSocketChannel) selectionKey.channel();
//接受次通道套接字的连接
//此方法返回的套接字通道(如果有)将处于阻塞模式
client = server.accept();
//配置为非阻塞
client.configureBlocking(false);
//注册到selector,等待连接
client.register(selector,SelectionKey.OP_READ);
}else if(selectionKey.isReadable()){
//返回为之创建此键的通道
client = (SocketChannel) selectionKey.channel();
//将缓冲区清空以备下次读取
receiveBuffer.clear();
//读取服务器发送来的数据到缓冲区
try {
count = client.read(receiveBuffer);
}catch (Exception e){}

if(count>0){
receiveText = new String(receiveBuffer.array(),0,count);
System.out.println("服务器端接受客户端数据 :"+receiveText);
client.register(selector,SelectionKey.OP_WRITE);
}
}else if(selectionKey.isWritable()){
//将缓冲区清空以备下次写入
sendBuffer.clear();
//返回为之创建此键的通道
client = (SocketChannel) selectionKey.channel();
sendText = "message form server " + new Date();
//向缓冲区中输入数据
sendBuffer.put(sendText.getBytes());
//将缓冲区个标志复位,因为李米娜put了数据标志被改变要想从中读取数据发向服务器,就要复位
sendBuffer.flip();
//输出到通道
client.write(sendBuffer);
System.out.println("服务器端向客服端发送数据: "+sendText);
client.register(selector,SelectionKey.OP_READ);
}
}

public static void main(String[] args) throws IOException {
int port = 8888;
NioServer server = new NioServer(port);
server.listen();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.example.demo.io.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class NioClient {

private static boolean isEnding = false;
//缓冲区大小
private static int BLOCK = 4096;
//接受数据缓冲区
private static ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
//发送数据缓冲区
private static ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);
//服务器地址
private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress("localhost",8888);

public static void main(String[] args) throws IOException {
//打开socker通道
SocketChannel socketChannel = SocketChannel.open();
//设置为非阻塞模式
socketChannel.configureBlocking(false);
//打开选择器
Selector selector = Selector.open();
//注册连接服务端socket动作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
//连接
socketChannel.connect(SERVER_ADDRESS);
//分配缓冲区内存大小
Set<SelectionKey> selectionKeys;
Iterator<SelectionKey> iterator;
SelectionKey selectionKey;
SocketChannel client;
String receiveText;
String sendText;
int count = 0;
while (!isEnding){
//选择一组键,其对应的通道已为 I/O 操作准备就绪
//此方法执行处于阻塞模式的选择操作
selector.select();
//返回此选择器的已选择的键集
selectionKeys = selector.selectedKeys();
iterator = selectionKeys.iterator();
while (iterator.hasNext()){
selectionKey = iterator.next();
if(selectionKey.isConnectable()){
System.out.println("client connect");
client = (SocketChannel) selectionKey.channel();
//判断此通道伤是否正在进行连接操作
//完成套接字通道的连接过程
if(client.isConnectionPending()){
client.finishConnect();
System.out.println("完成连接!");
sendBuffer.clear();
sendBuffer.put("Hello,Server".getBytes());
sendBuffer.flip();
client.write(sendBuffer);
}
client.register(selector,SelectionKey.OP_READ);
}else if(selectionKey.isReadable()){
client = (SocketChannel) selectionKey.channel();
receiveBuffer.clear();
count = client.read(receiveBuffer);
if(count >0){
receiveText = new String(receiveBuffer.array(),0,count);
System.out.println("客户端接受服务器端的数据: "+receiveText);
client.register(selector,SelectionKey.OP_WRITE);
}
}else if (selectionKey.isWritable()) {
sendBuffer.clear();
client = (SocketChannel) selectionKey.channel();
sendText = "message from client " + new Date();
sendBuffer.put(sendText.getBytes());
//将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
sendBuffer.flip();
client.write(sendBuffer);
System.out.println("客户端向服务器端发送数据: "+sendText);
client.register(selector, SelectionKey.OP_READ);
//isEnding = true;
}
}
selectionKeys.clear();
}
selector.close();
socketChannel.close();
}
}

更多案例参考:https://www.tutorialspoint.com/java_nio/java_nio_quick_guide.htm

AIO

JDK1.7 升级了 NIO 类库,升级后的 NIO 类库被称为NIO2.0。也就是我们要介绍的 AIO。NIO2.0 引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。

异步通道提供两种方式获取操作结果。

  • 通过 Java.util.concurrent.Future 类来表示异步操作的结果;
  • 在执行异步操作的时候传入一个Java.nio.channels.CompletionHandler接口的实现类作为操作完成的回调。

NIO2.0 的异步套接字通道是真正的异步非阻塞 IO,它对应 UNIX 网络编程中的事件驱动 IO(AIO),它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了 NIO 的编程模型。

Package channels.png

结论:异步 Socket Channel是被动执行对象,我们不需要想NIO编程那样创建一个独立的IO线程来处理读写操作。对于AsynchronousServerSocketChannel和AsynchronousSocketChannel,它们都由 JDK 底层的线程池负责回调并驱动读写操作。正因为如此,基于 NIO2.0 新的异步非阻塞 Channel 进行编程比 NIO 编程更为简单。

适用场景

AIO 适用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持,在成长中,Netty 曾经使用过,后来放弃。

优点缺点

优点

  • 支持连接数目多

缺点

  • 编程比较复杂

实现案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package com.example.demo.io.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AioServer implements Runnable {

Charset charset = Charset.forName("UTF-8");

public static void main(String[] args) throws InterruptedException {
int port = 7890;
new Thread(new AioServer(port)).start();
TimeUnit.MINUTES.sleep(60);
}

int port;
AsynchronousChannelGroup group;
AsynchronousServerSocketChannel serverSocketChannel;

public AioServer(int port) {
this.port = port;
init();
}

public void init() {
try {
// 创建处理线程池
group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);
// 创建服务channel
serverSocketChannel = AsynchronousServerSocketChannel.open(group);
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(port));
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
// 接收请求
// accept的第一个参数附件,第二个参数是收到请求后的接收处理器
// 接收处理器AcceptHandler泛型的第一个参数的处理结果,这里是AsynchronousSocketChannel,即接收到的请求的channel
// 第二个参数是附件,这里是AioServer,即其实例
serverSocketChannel.accept(this, new AcceptHandler());
}


/**
* 接收请求处理器
* completionHandler泛型的第一个参数的处理结果,这里是AsynchronousSocketChannel,即接收到的请求的channel,
* 第二个参数是附件,这里是AioServer,即其实例
*/
class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AioServer> {

@Override
public void completed(AsynchronousSocketChannel result, AioServer attachment) {
// 继续接收下一个请求,构成循环调用
attachment.serverSocketChannel.accept(attachment, this);
try {
System.out.println("接收到连接请求:" + result.getRemoteAddress().toString());
// 定义数据读取缓存
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]);
// 读取数据,并传入数据到达时的处理器
// read的第一个参数数据读取到目标缓存,第二个参数是附件,第三个传输的读取结束后的处理器
// 读取处理器泛型的第一个参数是读取的字节数,第二个参数输附件对象
result.read(buffer, buffer, new ReadHandler(result));

// 新开新城发送数据
new Thread(new WriteThread(result)).start();
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, AioServer attachment) {

}
}

/**
* 读取数据处理器
* completionHandler第一个参数是读取的字节数,第二个参数输附件对象
*/
class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {

AsynchronousSocketChannel socketChannel;

public ReadHandler(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
if (result == -1) {
attachment.clear();
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
return;
}
attachment.flip();
String readMsg = charset.decode(attachment).toString();
System.out.println("服务端接收到的数据:" + readMsg);
attachment.compact();

// 继续接收数据,构成循环
socketChannel.read(attachment, attachment, this);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {

}
}

/**
* 写出数据处理器
*/
class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel socketChannel;
Scanner scanner;

public WriteHandler(AsynchronousSocketChannel socketChannel, Scanner scanner) {
this.socketChannel = socketChannel;
this.scanner = scanner;
}


@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.compact();
String msg = scanner.nextLine();

System.out.println("服务端即将发送的数据:" + msg);
attachment.put(charset.encode(msg));
attachment.flip();

// 继续写数据,构成循环
socketChannel.write(attachment, attachment, this);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {

}
}

class WriteThread implements Runnable {

private AsynchronousSocketChannel channel;

public WriteThread(AsynchronousSocketChannel channel) {
this.channel = channel;
}

@Override
public void run() {
// 第一缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
String msg = scanner.nextLine();
System.out.println("服务端输入数据:" + msg);
buffer.put(charset.encode(msg + System.lineSeparator()));
buffer.flip();
// 写入数据,并有写数据时的处理器
// write的第一个参数是数据写入的缓存,第二个参数是附件,第三个参数写结束后的处理器
// 读取处理器泛型的第一个参数是写入的字节数,第二个是附件类型
channel.write(buffer, buffer, new WriteHandler(channel, scanner));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package com.example.demo.io.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AioClient {

static Charset charset = Charset.forName("UTF-8");

public static void main(String[] args) throws InterruptedException {
int port = 7890;
String host = "127.0.0.1";
// 启动客户端
new Thread(new AIOClient(port, host)).start();
TimeUnit.MINUTES.sleep(100);
}

static class AIOClient implements Runnable {

int port;
String host;
AsynchronousChannelGroup group;
AsynchronousSocketChannel channel;
InetSocketAddress address;

public AIOClient(int port, String host) {
this.port = port;
this.host = host;
// 初始化
init();
}

private void init() {
try {
// 创建处理线程组
group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);
// 创建客户端channel
channel = AsynchronousSocketChannel.open(group);
address = new InetSocketAddress(host, port);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
// 接收请求,并传入收到请求后的处理器
// connect 方法的第一二个参数是目标地址,第二个参数是附件对象,第三个参数是连接处理器
// 连接处理器的泛型的第一个参数为空(即Void),第二个参数为附件
channel.connect(address, channel, new ConnectHandler());
}
}

/**
* 连接处理器
*/
static class ConnectHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {

@Override
public void completed(Void result, AsynchronousSocketChannel attachment) {
try {
System.out.println("connect server: " + attachment.getRemoteAddress().toString());
// 定义数据读取缓存
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取数据,并传入到数据到达时的处理器
attachment.read(buffer, buffer, new ReadHandler(attachment));
// 新开线程,发送数据
new WriteThread(attachment).start();
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {

}
}

/**
* 读处理器
*/
static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel channel;
public ReadHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
String readMsg = charset.decode(attachment).toString();
System.out.println("client receive msg: " + readMsg);
attachment.compact();
// 继续接收数据,构成循坏
channel.read(attachment, attachment, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}

/**
* 写处理器
*/
static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel channel;
Scanner scanner;

public WriteHandler(AsynchronousSocketChannel channel, Scanner scanner) {
this.channel = channel;
this.scanner = scanner;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.compact();
System.out.print("client input data: ");
String msg = scanner.nextLine();
System.out.println("clinet will send msg:" + msg);
attachment.put(charset.encode(msg));
attachment.flip();
// 继续写入数据,构成循环
channel.write(attachment, attachment, this);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {

}
}


/**
* 写处理独立创建线程
*/
static class WriteThread extends Thread {

private AsynchronousSocketChannel channel;

public WriteThread(AsynchronousSocketChannel channel) {
this.channel = channel;
}

@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
System.out.print("client input data:");
String msg = scanner.nextLine();
System.out.println("client send msg:" + msg);
buffer.put(charset.encode(msg));
buffer.flip();
channel.write(buffer, buffer, new WriteHandler(channel, scanner));
}
}
}

相关资料

  1. https://www.zbpblog.com/blog-209.html
  2. https://programs.team/understand-what-bio-nio-aio-is.html
  3. https://www.cnblogs.com/henuliulei/p/15143649.html
  4. https://www.zhihu.com/question/19732473
  5. https://developer.aliyun.com/article/726698#slide-11
  6. https://blog.csdn.net/m0_38109046/article/details/89449305
  7. 漫话:如何给女朋友解释什么是Linux的五种IO模型?
  8. https://jenkov.com/tutorials/java-nio/index.html
  9. PCB与进程分配资源、虚拟内存与物理内存、进程共享
  10. 《UNIX网络编程卷1:套接字联网API(第3版)》

学习笔记-I/O模型详解
https://mikeygithub.github.io/2022/06/04/yuque/笔记篇-I!O模型详解/
作者
Mikey
发布于
2022年6月4日
许可协议