Java网络编程教程|Netty深入浅出全套教程——NIO基础(一)

non-blocking io 非阻塞IO

三大组件

Channel

channel优点类似于stream,它就是读写数据的双向通道,可以从channel将数据读写到buffer,也可以将buffer的数据读写到channel,而之前的stream要么是输入,要么是输出,channelstream更为底层

常用的channel有:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

Buffer

buffer用来缓冲数据
常见大的buffer有:

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

Selector

选择器,selector单从字面以上不好理解,需要结合服务器的设计宴会来理解它的用途

服务器设置演化过程

多线程版设计

多线程版缺点

服务端每来一个请求创建一个线程去处理,每个线程的创建都是需要成本的,会占用一定的内存,如果同时有大量的请求进来,就会创建大量的线程,占用大量内存,CPU因为核数有限,所以同时能够处理的线程是有限的,线程过多就会导致CPU不停的切换线程进行执行,每次切换都需要把原有线程的数据保存,然后加载新线程的数据

  • 内存占用高
  • 线程上下文切换成本高
  • 只适合连接数少的场景

线程池版设计

线程池版缺点

同一个线程在同一事件仅能处理一个请求,如果这个请求需要处理的内容比较耗时,就会影响到其他的请求的处理,同时如果有一些下载操作或者其他的需要长连接的操作,也会导致一个请求长时间独占一个线程,因为线程池中的线程是有一定数量的,如果有大量这种请求,就会导致剩余的请求无线程进行处理

  • 阻塞模式下,线程同一时间仅能处理一个socket连接
  • 仅适合短链接场景

selector版设计

selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程吊死在一个channel上面。适合连接数特别多,但流量低的场景(low traffic)

调用selectorselect()会阻塞直到channel发生了读写就绪事件,这些事件发生,select方法就会返回这些事件交给thread来处理

selector可以监控多个channel,当某一个channel上面发生了事件,就会通知selectorselector就会调用线程去处理,但是如果一个channel发送了大量的数据,就会导致线程长时间处理当前的事件,导致其他的事件等待

ByteBuffer

ByteBuffer正确的使用姿势

  • Buffer写入数据,列如调用channel.read(buffer)
  • 调用flip()切换至读模式
  • buffer读取数据,列如调用buffer.get()
  • 调用clear()compact()切换至写模式
  • 重复上面步骤

ByteBuffer结构

ByteBuffer有以下重要属性

  • capacity 容量,buffer里面能装多少数据
  • position 读写指针
  • limit 读写的限制,应该读多少字节,应该写多少字节

写入数据过程:
一开始,假设指定buffer的容量是10,那么capacity就是10,因为最开始buffer是空的,所以最开始的时候,position指针就在最开始的位置,至于能写的数目,最开始是跟整个容量相同的,所以此时limit等于capacity
Buffer写入数据过程一
写模式下,position是写入位置,limit等于容量,下图标识写入了4个字节后的状态
每写入一个字节,position都会向后移动以一位
Buffer写入数据过程二
flip动作发生后(切换到读模式之后),position切换为读取位置,limit切换为读取限制
假设如下图,buffer中写入了4个字节,调用flip方法切换到读模式之后,会将position指针的位置切换到0,因为读取数据需要从第一位开始读,limit会被指向4,因为当前只有四个元素,后面的空位读取也没有意义
Buffer写入数据过程三
读取4个字节后,状态
假设是调用get方法,每次读取一个字节,调用一次position指针会从0指向1,再次调用会从1指向2,一直到和limit相同之后代表已经读完当前buffer中的数据
Buffer写入数据过程四
clear动作发生后,状态
相当于是将整个buffer重置,position回到起始位置,limit也和capacity相同,表示当前buffer是空的,全部空间都可以写入
Buffer写入数据过程五
compact方法,是把未读完的部分向前压缩,然后切换到写模式
假设由于一些原因,可能当前buffer中的数据还没有读完,就需要再次写入,假设有四个字节的数据,当前只读了两个,调用compact方法后,会将前面两个已经读取过的数据清除,后面未读取的数据前移,同时position指针则指向第三个位置,表示只能从这个位置开始写入,limit同样和capacity相同,表示可以写到这个位置
Buffer写入数据过程六

方法演示

用到了下面的工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import java.nio.ByteBuffer;

import io.netty.util.internal.MathUtil;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.MathUtil.*;

public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];

static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}

int i;

// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}

// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(StringUtil.NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}

// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}

// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}

// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}

