Java网络编程教程|Netty深入浅出全套教程——NIO基础(一)
non-blocking io 非阻塞IO
三大组件
Channel
channel优点类似于stream,它就是读写数据的双向通道,可以从channel将数据读写到buffer,也可以将buffer的数据读写到channel,而之前的stream要么是输入,要么是输出,channel比stream更为底层
常用的channel有:
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
Buffer
buffer用来缓冲数据
常见大的buffer有:
- ByteBuffer
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
Selector
选择器,selector单从字面以上不好理解,需要结合服务器的设计宴会来理解它的用途
服务器设置演化过程
多线程版设计
多线程版缺点
服务端每来一个请求创建一个线程去处理,每个线程的创建都是需要成本的,会占用一定的内存,如果同时有大量的请求进来,就会创建大量的线程,占用大量内存,CPU因为核数有限,所以同时能够处理的线程是有限的,线程过多就会导致CPU不停的切换线程进行执行,每次切换都需要把原有线程的数据保存,然后加载新线程的数据
- 内存占用高
- 线程上下文切换成本高
- 只适合连接数少的场景
线程池版设计
线程池版缺点
同一个线程在同一事件仅能处理一个请求,如果这个请求需要处理的内容比较耗时,就会影响到其他的请求的处理,同时如果有一些下载操作或者其他的需要长连接的操作,也会导致一个请求长时间独占一个线程,因为线程池中的线程是有一定数量的,如果有大量这种请求,就会导致剩余的请求无线程进行处理
- 阻塞模式下,线程同一时间仅能处理一个socket连接
- 仅适合短链接场景
selector版设计
selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程吊死在一个channel上面。适合连接数特别多,但流量低的场景(low traffic)
调用selector的select()会阻塞直到channel发生了读写就绪事件,这些事件发生,select方法就会返回这些事件交给thread来处理
selector可以监控多个channel,当某一个channel上面发生了事件,就会通知selector,selector就会调用线程去处理,但是如果一个channel发送了大量的数据,就会导致线程长时间处理当前的事件,导致其他的事件等待
ByteBuffer
ByteBuffer正确的使用姿势
- 向
Buffer写入数据,列如调用channel.read(buffer) - 调用
flip()切换至读模式 - 从
buffer读取数据,列如调用buffer.get() - 调用
clear()或compact()切换至写模式 - 重复上面步骤
ByteBuffer结构
ByteBuffer有以下重要属性
capacity容量,buffer里面能装多少数据position读写指针limit读写的限制,应该读多少字节,应该写多少字节
写入数据过程:
一开始,假设指定buffer的容量是10,那么capacity就是10,因为最开始buffer是空的,所以最开始的时候,position指针就在最开始的位置,至于能写的数目,最开始是跟整个容量相同的,所以此时limit等于capacity
写模式下,position是写入位置,limit等于容量,下图标识写入了4个字节后的状态
每写入一个字节,position都会向后移动以一位
flip动作发生后(切换到读模式之后),position切换为读取位置,limit切换为读取限制
假设如下图,buffer中写入了4个字节,调用flip方法切换到读模式之后,会将position指针的位置切换到0,因为读取数据需要从第一位开始读,limit会被指向4,因为当前只有四个元素,后面的空位读取也没有意义
读取4个字节后,状态
假设是调用get方法,每次读取一个字节,调用一次position指针会从0指向1,再次调用会从1指向2,一直到和limit相同之后代表已经读完当前buffer中的数据
clear动作发生后,状态
相当于是将整个buffer重置,position回到起始位置,limit也和capacity相同,表示当前buffer是空的,全部空间都可以写入
compact方法,是把未读完的部分向前压缩,然后切换到写模式
假设由于一些原因,可能当前buffer中的数据还没有读完,就需要再次写入,假设有四个字节的数据,当前只读了两个,调用compact方法后,会将前面两个已经读取过的数据清除,后面未读取的数据前移,同时position指针则指向第三个位置,表示只能从这个位置开始写入,limit同样和capacity相同,表示可以写到这个位置
方法演示
用到了下面的工具类
1 | import java.nio.ByteBuffer; |
具体测试代码如下所示:
1 | public class TestByteBufferReadWrite { |
输出结果如下:(添加了一些描述)
1 | //写入了0x61 也就是字符a,position指针指向1,表示下一个数据从1开始插入 |
ByteBuffer常见的方法
分配空间
可以使用allocate方法为ByteBuffer分配空间,其他的Buffer也有类似的方法buffer分配空间后不能动态调整空间大小,空间大固定
1 | ByteBuffer buffer = ByteBuffer.allocate(10); |
allocate()和allocateDirect()方法
| 方法 | 类型 | 内存 | 效率 | GC | 分配 |
|---|---|---|---|---|---|
| allocate() | class java.nio.HeapByteBuffer | java堆内存 | 读写效率较低 | 受GC影响 | |
| allocateDirect() | class java.nio.DirectByteBuffer | 直接内存 | 读写效率高(少一次拷贝) | 不受GC影响(因为不受GC控制,所以如果不能及时回收会占用大量服务器内存) | 分配的效率低(选哟调用服务器函数分配) |
向buffer写入数据
有两种方法:
- 调用
channel的read方法 - 调用
buffer的put方法和1
int readBytes = channel.read(buf);
1
buf.read((byte)127);
从buffer中读取数据
同样有两种放法:
- 调用
channel的write方法 - 嗲用
buffer自己的get方法
1 | int writeBytes = channel.write(buf); |
和
1 | byte b = buf.get(); |
get方法会然让position指针向后走,如果想要重复读取数据
- 可以调用
rewind方法将position重置为0 - 或者调用
get(int i)方法获取索引i的内容,它不会移动指针
mark 和 reset
mark是在读取时,做一个标记,即使position改变,只要调用reset就能回到mark的位置
注意:rewind和flip都会清除mark位置
方法演示
rewind
1 | public class ByteBufferRead { |
mark & reset
1 | public class ByteBufferRead { |
get(i)
1 | public class ByteBufferRead { |
字符串和ByteBuffer之间互相转换
1 | public class TestByteBufferString { |
Buffer的线程安全
buffer是非线程安全的
Scattering Reads
分散读取,有一个文本test.txt
如下数据,可以选择一次把数据读取到一个buffer中,然后再进行分割,不过因为每段的长度是已经知道的,所以可以直接读取到三个buffer中
1 | onetwothree |
使用如下方式读取,可以将数据填充至多个buffer
1 | public class TestScatteringReads { |
Gathering Write
集中写,假设需要把多个buffer写入到同一个文件中,可以先把多个buffer汇总成一个buffer,也可以使用下面的方法一次性写入
1 | public class TestGatheringWrites { |
黏包/半包
1 | public class TestByteBufferExam { |
文件编程
FileChannel
FileChannel只能工作在阻塞模式下
获取 FileChannel
不能直接打开FileChannel,必须通过FileInputStream、FileOutputStream或者RandomAccessFile来获取FileChannel,它们都有getChannel方法
- 通过
FileInputStream获取的channel只能读 - 通过
FileOutputStream获取的channel只能写 - 通过
RandomAccessFile获取的channel是否能读写根据构造RandomAccessFile时的读写模式决定
读取
会从channel读取数据填充ByteBuffer,返回值表示读到了多少字节,-1表示到达了文件的末尾
1 | int readBytes = channel.read(buffer); |
写入
写入的正确姿势如下:
1 | ByteBuffer buffer = ...; |
在while中调用channel.write是因为write方法并不能包装一次将buffer中的内容全部写入到channel中
关闭
channel必须关闭,不过调用了FileInputStream、FileOutputStream或者RandomAccessFile的close方法会间接的调用channel的close方法
位置
获取当前位置:
1 | long pos = channel.position(); |
设置当前位
1 | long newPos = ...; |
设置当前位置时,如果设置为文件的末尾
- 这时读取会返回-1
- 这时写入,会追加内容,但要注意如果
position超过了文件末尾,再写入时会在新内容和原末尾之间有空洞(00)
大小
使用size方法获取文件的大小
强制写入
操作系统出于性能考虑,会将数据缓存,不是立刻写入磁盘,可以体哦啊用fore(true)方法将文件内容和元数据(文件的权限等信息)立即写入磁盘
两个 Channel 传输数据
从channel1中把数据传输到channel2中(1传输到2)transferTo
传输文件有限制,最大只能是2G
1 | channel1.transferTo(0,channel.size(),channel2); |
三个参数position: 起始位置count: 传输数据量target: 传输到哪个Channel
1 | public abstract long transferTo(long position, long count,WritableByteChannel target) throws IOException; |
1 | public class TestFileChannelTransferTo { |
Path
jdk7引入了Path 和Paths类
Path用来表示文件路径Paths是工具类,用来获取Path实例1
2
3
4Path source = Paths.get("1.txt"); //相对路径,使用user.dir环境变量来定位1.txt
Path source = Paths.get("D:\\1.txt"); //绝对路径代表了 D:\1.txt
Path source = Paths.get("D:/1.txt"); //绝对路径,同样代表了 D:\1.txt
Path source = Paths.get("D:\\data","projects"); //代表了 D:\data\projects.代表了当前路径..代表了上一级路径1
2
3Path path = Paths.get("D:\\data\\projects\\a\\..\\b"); //寻找a目录上级目录下的b目录
System.out.println(path);
System.out.println(path.normalize());//正常化路径1
2D:\data\projects\a\..\b
D:\data\projects\b
Files
检测文件是否存在
1
2Path path = Paths.get("test/data.txt");
System.out.println(Files.exists(path));创建一级目录
- 如果目录已经存在,会抛出异常
FileAlreadyExistsException - 不能一次创建多级目录,否则会抛出异常
NoSuchFileException1
2Path path = Paths.get("test/data");
Files.createDirectory(path);
- 如果目录已经存在,会抛出异常
创建多级目录
1
2Path path = Paths.get("test/data/da");
Files.createDirectories(path);拷贝文件
1
2
3
4Path source = Paths.get("test/data.txt");
Path target = Paths.get("test/target.txt");
Files.copy(source,target);如果文件已经存在,会抛出异常
FileAlreadyExistsException
如果希望用source覆盖掉target ,需要用StandardCopyOption来控制1
Files.copy(source,target, StandardCopyOption.REPLACE_EXISTING);
循环拷贝整个文件夹及其下面的路径:
这里使用的是Files.walk()方法,使用Files.walkFileTree()方法也可以达到同样的效果1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class TestFilesCopy {
public static void main(String[] args) throws IOException {
String source = "D:\\export";
String target = "D:\\export-aaa";
Files.walk(Paths.get(source)).forEach(path->{
try {
//获取文件或者文件夹在新文件夹下的全路径名
String targetName = path.toString().replace(source,target);
//如果循环的是目录
if (Files.isDirectory(path)){
//如果是目录,就在新文件夹内创建目录
Files.createDirectory(Paths.get(targetName));
}else if (Files.isRegularFile(path)){
//如果循环到的是文件,就拷贝到新文件夹下
Files.copy(path,Paths.get(targetName));
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}移动文件
StandardCopyOption.ATOMIC_MOVE保证移动文件的时候的原子性1
2
3
4Path source = Paths.get("test/data.txt");
Path target = Paths.get("test1/data.txt");
Files.move(source,target,StandardCopyOption.ATOMIC_MOVE);
删除文件
1
2Path source = Paths.get("test/data.txt");
Files.delete(source);删除目录
- 如果目录中还有内容存在,会抛出异常
DirectoryNotEmptyException - 想要删除非空目录需要先把目录里面的内容全部删除之后才行
1
2Path source = Paths.get("test/data");
Files.delete(source);
- 如果目录中还有内容存在,会抛出异常
遍历文件夹及其子目录和文件
- 如下所示,该方法一共有两个参数
- 第一个参数是需要遍历的文件夹的根目录
- 第二个参数是通过访问者模式实现,通过对具体方法的重写,达到在遍历每个阶段插入自己代码逻辑的目的示例如下,使用的是
1
public static Path walkFileTree(Path start, FileVisitor<? super Path> visitor) throws IOException{}
SimpleFileVisitor类来实现逻辑,FileVisitor接口有四个方法,分别如下:
preVisitDirectory():在遍历到文件夹前会调用该方法visitFile():在遍历到文件时会调用该方法visitFileFailed():遍历文件失败时会调用该方法postVisitDirectory():遍历文件夹之后会调用该方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67public class TestFilesWalkFileTree {
//删除文件夹下的所有文件
public static void main(String[] args) throws IOException {
Files.walkFileTree(Paths.get("E:\\export"),new SimpleFileVisitor<Path>(){
//先删除文件
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return super.visitFile(file, attrs);
}
//遍历文件夹结束后删除文件夹,因为遍历文件夹里面的文件的时候,已经把文件全部删除了
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}
});
}
//遍历文件夹下所有jar包文件
public static void main2(String[] args) throws IOException {
AtomicInteger jarCount = new AtomicInteger(0);
Files.walkFileTree(Paths.get("D:\\jdk\\jdk8"),new SimpleFileVisitor<Path>(){
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (file.toString().endsWith(".jar")){
System.out.println(file);
jarCount.incrementAndGet();
}
return super.visitFile(file, attrs);
}
});
System.out.println("jar count:"+jarCount.get());
}
//遍历文件夹下所有的文件和文件夹
public static void main1(String[] args) throws IOException {
AtomicInteger dirCount = new AtomicInteger(0);
AtomicInteger fileCount = new AtomicInteger(0);
Files.walkFileTree(Paths.get("D:\\jdk\\jdk8"),new SimpleFileVisitor<Path>(){
//遍历到文件夹前,会执行该方法
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println("=====>"+dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}
//遍历到文件会执行该方法
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println(file);
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
//遍历文件异常时会调用
public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
return super.visitFileFailed(file, exc);
}
//遍历文件夹后会调用
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
return super.postVisitDirectory(dir, exc);
}
});
System.out.println("dir count:"+dirCount.get());
System.out.println("file count:"+fileCount.get());
}
}
网络编程
阻塞 VS 非阻塞
阻塞
- 阻塞模式下,相关方法都会导致线程暂停
ServerSocketChannel.accept()会在没有连接建立时让线程暂停SocketChannel.read()会在没有数据可读时让线程暂停- 阻塞的表现其实就是线程暂停了,暂停期间不会占用cpu,但线程相当于闲置
- 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
- 但多线程下,有新的问题,体现在一下几个方面
- 32位JVM一个线程320k,64位JVM一个线程1024k,如果连接数过多,必然导致OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
- 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间
inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接
阻塞模式,单线程方式,如下所示:
调用ssc.accept()方法的时候会等待新的连接建立,陷入阻塞中
当连接建立后,channel.read(buffer),从channel中读取数据的时候同样会陷入阻塞
假设,我们创建了一个连接,然后发送了一条消息,这个过程是没有问题的
但是因为只有一个线程,当进入下一次循环的时候,会继续调用ssc.accept()方法等待连接,这个时候,第一个建立连接的方法再次发送消息,是收不到的,
只能等再次有新的连接建立后才能收到消息
假设第一个连接建立之后,并没有发消息,同样会在read方法进行阻塞,这个时候在有新的连接建立同样是接收不到的,必须等第一个连接发送了消息之后才行
1 |
|
1 | public class Client { |
非阻塞
- 非阻塞模式下,相关方法都不会让线程暂停
- 在
ServerSocketChannel.accept()在没有连接建立时,会返回null,继续运行 SocketChannel.read()在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其它SocketChannel的read方法或是去执行ServerSocketChannel.accept()方法- 写数据时,线程只是等待数据写入
Channel即可,无需等Channel通过网络把数据发送出去
- 在
- 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了CPU
- 数据复制过程中,线程事件还是阻塞的(AIO改进的地方)
非阻塞模式实现如下:
主要是通过configureBlocking(false)方法,将参数设置为false切换
针对ServerSocketChannel设置,影响的是ssc.accept()方法,如果没有连接,线程在执行该方法后,不会阻塞,会立即返回null
针对SocketChannel设置,影响的是channel.read(buffer)方法,如果没有数据,线程在执行了该方法后,不会阻塞,会立即返回0
线程会不停的循环
1 |
|
(Selector)多路复用
单线程可以配合Selector完成对多个Channel可读写事件的监控,这称之为多路复用
- 多路复用仅针对网络IO、普通文件IO没法录用多路复用
- 如果不用
Selector的非阻塞模式,线程大部分时间都是在做无用功,而Selector能保证- 有可连接事件时才去连接
- 有可读事件才去读取
- 有可写事件才去写入
- 限于网络传输能力,
Channel未必时时可写,一旦Channel可写,会触发Selector的可写事件
好处:
- 一个线程配合
selector就可以监控多个channel的事件,事件发生线程才去处理,避免非阻塞模式下做无用功 - 让这个线程能够被充分利用
- 节约了线程的数量
- 减少了线程上下文切换
创建
1 | Selector selector = Selector.open(); |
绑定Channel事件
也称之为注册事件,绑定的事件Selector才会关心
1 | channel.configureBlocking(false); |
channel必须工作在非阻塞模式FileChannel没有非阻塞模式,因此不能配合Selector一起使用- 绑定的事件类型可以有:
connect:客户端连接成功时触发accept:服务端成功接受连接时触发read:数据可读入时触发,有因为接受能力弱,数据暂不能读入的情况write:数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
监听Channel事件
可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少channel发生了事件
方法1,阻塞知道绑定事件发生
1 | int count = selector.select(); |
方法2,阻塞直到绑定事件发生,或是超时(时间单位ms)
1 | int count = selector.select(long timeout); |
方法3,不会阻塞,也就是不管有没有时间,立即返回,自己根据返回值检查是否有事件
1 | int count = selector.selectNow(); |
select何时不阻塞
- 事件发生时
- 客户端发起连接请求,会触发
accept事件 - 客户端发送数据过来,客户端正常、异常关闭时,都会触发
read事件,另外如果发送的数据大于buffer缓冲区,会触发多次读取事件 channel可写,会触发write事件- 在linux下nio bug发生时
- 客户端发起连接请求,会触发
- 调用
selector.wakeup() - 调用
selector.close() selector躲在线程interrupt
处理accept和read事件
事件发生后能否不处理
事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍然会触发,这是因为nio底层使用的水平触发
1 | public class Server { |
为什么要itr.remove()
因为select在事件发生后,就会将相关key放进selectedKeys集合,但不会在处理完后从selectedKeys集合中移除,需要我们自己编码移除
- 第一次触发了
sscKey上的accept事件,没有移除ssKey - 第二次触发了
scKey上的read事件,但这个时候selectedKeys中还有上次的sscKey,在处理时因为没有真正的serverSocket连上,就会导致空指针异常
cancel的作用cancel会取消注册在selector上的channel,并从keys集合中删除key后续不会再监听事件
处理消息边界
如下图所示,我们通过网络接收到的消息的长度并不一定都是固定的,在服务端如果设置一个固定长度的buffer,就会出现如下的情况
- 接收到的消息比设定的
buffer长,这个时候就需要想办法扩容buffer - 接收到的消息比
buffer短,这样就会可能导致半包的情况,第一条消息完全接收了,第二条消息接收了一部分 - 接收到的消息更短一些,这样就会出现黏包、半包的现象

解决思路:
- 一种思想是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
- 另一种思路是按分隔符拆分,缺点是效率低,每次都需要将整个buffer循环一遍,查找分隔符
- TLV格式,即Type类型、Length长度、Value数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是需要提前分配,如果内容过大,则影响server吞吐量
- Http1.1是TLV格式
- Http2.0是LTV格式
当客户端发送来一条消息,但是长度超过了buffer的长度,假设buffer为16字节,消息长度为20字节,此时第一次触发read事件,拿到了前面的16个字节,但是因为消息没有结束,所以read不会返回-1,客户端也就无法感知,但是当前read事件的数据并未处理完毕,所以会再次触发read事件,第二次循环创建了一个新的buffer,此时消息只剩下4个字节,16字节的buffer完全能够接受,最终打印就只剩下四个字节
改进思路:
buffer不能设置为局部变量,需要设置为全局变量(至少是针对当前连接的全局变量),当客户端发送请求,buffer感知到当前长度无法承载全部数据,就会进行扩容,创建一个新的buffer,然后将之前的数据存放到新的buffer中,之后因为没有完全读取消息,会再次触发read事件,第二次read事件将剩余消息获取
具体代码实现如下,split(buffer)方法在上面讲解黏包半包的时候已经写过
1 |
|
ByteBuffer大小分配
- 每个
Channel都需要记录可能被切分的消息,因为ByteBuffer不能被多个Channel共同使用,因此需要为每个channel维护一个独立的ByteBuffer ByteBuffer不能太大,比如一个ByteBuffer1Mb的话,要支持百万连接就要1Tb内存,因此需要设计大小可变的ByteBuffer- 一种思路是首先分配一个较小的
buffer,例如4k,如果发现数据不够,再分配8k的buffer,将4k内容拷贝至8kbuffer,优点是消息连续容易处理,缺点是数据拷贝消耗性能 - 另一种思路是用多个数组组成
buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续,解析复杂,优点是避免了拷贝引起的性能损耗
- 一种思路是首先分配一个较小的
处理write事件
一次无法写完
- 非阻塞模式下,无法保证把
buffer中的所有数据都写入channel,因此需要追踪write方法的返回值(代表实际写入的字节数) - 用
selector监听所有channel的可写事件,每个channel都需要一个key来追踪buffer,但这样又会导致占用内存过多,就有两阶段策略- 当消息处理器第一次写入消息时,才将
channel注册到selector上 selector检查channel上的可写事件,如果所有的数据写完了,就取消channel的注册- 如果不取消,会每次可写均会触发
write事件
- 当消息处理器第一次写入消息时,才将
代码如下:
1 | public class WriteServer { |
1 | public class WriteClient { |
write为何要取消
只要想channel发送数据时,socket缓冲可写,这个事件就会频繁触发,因此应该只在socket缓冲区写不下时再关注可写事件,数据写完之后再取消关注
更近一步(利用多线程优化)
现在都是多核CPU,设计时要充分考虑被让cpu的力量被白白浪费,前面的代码只有一个选择器,没有充分利用多核cpu,进行改进
分两组选择器
- 单线程配一个选择器,专门处理
accept事件 - 创建cpu核心数的线程,每个线程配置一个选择器,轮流处理
read事件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
//创建固定数量的worker,,创建CPU核数个线程
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-"+i);
}
AtomicInteger index = new AtomicInteger(0);
while (true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected... {}",sc.getRemoteAddress());
//2.关联channel
log.debug("before register... {}",sc.getRemoteAddress());
//轮询
workers[index.getAndIncrement()%workers.length].register(sc);//boss 调用,初始化selector,启动worker-0
log.debug("after register... {}",sc.getRemoteAddress());
}
}
}
}
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;//还未初始化
// private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) {
this.name = name;
}
//初始化线程 和 selector
public void register(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this, name);
selector = Selector.open();
thread.start();
start = true;
}
//向队列添加了任务,但这个任务并没有立即执行 boss
// queue.add(()->{
// try {
// sc.register(selector,SelectionKey.OP_READ,null);
// } catch (ClosedChannelException e) {
// e.printStackTrace();
// }
// });
// selector.wakeup();//唤醒select
selector.wakeup();
sc.register(selector,SelectionKey.OP_READ,null);
}
/*
当channel执行register方法注册到selector上的时候,如果selector已经执行了select方法陷入了阻塞,
register也会被阻塞,需要等selector下次被唤醒之后才能注册
第一种解决办法:
如上代码使用队列,保证register方法是在selector唤醒后执行,在向队列插入数据后执行wakeup唤醒select,
这样当有channel需要注册到selector上的时候,就会先把任务封装放进队列,然后唤醒selector,唤醒后就立刻从队列中拿出任务执行注册
第二种解决办法:
直接执行wakeup方法唤醒select,然后进行注册
如果在执行了select后执行wakeup方法,会唤醒select然后执行注册
如果是在select之前执行了wakeup方法,在执行select的时候会判断,然后跳过这次阻塞,进行下次循环
*/
public void run() {
while (true){
try {
selector.select();// worker-0 阻塞
// Runnable task = queue.poll();
// if (task != null) task.run(); //执行任务内容
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("after register... {}",channel.getRemoteAddress());
channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}1
2
3
4
5
6
7
8public class TestClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1",8080));
sc.write(Charset.defaultCharset().encode("1234567890abcdef"));
System.in.read();
}
}
如何拿到cpu核数
Runtime.getRuntime().availableProcessors()如果工作在docker容器下,因为容器不是物理隔离的,会拿到物理cpu个数,而不是容器申请时的个数- 这个问题直到jdk10才修复,使用jvm参数
UseContainerSupport配置,默认开启
NIO VS BIO
stream VS channel
stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区、接收缓冲区(更为底层)stream仅支持阻塞API,channel同时支持阻塞、非阻塞API,网络channel可配合selector实现多路复用- 二者均为全双工,即读写可以同时进行
IO模型
同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞
- 同步:线程自己去获取结果(一个线程)
- 异步:线程自己不去获取结果,而是由其他线程发送结果(至少两个线程)
当调用一次channel.read或者stream.read后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:
- 等待数据阶段
- 复制数据阶段

阻塞IO
用户线程发起read方法,并不一定立即就会有数据,没有数据的时候会阻塞等待数据,直到有数据之后,完成复制返回
非阻塞IO
在非阻塞的情况下,用户线程发起read之后并不会等待,如果没有数据会立即返回,此时可以去完成其他的业务,也可以循环继续等待数据
多路复用
用户线程使用select方法等待数据,当有了数据之后会获取所有的数据信息返回,然后根据不同数据类型在分别进行处理
异步IO
用户线程发起read方法后,如果没有数据,会立即返回,然后去做其他的处理,等有数据之后内核会调用回调方法返回数据
零拷贝
传统IO问题
传统IO将一个文件通过socket写出
1 | File f = new File("test/data.txt"); |
内部工作流程是这样的:
- java本身并不具备IO读写能力,因此
read方法调用后,要从java程序的用户态,切换至内核态,去调用操作系统的(Kernel)的读写能力,将数据读入内核缓存区。这期间用户线程阻塞,操作系统使用DMA(Direct Memory Access)来实现文件读,期间也不会使用cpu - 从内核态切换到用户态,将数据从内核缓冲区读入用户缓冲区(即byte[] buf),这期间cpu会参与拷贝,无法利用DMA
- 调用
write方法,这时将数据从用户缓冲区(byte[] buf)写入socket缓冲区,cpu会参与拷贝 - 接下来要向网卡写数据,这项能力java又不具备,因此又要从用户态切换到内核态,调用操作系统的写能力,使用DMA将socket缓冲区的数据写入网卡,不会使用cpu
可以看到中间环节较多,java的io实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
- 用户态和内核态的切换发生了3次,这个操作比较重量级
- 数据拷贝了共4次
NIO优化
通过DirectByteBuffer
ByteBuffer.allocate(10)HeapByteBuffer使用的好还是java内存ByteBuffer.allocateDirect(10)DirectByteBuffer使用的是操作系统内存

大部分步骤和优化前相同,不再多说,唯有一点:java可以使用DirectByteBuffer将堆外内存映射到jvm内存中来直接访问使用
- 这块内存不受jvm垃圾回收的影响,因此内存地址固定,有助于IO读写
- java中的
DirectByteBuffer对象仅维护了此内存的虚引用,内存回收分成两步DirectByteBuffer对象被垃圾回收,将虚引用加入引用队列- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
进一步优化(底层采用 linux2.1 后提供的sendFile方法),java中对应着两个channel调用transferTo/transferFrom方法拷贝数据
- java调用
transferTo方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用cpu - 数据从内核缓冲区传输到socket缓冲区,cpu会参与拷贝
- 最后使用DMA将socket缓冲区的数据写入网卡,不会使用cpu
可以看到
- 只发生了一次用户态和内核态的切换
- 数据拷贝了3次
进一步优化(linux 2.4)
- java调用
transferTo方法后,要从java程序的用户态切换到内核态,使用DMA将数据读入内核缓冲区,不会使用cpu - 只会将一些
offset和length信息考入socket缓冲区,几乎无消耗 - 使用DMA将内核缓冲区的数据写入王咖啡,不会使用cpu
真个过程仅只发生了一次用户态与内核态的切换,数据拷贝了2次。
所谓【零拷贝】,并不是真的无拷贝,而是在不会拷贝重复数据到jvm内存中,零拷贝的优点有:
- 更少的用户态与内核态的切换
- 不利用cpu计算,减少cpu缓冲伪共享
- 零拷贝适合小文件传输
AIO
AIO用来解决数据复制阶段的阻塞问题
- 同步意味着,在进行读写操作时,线程需要等待结果,好还是相当于闲置
- 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
- Windows系统通过
IOCP实现了真正的异步IO - Linux系统异步IO在2.6半包引入,但其底层实现还是用多路复用模拟了异步IO,性能没有优势
文件AIO
AsynchronousFileChannel
1 |
|
输出
1 | 11:33:39 [DEBUG] [main] c.y.n.c.AioFileChannel - read begin... |
可以看到
- 响应文件读取成功的是另一个线程
Thread-12 - 主线程并没有IO操作阻塞
💡 守护线程
默认文件AIO使用的线程都是守护线程,所以最后要执行System.in.read()以避免守护线程意外结束
网络IO
1 | public class AioServer { |