/**
* 打印所有内容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}

/**
* 打印可读取内容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}

private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");

final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;

// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;

// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");

// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}

// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");

// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}

dump.append(StringUtil.NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}

private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(StringUtil.NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}

public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}

具体测试代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestByteBufferReadWrite {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put((byte) 0x61);//'a'
ByteBufferUtil.debugAll(buffer);
buffer.put(new byte[]{0x62,0x63,0x64});//b c d
ByteBufferUtil.debugAll(buffer);
//不切换到读模式,直接读取数据,因为当前指针position指向的是下一个要写入的下标,buffer原本是空的,所以读取出来的是0
// System.out.println(buffer.get());//0
buffer.flip();//切换到读模式
System.out.println(buffer.get());
ByteBufferUtil.debugAll(buffer);
buffer.compact();
ByteBufferUtil.debugAll(buffer);
buffer.put(new byte[]{0x65,0x6f});
ByteBufferUtil.debugAll(buffer);
}
}

输出结果如下:(添加了一些描述)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//写入了0x61 也就是字符a,position指针指向1,表示下一个数据从1开始插入
+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 00 00 00 00 00 00 00 00 00 |a......... |
+--------+-------------------------------------------------+----------------+
//批量插入三个字符:0x62,0x63,0x64(b c d),加上上面的写入的一个字符,一共是四个字符,
//所以position指向下标为4的位置,也就是第5个字符位,表示下一个数据从第5个开始
+--------+-------------------- all ------------------------+----------------+
position: [4], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00 |abcd...... |
+--------+-------------------------------------------------+----------------+
//切换为读模式之后,调用get()方法打印一个字符,是第一个插入的字符a,直接打印为10进制97,
//如果不切换读模式直接读取,那么读到的是上面position指向的下标为4的数据,也就是0
97
//此时下标为0,也就是第一个数据已经打印,position指向下标为1的位置(切换为读模式之后,position指向0位,读取一位后指向1位),
//limit为限制位,当前只有四个数据,所以limit为4,也就是指向下标为4的第5位,表示最多只能读取到这一位
+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [4]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00 |abcd...... |
+--------+-------------------------------------------------+----------------+
//调用compact()方法切换到写模式,该方法会保留未读取的数据,之前一共写入了4个数据,读出了一个,还剩3个未读,
//会将后面的三个数据前移,同时position也会指向3,现在第4位,也就是下标为3的数据位不是空是之前遗留的脏数据,
//下次写入数据的时候会从下标3开始写入,会将该数据覆盖
+--------+-------------------- all ------------------------+----------------+
position: [3], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 63 64 64 00 00 00 00 00 00 |bcdd...... |
+--------+-------------------------------------------------+----------------+
//写入两个字符之后,已经将第4位遗留的脏数据覆盖,同时position也已经指向到了5
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 63 64 65 6f 00 00 00 00 00 |bcdeo..... |
+--------+-------------------------------------------------+----------------+

ByteBuffer常见的方法

分配空间

可以使用allocate方法为ByteBuffer分配空间,其他的Buffer也有类似的方法
buffer分配空间后不能动态调整空间大小,空间大固定

1
ByteBuffer buffer = ByteBuffer.allocate(10);

allocate()allocateDirect()方法

方法 类型 内存 效率 GC 分配
allocate() class java.nio.HeapByteBuffer java堆内存 读写效率较低 受GC影响
allocateDirect() class java.nio.DirectByteBuffer 直接内存 读写效率高(少一次拷贝) 不受GC影响(因为不受GC控制,所以如果不能及时回收会占用大量服务器内存) 分配的效率低(选哟调用服务器函数分配)

buffer写入数据

有两种方法:

  • 调用channelread方法
  • 调用bufferput方法
    1
    int readBytes = channel.read(buf);
    1
    buf.read((byte)127);

buffer中读取数据

同样有两种放法:

  • 调用channelwrite方法
  • 嗲用buffer自己的get方法
1
int writeBytes = channel.write(buf);

1
byte b = buf.get();

get方法会然让position指针向后走,如果想要重复读取数据

  • 可以调用rewind方法将position重置为0
  • 或者调用get(int i)方法获取索引i的内容,它不会移动指针

markreset

mark是在读取时,做一个标记,即使position改变,只要调用reset就能回到mark的位置

注意:
rewindflip都会清除mark位置

方法演示

rewind

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ByteBufferRead {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[]{'a','b','c','d'});
buffer.flip();//切换读模式

//从头开始读
//将buffer中的数据读到一个4位的数组中,此时position会移动到下标为4的位置
buffer.get(new byte[4]);
ByteBufferUtil.debugAll(buffer);
/*
+--------+-------------------- all ------------------------+----------------+
position: [4], limit: [4]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 00 00 00 00 00 |.......... |
+--------+-------------------------------------------------+----------------+
*/
//rewind从头开始读,将position重置为0
buffer.rewind();
//存进去了4个字节,上面读出了4个字节,这里调用了rewind方法重置,所以读出的应该是第一个字节 a
System.out.println((char) buffer.get());// a
}
}

mark & reset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ByteBufferRead {

public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[]{'a','b','c','d'});
buffer.flip();//切换读模式

//mark & reset
//mark 做一个标记,记录 position 位置,reset 是将 position 重置到 mark 记录的位置
System.out.println((char)buffer.get());//读取第一个字节,position指向1的位置 a
System.out.println((char)buffer.get());//读取第二个自己,position指向2的位置 b
buffer.mark();//使用mark方法记录当前的位置,也就是索引为2的位置
System.out.println((char)buffer.get());//读取第3个字节,position指向3的位置 c
System.out.println((char)buffer.get());//读取第4个字节,position指向4的位置 d
buffer.reset();//使用reset方法将position重置到之前mark记录的位置,也就是下标2,后面继续读取
System.out.println((char)buffer.get());//重新读取第3个字节 c
System.out.println((char)buffer.get());//重新读取第4个字节 d

}
}

get(i)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ByteBufferRead {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[]{'a','b','c','d'});
buffer.flip();//切换读模式
System.out.println((char) buffer.get(3));//直接通过下标获取最后一个字节,d
ByteBufferUtil.debugAll(buffer);
//position位置依然是0,并没有因为get方法而改变
/*
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [4]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00 |abcd...... |
+--------+-------------------------------------------------+----------------+
*/
}
}

字符串和ByteBuffer之间互相转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class TestByteBufferString {

public static void main(String[] args) {
//1.字符串转为ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.put("hello".getBytes());
ByteBufferUtil.debugAll(buffer);

//2.Charset
//调用该方法生成的buffer是已经切换到读模式之后的,position指向0
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");
ByteBufferUtil.debugAll(buffer1);

//3.wrap
//调用该方法生成的buffer是已经切换到读模式之后的,position指向0
ByteBuffer buffer2 = ByteBuffer.wrap("hello".getBytes());
ByteBufferUtil.debugAll(buffer2);

//buffer转字符串,第一个因为是直接写入的,所以需要切换到读模式之后才能转字符串,不然转出来的是空
//后面两个因为已经是读模式了,可以直接转字符串
String str = StandardCharsets.UTF_8.decode(buffer2).toString();
System.out.println(str);
}
}

Buffer的线程安全

buffer是非线程安全的

Scattering Reads

分散读取,有一个文本test.txt
如下数据,可以选择一次把数据读取到一个buffer中,然后再进行分割,不过因为每段的长度是已经知道的,所以可以直接读取到三个buffer

1
onetwothree

使用如下方式读取,可以将数据填充至多个buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestScatteringReads {
public static void main(String[] args) {
try (FileChannel channel = new RandomAccessFile("netty-demo/test.txt","r").getChannel()){
ByteBuffer b1 = ByteBuffer.allocate(3);
ByteBuffer b2 = ByteBuffer.allocate(3);
ByteBuffer b3 = ByteBuffer.allocate(4);
channel.read(new ByteBuffer[]{b1,b2,b3});
b1.flip();
b2.flip();
b3.flip();
ByteBufferUtil.debugAll(b1);
ByteBufferUtil.debugAll(b2);
ByteBufferUtil.debugAll(b3);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

Gathering Write

集中写,假设需要把多个buffer写入到同一个文件中,可以先把多个buffer汇总成一个buffer,也可以使用下面的方法一次性写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TestGatheringWrites {
public static void main(String[] args) {
ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello ");
ByteBuffer b2 = StandardCharsets.UTF_8.encode("world!");
ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");
try (FileChannel channel = new RandomAccessFile("netty-demo/test1.txt", "rw").getChannel();){
channel.write(new ByteBuffer[]{b1,b2,b3});
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

黏包/半包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class TestByteBufferExam {
/*
网络上有多条数据发送给服务端,数据之间使用\n进行分割
但是由于某些原因这些数据在接收的时候,被进行了重新组合,例如原始数据有3条:
Hello World\n
I'm zhangshan\n
How are you?\n
变成了下面的两个buffer (黏包,半包)
黏包:多条数据合并成一条数据
半包:一条消息被截断成了多条
(因为效率问题,将多条数据合并成一条数据发送,就会有黏包的现象)
(半包是因为服务器buffer大小有限制,如果数据的长度超过了buffer的大小,就会分两次读取,就会出现半包的现象)
Hello World\nI'm zhangshan\nHo
w are you?\n
现在需要你编写程序,将错乱的数据恢复成原始的按\n分割的数据
*/

public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
source.put("Hello World\nI'm zhangshan\nHo".getBytes(StandardCharsets.UTF_8));
split(source);
source.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
split(source);
}

private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
//读取每一个字节,判断到\n的时候表示找到一条完整的消息
//get(i)不会导致指针移动
if (source.get(i) == '\n'){
int length = i+1-source.position();
ByteBuffer target = ByteBuffer.allocate(length);
//从 source 向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
ByteBufferUtil.debugAll(target);
}
}
source.compact();
}
}

文件编程

FileChannel

FileChannel只能工作在阻塞模式下

获取 FileChannel

不能直接打开FileChannel,必须通过FileInputStreamFileOutputStream或者RandomAccessFile来获取FileChannel,它们都有getChannel方法

  • 通过FileInputStream获取的channel只能读
  • 通过FileOutputStream获取的channel只能写
  • 通过RandomAccessFile获取的channel是否能读写根据构造RandomAccessFile时的读写模式决定

读取

会从channel读取数据填充ByteBuffer,返回值表示读到了多少字节,-1表示到达了文件的末尾

1
int readBytes = channel.read(buffer);

写入

写入的正确姿势如下:

1
2
3
4
5
6
ByteBuffer buffer = ...;
buffer.put(...);//存入数据
buffer.flip();//切换读模式
while(buffer.hasRemaining()){//检查还有没有数据
channel.write(buffer);//调用方法写入
}

while中调用channel.write是因为write方法并不能包装一次将buffer中的内容全部写入到channel

关闭

channel必须关闭,不过调用了FileInputStreamFileOutputStream或者RandomAccessFileclose方法会间接的调用channelclose方法

位置

获取当前位置:

1
long pos = channel.position();

设置当前位

1
2
long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置为文件的末尾

  • 这时读取会返回-1
  • 这时写入,会追加内容,但要注意如果position超过了文件末尾,再写入时会在新内容和原末尾之间有空洞(00)

大小

使用size方法获取文件的大小

强制写入

操作系统出于性能考虑,会将数据缓存,不是立刻写入磁盘,可以体哦啊用fore(true)方法将文件内容和元数据(文件的权限等信息)立即写入磁盘

两个 Channel 传输数据

channel1中把数据传输到channel2中(1传输到2)
transferTo
传输文件有限制,最大只能是2G

1
channel1.transferTo(0,channel.size(),channel2);

三个参数
position 起始位置
count 传输数据量
target 传输到哪个Channel

1
public abstract long transferTo(long position, long count,WritableByteChannel target) throws IOException;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestFileChannelTransferTo {
public static void main(String[] args) {
try (FileChannel from = new FileInputStream("netty-demo/data.txt").getChannel();
FileChannel to = new FileOutputStream("netty-demo/to.txt").getChannel()) {
//效率高,底层会利用操作系统的零拷贝进行优化
//from.transferTo(0,from.size(),to);
//使用transferTo传输数据大小有上线,每次最多2g
//可以使用下面的方法处理
long size = from.size();
for (long left = size; left > 0; ) {
left = from.transferTo((size - left), left, to);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

Path

jdk7引入了PathPaths

  • Path用来表示文件路径
  • Paths是工具类,用来获取Path实例
    1
    2
    3
    4
    Path source = Paths.get("1.txt");     //相对路径,使用user.dir环境变量来定位1.txt
    Path source = Paths.get("D:\\1.txt"); //绝对路径代表了 D:\1.txt
    Path source = Paths.get("D:/1.txt"); //绝对路径,同样代表了 D:\1.txt
    Path source = Paths.get("D:\\data","projects"); //代表了 D:\data\projects
  • .代表了当前路径
  • ..代表了上一级路径
    1
    2
    3
    Path path = Paths.get("D:\\data\\projects\\a\\..\\b"); //寻找a目录上级目录下的b目录
    System.out.println(path);
    System.out.println(path.normalize());//正常化路径
    1
    2
    D:\data\projects\a\..\b
    D:\data\projects\b

Files

  • 检测文件是否存在

    1
    2
    Path path = Paths.get("test/data.txt");
    System.out.println(Files.exists(path));
  • 创建一级目录

    • 如果目录已经存在,会抛出异常FileAlreadyExistsException
    • 不能一次创建多级目录,否则会抛出异常NoSuchFileException
      1
      2
      Path path = Paths.get("test/data");
      Files.createDirectory(path);
  • 创建多级目录

    1
    2
    Path path = Paths.get("test/data/da");
    Files.createDirectories(path);
  • 拷贝文件

    1
    2
    3
    4
    Path source = Paths.get("test/data.txt");
    Path target = Paths.get("test/target.txt");

    Files.copy(source,target);

    如果文件已经存在,会抛出异常FileAlreadyExistsException
    如果希望用source覆盖掉target ,需要用StandardCopyOption来控制

    1
    Files.copy(source,target, StandardCopyOption.REPLACE_EXISTING);

    循环拷贝整个文件夹及其下面的路径:
    这里使用的是Files.walk()方法,使用Files.walkFileTree()方法也可以达到同样的效果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class TestFilesCopy {
    public static void main(String[] args) throws IOException {
    String source = "D:\\export";
    String target = "D:\\export-aaa";
    Files.walk(Paths.get(source)).forEach(path->{
    try {
    //获取文件或者文件夹在新文件夹下的全路径名
    String targetName = path.toString().replace(source,target);
    //如果循环的是目录
    if (Files.isDirectory(path)){
    //如果是目录,就在新文件夹内创建目录
    Files.createDirectory(Paths.get(targetName));
    }else if (Files.isRegularFile(path)){
    //如果循环到的是文件,就拷贝到新文件夹下
    Files.copy(path,Paths.get(targetName));
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    });
    }
    }
  • 移动文件

    • StandardCopyOption.ATOMIC_MOVE保证移动文件的时候的原子性
      1
      2
      3
      4
      Path source = Paths.get("test/data.txt");
      Path target = Paths.get("test1/data.txt");

      Files.move(source,target,StandardCopyOption.ATOMIC_MOVE);
  • 删除文件

    1
    2
    Path source = Paths.get("test/data.txt");
    Files.delete(source);
  • 删除目录

    • 如果目录中还有内容存在,会抛出异常DirectoryNotEmptyException
    • 想要删除非空目录需要先把目录里面的内容全部删除之后才行
      1
      2
      Path source = Paths.get("test/data");
      Files.delete(source);
  • 遍历文件夹及其子目录和文件

    • 如下所示,该方法一共有两个参数
    • 第一个参数是需要遍历的文件夹的根目录
    • 第二个参数是通过访问者模式实现,通过对具体方法的重写,达到在遍历每个阶段插入自己代码逻辑的目的
      1
      public static Path walkFileTree(Path start, FileVisitor<? super Path> visitor) throws IOException{}
      示例如下,使用的是SimpleFileVisitor类来实现逻辑,FileVisitor接口有四个方法,分别如下:
  • preVisitDirectory():在遍历到文件夹前会调用该方法

  • visitFile():在遍历到文件时会调用该方法

  • visitFileFailed():遍历文件失败时会调用该方法

  • postVisitDirectory():遍历文件夹之后会调用该方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    public class TestFilesWalkFileTree {
    //删除文件夹下的所有文件
    public static void main(String[] args) throws IOException {
    Files.walkFileTree(Paths.get("E:\\export"),new SimpleFileVisitor<Path>(){
    //先删除文件
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
    Files.delete(file);
    return super.visitFile(file, attrs);
    }
    //遍历文件夹结束后删除文件夹,因为遍历文件夹里面的文件的时候,已经把文件全部删除了
    @Override
    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
    Files.delete(dir);
    return super.postVisitDirectory(dir, exc);
    }
    });
    }
    //遍历文件夹下所有jar包文件
    public static void main2(String[] args) throws IOException {
    AtomicInteger jarCount = new AtomicInteger(0);
    Files.walkFileTree(Paths.get("D:\\jdk\\jdk8"),new SimpleFileVisitor<Path>(){
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
    if (file.toString().endsWith(".jar")){
    System.out.println(file);
    jarCount.incrementAndGet();
    }
    return super.visitFile(file, attrs);
    }
    });
    System.out.println("jar count:"+jarCount.get());
    }
    //遍历文件夹下所有的文件和文件夹
    public static void main1(String[] args) throws IOException {
    AtomicInteger dirCount = new AtomicInteger(0);
    AtomicInteger fileCount = new AtomicInteger(0);
    Files.walkFileTree(Paths.get("D:\\jdk\\jdk8"),new SimpleFileVisitor<Path>(){
    //遍历到文件夹前,会执行该方法
    @Override
    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
    System.out.println("=====>"+dir);
    dirCount.incrementAndGet();
    return super.preVisitDirectory(dir, attrs);
    }
    //遍历到文件会执行该方法
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
    System.out.println(file);
    fileCount.incrementAndGet();
    return super.visitFile(file, attrs);
    }
    //遍历文件异常时会调用
    @Override
    public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
    return super.visitFileFailed(file, exc);
    }
    //遍历文件夹后会调用
    @Override
    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
    return super.postVisitDirectory(dir, exc);
    }
    });
    System.out.println("dir count:"+dirCount.get());
    System.out.println("file count:"+fileCount.get());
    }
    }

网络编程

阻塞 VS 非阻塞

阻塞

  • 阻塞模式下,相关方法都会导致线程暂停
    • ServerSocketChannel.accept()会在没有连接建立时让线程暂停
    • SocketChannel.read()会在没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用cpu,但线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
  • 但多线程下,有新的问题,体现在一下几个方面
    • 32位JVM一个线程320k,64位JVM一个线程1024k,如果连接数过多,必然导致OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

阻塞模式,单线程方式,如下所示:
调用ssc.accept()方法的时候会等待新的连接建立,陷入阻塞中
当连接建立后,channel.read(buffer),从channel中读取数据的时候同样会陷入阻塞
假设,我们创建了一个连接,然后发送了一条消息,这个过程是没有问题的
但是因为只有一个线程,当进入下一次循环的时候,会继续调用ssc.accept()方法等待连接,这个时候,第一个建立连接的方法再次发送消息,是收不到的,
只能等再次有新的连接建立后才能收到消息
假设第一个连接建立之后,并没有发消息,同样会在read方法进行阻塞,这个时候在有新的连接建立同样是接收不到的,必须等第一个连接发送了消息之后才行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
//使用 nio 来理解阻塞模式

//0、ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
//1、创建一个服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
//2、绑定坚挺端口
ssc.bind(new InetSocketAddress(8080));

//3、连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true){
//4、简历与客户端的连接,SocketChannel用来与客户端之间通信
log.debug("connecting...");
//阻塞方法,线程停止运行,
SocketChannel sc = ssc.accept();
log.debug("connected... {}",sc);
channels.add(sc);
for (SocketChannel channel : channels) {
//5、接收客户端发送的数据
log.debug("before read... {}",channel);
//阻塞方法,线程停止运行
channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugRead(buffer);
buffer.clear();
log.debug("after read... {}",channel);
}
}
}
}
1
2
3
4
5
6
7
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1",8080));
System.in.read();
}
}

非阻塞

  • 非阻塞模式下,相关方法都不会让线程暂停
    • ServerSocketChannel.accept()在没有连接建立时,会返回null,继续运行
    • SocketChannel.read()在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其它SocketChannelread方法或是去执行ServerSocketChannel.accept()方法
    • 写数据时,线程只是等待数据写入Channel即可,无需等Channel通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了CPU
  • 数据复制过程中,线程事件还是阻塞的(AIO改进的地方)

非阻塞模式实现如下:
主要是通过configureBlocking(false)方法,将参数设置为false切换
针对ServerSocketChannel设置,影响的是ssc.accept()方法,如果没有连接,线程在执行该方法后,不会阻塞,会立即返回null
针对SocketChannel设置,影响的是channel.read(buffer)方法,如果没有数据,线程在执行了该方法后,不会阻塞,会立即返回0
线程会不停的循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Slf4j
public class Server {

public static void main(String[] args) throws IOException {
//使用 nio 来理解阻塞模式

//0、ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
//1、创建一个服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
//设置为非阻塞模式,影响的是下面的accept()方法
ssc.configureBlocking(false);
//2、绑定坚挺端口
ssc.bind(new InetSocketAddress(8080));

//3、连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
//4、简历与客户端的连接,SocketChannel用来与客户端之间通信
//log.debug("connecting...");
//非阻塞模式下,此方法不会阻塞线程,会继续运行,不过在没有连接建立的时候sc会返回为null
SocketChannel sc = ssc.accept();
if (sc != null) {
log.debug("connected... {}", sc);
//将SocketChannel设置为非阻塞模式,影响的是下面的read方法
sc.configureBlocking(false);
channels.add(sc);
}

for (SocketChannel channel : channels) {
//5、接收客户端发送的数据
//log.debug("before read... {}",channel);
//非阻塞模式,线程不会阻塞,会继续运行,如果没有数据read返回0,
int read = channel.read(buffer);
if (read != 0) {
buffer.flip();
ByteBufferUtil.debugRead(buffer);
buffer.clear();
log.debug("after read... {}", channel);
}
}
}
}
}

(Selector)多路复用

单线程可以配合Selector完成对多个Channel可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络IO、普通文件IO没法录用多路复用
  • 如果不用Selector的非阻塞模式,线程大部分时间都是在做无用功,而Selector能保证
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入
    • 限于网络传输能力,Channel未必时时可写,一旦Channel可写,会触发Selector的可写事件

好处:

  • 一个线程配合selector就可以监控多个channel的事件,事件发生线程才去处理,避免非阻塞模式下做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

创建

1
Selector selector = Selector.open();

绑定Channel事件
也称之为注册事件,绑定的事件Selector才会关心

1
2
channel.configureBlocking(false);
SelectionKey sscKey = channel.register(selector, 绑定事件);
  • channel必须工作在非阻塞模式
  • FileChannel没有非阻塞模式,因此不能配合Selector一起使用
  • 绑定的事件类型可以有:
    • connect:客户端连接成功时触发
    • accept:服务端成功接受连接时触发
    • read:数据可读入时触发,有因为接受能力弱,数据暂不能读入的情况
    • write:数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

监听Channel事件
可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少channel发生了事件
方法1,阻塞知道绑定事件发生

1
int count = selector.select();

方法2,阻塞直到绑定事件发生,或是超时(时间单位ms)

1
int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有时间,立即返回,自己根据返回值检查是否有事件

1
int count = selector.selectNow();

select何时不阻塞

  • 事件发生时
    • 客户端发起连接请求,会触发accept事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发read事件,另外如果发送的数据大于buffer缓冲区,会触发多次读取事件
    • channel可写,会触发write事件
    • 在linux下nio bug发生时
  • 调用selector.wakeup()
  • 调用selector.close()
  • selector躲在线程interrupt

处理acceptread事件

事件发生后能否不处理
事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍然会触发,这是因为nio底层使用的水平触发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class Server {

/**
* selector事件类型
* accept 会在有连接请求时触发
* connect 是客户端,连接建立后触发
* read 可读事件
* write 可写事件
*/
public static void main(String[] args) throws IOException {
//1、创建Selector
Selector selector = Selector.open();

ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

//2、建立Selector和Channel的联系(注册)
//SelectionKey 就是将来事件发生后,通过他可以知道是什么事件,和那个Channel发生的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
//key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key: {}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true) {
//3、select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行
//select在事件未处理时,他是不会阻塞的 (处理或者取消事件)
//事件发生后,要么处理,要么取消,不能置之不理
selector.select();
//4、处理事件,selectedKeys返回的是一个集合,内部包含了所有发生的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> itr = selectionKeys.iterator();
while (itr.hasNext()) {
SelectionKey key = itr.next();
//当有事件的时候,selector只会往selectedKeys集合中存放数据,并不会删除数据
//所以需要我们处理完之后自己删除
itr.remove();
log.debug("key: {}", key);
//key.cancel();//取消事件
//5、区分事件类型
if (key.isAcceptable()) {//如果是accept事件
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
} else if (key.isReadable()) {//如果是read可读事件
try {
//拿到触发事件的Channel
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
//如果客户端正常断开,返回-1,如果客户端异常断开,会抛异常
int read = channel.read(buffer);
if (read == -1) {
key.cancel();
} else {
buffer.flip();
ByteBufferUtil.debugRead(buffer);
}
} catch (Exception e) {
e.printStackTrace();
//因为客户端断开了,因此需要将key取消,(从selector集合中真正删除)
key.cancel();
}

}
}
}
}
}

为什么要itr.remove()
因为select在事件发生后,就会将相关key放进selectedKeys集合,但不会在处理完后从selectedKeys集合中移除,需要我们自己编码移除

  • 第一次触发了sscKey上的accept事件,没有移除ssKey
  • 第二次触发了scKey上的read事件,但这个时候selectedKeys中还有上次的sscKey,在处理时因为没有真正的serverSocket连上,就会导致空指针异常

cancel的作用
cancel会取消注册在selector上的channel,并从keys集合中删除key后续不会再监听事件

处理消息边界

如下图所示,我们通过网络接收到的消息的长度并不一定都是固定的,在服务端如果设置一个固定长度的buffer,就会出现如下的情况

  • 接收到的消息比设定的buffer长,这个时候就需要想办法扩容buffer
  • 接收到的消息比buffer短,这样就会可能导致半包的情况,第一条消息完全接收了,第二条消息接收了一部分
  • 接收到的消息更短一些,这样就会出现黏包、半包的现象

消息边界图示

解决思路:

  • 一种思想是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低,每次都需要将整个buffer循环一遍,查找分隔符
  • TLV格式,即Type类型、Length长度、Value数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是需要提前分配,如果内容过大,则影响server吞吐量
    • Http1.1是TLV格式
    • Http2.0是LTV格式

当客户端发送来一条消息,但是长度超过了buffer的长度,假设buffer为16字节,消息长度为20字节,此时第一次触发read事件,拿到了前面的16个字节,但是因为消息没有结束,所以read不会返回-1,客户端也就无法感知,但是当前read事件的数据并未处理完毕,所以会再次触发read事件,第二次循环创建了一个新的buffer,此时消息只剩下4个字节,16字节的buffer完全能够接受,最终打印就只剩下四个字节
改进思路:
buffer不能设置为局部变量,需要设置为全局变量(至少是针对当前连接的全局变量),当客户端发送请求,buffer感知到当前长度无法承载全部数据,就会进行扩容,创建一个新的buffer,然后将之前的数据存放到新的buffer中,之后因为没有完全读取消息,会再次触发read事件,第二次read事件将剩余消息获取

具体代码实现如下,split(buffer)方法在上面讲解黏包半包的时候已经写过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key: {}",sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> itr = selectionKeys.iterator();
while (itr.hasNext()){
SelectionKey key = itr.next();
itr.remove();
log.debug("key: {}",key);
if (key.isAcceptable()){
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}",sc);
}else if (key.isReadable()){
try {
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer =(ByteBuffer) key.attachment();
int read = channel.read(buffer);
if (read == -1){
key.cancel();
}else {
split(buffer);
//根据split方法里面的逻辑来看,当没有找到分隔符的时候,是不会改变指针的位置的
//所以当buffer的指针和长度相等时,表示buffer是满的,需要扩容
if (buffer.position() == buffer.limit()){
//直接按原有buffer的2倍进行扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
//切换读模式
buffer.flip();
//将原本的buffer内容读取到新的buffer中
newBuffer.put(buffer);
//使用新的buffer替换到原本的buffer,挂载到selectedKey上
key.attach(newBuffer);
}
}
}catch (Exception e){
e.printStackTrace();
key.cancel();
}
}
}
}
}
}

ByteBuffer大小分配

  • 每个Channel都需要记录可能被切分的消息,因为ByteBuffer不能被多个Channel共同使用,因此需要为每个channel维护一个独立的ByteBuffer
  • ByteBuffer不能太大,比如一个ByteBuffer 1Mb的话,要支持百万连接就要1Tb内存,因此需要设计大小可变的ByteBuffer
    • 一种思路是首先分配一个较小的buffer,例如4k,如果发现数据不够,再分配8k的buffer,将4k内容拷贝至8k buffer,优点是消息连续容易处理,缺点是数据拷贝消耗性能
    • 另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续,解析复杂,优点是避免了拷贝引起的性能损耗

处理write事件

一次无法写完

  • 非阻塞模式下,无法保证把buffer中的所有数据都写入channel,因此需要追踪write方法的返回值(代表实际写入的字节数)
  • selector监听所有channel的可写事件,每个channel都需要一个key来追踪buffer,但这样又会导致占用内存过多,就有两阶段策略
    • 当消息处理器第一次写入消息时,才将channel注册到selector
    • selector检查channel上的可写事件,如果所有的数据写完了,就取消channel的注册
    • 如果不取消,会每次可写均会触发write事件

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true){
selector.select();
Iterator<SelectionKey> itr = selector.selectedKeys().iterator();
while (itr.hasNext()){
SelectionKey key = itr.next();
itr.remove();
if (key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, 0, null);
sckey.interestOps(SelectionKey.OP_READ);

//1.向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 30000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
//2.返回值代表实际写入的字节数
int write = sc.write(buffer);
System.out.println(write);
//3.判断是否还有剩余内容
if (buffer.hasRemaining()){
//4.关注可写事件
sckey.interestOps(sckey.interestOps()+SelectionKey.OP_WRITE);
//5.把未写完的数据挂到sckey上
sckey.attach(buffer);
}
}else if (key.isWritable()){
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println(write);
//6.清理操作,完成所有数据的写出之后
if (!buffer.hasRemaining()){
//清除挂载的buffer
key.attach(null);
//不需要再关注写事件
key.interestOps(key.interestOps()-SelectionKey.OP_WRITE);
}
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1",8080));
//接收服务端的数据
int count =0;
while (true){
ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
count+= sc.read(buffer);
System.out.println(count);
buffer.clear();
}
}
}

write为何要取消
只要想channel发送数据时,socket缓冲可写,这个事件就会频繁触发,因此应该只在socket缓冲区写不下时再关注可写事件,数据写完之后再取消关注

更近一步(利用多线程优化)

现在都是多核CPU,设计时要充分考虑被让cpu的力量被白白浪费,前面的代码只有一个选择器,没有充分利用多核cpu,进行改进

分两组选择器

  • 单线程配一个选择器,专门处理accept事件
  • 创建cpu核心数的线程,每个线程配置一个选择器,轮流处理read事件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    public class MultiThreadServer {
    public static void main(String[] args) throws IOException {
    Thread.currentThread().setName("boss");
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    Selector boss = Selector.open();
    SelectionKey bossKey = ssc.register(boss, 0, null);
    bossKey.interestOps(SelectionKey.OP_ACCEPT);
    ssc.bind(new InetSocketAddress(8080));
    //创建固定数量的worker,,创建CPU核数个线程
    Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
    for (int i = 0; i < workers.length; i++) {
    workers[i] = new Worker("worker-"+i);
    }

    AtomicInteger index = new AtomicInteger(0);
    while (true) {
    boss.select();
    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
    while (iter.hasNext()) {
    SelectionKey key = iter.next();
    iter.remove();
    if (key.isAcceptable()) {
    SocketChannel sc = ssc.accept();
    sc.configureBlocking(false);
    log.debug("connected... {}",sc.getRemoteAddress());
    //2.关联channel
    log.debug("before register... {}",sc.getRemoteAddress());
    //轮询
    workers[index.getAndIncrement()%workers.length].register(sc);//boss 调用,初始化selector,启动worker-0

    log.debug("after register... {}",sc.getRemoteAddress());
    }
    }
    }
    }

    static class Worker implements Runnable{
    private Thread thread;
    private Selector selector;
    private String name;
    private volatile boolean start = false;//还未初始化
    // private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

    public Worker(String name) {
    this.name = name;
    }
    //初始化线程 和 selector
    public void register(SocketChannel sc) throws IOException {
    if (!start) {
    thread = new Thread(this, name);
    selector = Selector.open();
    thread.start();
    start = true;
    }
    //向队列添加了任务,但这个任务并没有立即执行 boss
    // queue.add(()->{
    // try {
    // sc.register(selector,SelectionKey.OP_READ,null);
    // } catch (ClosedChannelException e) {
    // e.printStackTrace();
    // }
    // });
    // selector.wakeup();//唤醒select
    selector.wakeup();
    sc.register(selector,SelectionKey.OP_READ,null);
    }
    /*
    当channel执行register方法注册到selector上的时候,如果selector已经执行了select方法陷入了阻塞,
    register也会被阻塞,需要等selector下次被唤醒之后才能注册
    第一种解决办法:
    如上代码使用队列,保证register方法是在selector唤醒后执行,在向队列插入数据后执行wakeup唤醒select,
    这样当有channel需要注册到selector上的时候,就会先把任务封装放进队列,然后唤醒selector,唤醒后就立刻从队列中拿出任务执行注册
    第二种解决办法:
    直接执行wakeup方法唤醒select,然后进行注册
    如果在执行了select后执行wakeup方法,会唤醒select然后执行注册
    如果是在select之前执行了wakeup方法,在执行select的时候会判断,然后跳过这次阻塞,进行下次循环
    */
    @Override
    public void run() {
    while (true){
    try {
    selector.select();// worker-0 阻塞
    // Runnable task = queue.poll();
    // if (task != null) task.run(); //执行任务内容
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while (iterator.hasNext()){
    SelectionKey key = iterator.next();
    iterator.remove();
    if (key.isReadable()){
    ByteBuffer buffer = ByteBuffer.allocate(16);
    SocketChannel channel = (SocketChannel) key.channel();
    log.debug("after register... {}",channel.getRemoteAddress());
    channel.read(buffer);
    buffer.flip();
    ByteBufferUtil.debugAll(buffer);
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }

    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    public class TestClient {
    public static void main(String[] args) throws IOException {
    SocketChannel sc = SocketChannel.open();
    sc.connect(new InetSocketAddress("127.0.0.1",8080));
    sc.write(Charset.defaultCharset().encode("1234567890abcdef"));
    System.in.read();
    }
    }

如何拿到cpu核数

  • Runtime.getRuntime().availableProcessors()如果工作在docker容器下,因为容器不是物理隔离的,会拿到物理cpu个数,而不是容器申请时的个数
  • 这个问题直到jdk10才修复,使用jvm参数UseContainerSupport 配置,默认开启

NIO VS BIO

stream VS channel

  • stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream仅支持阻塞API,channel同时支持阻塞、非阻塞API,网络channel可配合selector实现多路复用
  • 二者均为全双工,即读写可以同时进行

IO模型

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其他线程发送结果(至少两个线程)

当调用一次channel.read或者stream.read后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段
  • 复制数据阶段

IO模型read图示

阻塞IO
用户线程发起read方法,并不一定立即就会有数据,没有数据的时候会阻塞等待数据,直到有数据之后,完成复制返回
阻塞IO图示

非阻塞IO
在非阻塞的情况下,用户线程发起read之后并不会等待,如果没有数据会立即返回,此时可以去完成其他的业务,也可以循环继续等待数据
非阻塞IO图示

多路复用
用户线程使用select方法等待数据,当有了数据之后会获取所有的数据信息返回,然后根据不同数据类型在分别进行处理
多路复用图示

异步IO
用户线程发起read方法后,如果没有数据,会立即返回,然后去做其他的处理,等有数据之后内核会调用回调方法返回数据
异步IO

零拷贝

传统IO问题
传统IO将一个文件通过socket写出

1
2
3
4
5
6
File f = new File("test/data.txt");
RandomAccessFile file = new RandomAccessFile(file,"r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);

内部工作流程是这样的:
文件写出工作流程图

  1. java本身并不具备IO读写能力,因此read方法调用后,要从java程序的用户态,切换至内核态,去调用操作系统的(Kernel)的读写能力,将数据读入内核缓存区。这期间用户线程阻塞,操作系统使用DMA(Direct Memory Access)来实现文件读,期间也不会使用cpu
  2. 内核态切换到用户态,将数据从内核缓冲区读入用户缓冲区(即byte[] buf),这期间cpu会参与拷贝,无法利用DMA
  3. 调用write方法,这时将数据从用户缓冲区(byte[] buf)写入socket缓冲区,cpu会参与拷贝
  4. 接下来要向网卡写数据,这项能力java又不具备,因此又要从用户态切换到内核态,调用操作系统的写能力,使用DMA将socket缓冲区的数据写入网卡,不会使用cpu

可以看到中间环节较多,java的io实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态内核态的切换发生了3次,这个操作比较重量级
  • 数据拷贝了共4次

NIO优化

通过DirectByteBuffer

  • ByteBuffer.allocate(10) HeapByteBuffer使用的好还是java内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer使用的是操作系统内存

DirectByteBuffer优化后工作图示

大部分步骤和优化前相同,不再多说,唯有一点:java可以使用DirectByteBuffer将堆外内存映射到jvm内存中来直接访问使用

  • 这块内存不受jvm垃圾回收的影响,因此内存地址固定,有助于IO读写
  • java中的DirectByteBuffer对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuffer对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化(底层采用 linux2.1 后提供的sendFile方法),java中对应着两个channel调用transferTo/transferFrom方法拷贝数据
transferTo/transferFrom方法流程图

  1. java调用transferTo方法后,要从java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用cpu
  2. 数据从内核缓冲区传输到socket缓冲区,cpu会参与拷贝
  3. 最后使用DMA将socket缓冲区的数据写入网卡,不会使用cpu

可以看到

  • 只发生了一次用户态内核态的切换
  • 数据拷贝了3次

进一步优化(linux 2.4)
linux2.4零拷贝优化

  1. java调用transferTo方法后,要从java程序的用户态切换到内核态,使用DMA将数据读入内核缓冲区,不会使用cpu
  2. 只会将一些offsetlength信息考入socket缓冲区,几乎无消耗
  3. 使用DMA将内核缓冲区的数据写入王咖啡,不会使用cpu

真个过程仅只发生了一次用户态内核态的切换,数据拷贝了2次。
所谓【零拷贝】,并不是真的无拷贝,而是在不会拷贝重复数据到jvm内存中,零拷贝的优点有:

  • 更少的用户态内核态的切换
  • 不利用cpu计算,减少cpu缓冲伪共享
  • 零拷贝适合小文件传输

AIO

AIO用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,好还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows系统通过IOCP实现了真正的异步IO
  • Linux系统异步IO在2.6半包引入,但其底层实现还是用多路复用模拟了异步IO,性能没有优势

文件AIO

AsynchronousFileChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class AioFileChannel {

public static void main(String[] args) {
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("netty-demo/data.txt"), StandardOpenOption.READ)) {
//参数1 ByteBuffer
//参数2 读取的起始位置
//参数3 附件(如果一次读不完,可以通过附件再次读取)
//参数4 回调对象 CompletionHandler
ByteBuffer buffer = ByteBuffer.allocate(16);
log.debug("read begin...");
channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override //read 成功回调方法
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed...");
attachment.flip();
ByteBufferUtil.debugAll(attachment);
}

@Override //read 失败回调方法
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
log.debug("read end...");
System.in.read();
} catch (IOException e) {
}
}
}

输出

1
2
3
4
5
6
7
8
9
10
11:33:39 [DEBUG] [main] c.y.n.c.AioFileChannel - read begin...
11:33:39 [DEBUG] [main] c.y.n.c.AioFileChannel - read end...
11:33:39 [DEBUG] [Thread-12] c.y.n.c.AioFileChannel - read completed...
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [13]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 32 33 34 35 36 37 38 39 30 61 62 63 00 00 00 |1234567890abc...|
+--------+-------------------------------------------------+----------------+

可以看到

  • 响应文件读取成功的是另一个线程Thread-12
  • 主线程并没有IO操作阻塞

💡 守护线程
默认文件AIO使用的线程都是守护线程,所以最后要执行System.in.read()以避免守护线程意外结束

网络IO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class AioServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}

private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}

private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}

private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;

public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}

@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}

@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}