java IO模型

阻塞 与 非阻塞

阻塞

请求收到后,请求被一直阻塞,直到条件满足(可读,可写等)

非阻塞

请求收到后,立即返回一个标志信息,而不会一直阻塞等待。一个通过 Selector选择器遍历channel获取满足条件的channel来处理。

同步 与 异步

同步

每个请求必须逐个地被处理,一个请求的处理会导致整个流程的暂时等待,这些事件无法并发地执行。用户线程发起I/O请求后需要等待或者轮询内核I/O操作完成后才能继续执行。nio是业务线程在io操作准备好得到通知,接着由该线程进行io操作,但是io本身还是同步的。

异步I/O 

多个请求可以并发地执行,一个请求或者任务的执行不会导致整个流程的暂时等待。用户线程发起I/O请求后仍然继续执行,当内核I/O操作完成后会通知用户线程,或者调用用户线程注册的回调函数。AIO是在io操作完成后,给线程发出通知。

传统BIO简单实现及分析

传统bio 是同步阻塞io

这里简单实现一个直接输出客户端输入的server

server 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class SocketServer {
public static class HandleRequest implements Runnable {
private Socket socket;

public HandleRequest(Socket socket) {
this.socket = socket;
}

@Override

public void run() {

try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter writer = new PrintWriter(socket.getOutputStream())) {
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("服务器收到请求");
String len;
while ((len = reader.readLine()) != null) {
log.info("收到请求:{}", len);
writer.println(len);
writer.flush();

}
log.info("处理相应耗时:{}", stopwatch.stop());
} catch (Exception e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
try (ServerSocket serverSocket = new ServerSocket(8000)) {
while (true) {
Socket socket = serverSocket.accept();
executorService.execute(new HandleRequest(socket));
}

} catch (Exception e) {
e.printStackTrace();
}

}
}

client 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SocketClient {
public static void sendRequest() {
try (Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(8000));
try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter writer = new PrintWriter(socket.getOutputStream())) {
log.info("发送请求");
writer.print("hello");
LockSupport.parkNanos(1000 * 1000 * 6000L);
writer.println("world");
writer.flush();
log.info("服务器返回:{}", reader.readLine());

}
} catch (Exception e) {
e.printStackTrace();
}

}

public static void main(String[] args) throws Exception {
CompletableFuture[] futures=new CompletableFuture[10];
for (int i = 0; i < 10; i++) {
futures[i]=CompletableFuture.runAsync(SocketClient::sendRequest);
}
CompletableFuture.allOf(futures).join();

}
}

可以看到 这里server只能一个线程只处理一个请求。每个请求的处理时间都在6秒(client发送数据的时候我暂停了,模拟一个缓慢的网络环境)。

服务器需要处理大量的请求连接,如果每个请求都像上面的client 一样很慢,这就会拖慢服务器的处理速度,服务器每秒能处理的请求就会很少。

这里服务器慢不是由于服务端有很多繁重的任务,而是在等慢慢的cpu,高效cpu等低效 的网路io实在是太不划算了。

NIO简单实现及分析

NIO:准备好了通知我,是同步非阻塞io

了解nio,首先要知道 Channel,Buffer ,Selector这几个关键组件

Channel: 类似于流,和一个文件或者socket对应,往channel中写数据,就等于往socket中写数据

Buffer: 简单理解为一个内存区域或者byte数组,数据需要包装成Buffer才能与channel交互

selector: SelectableChannel可以注册到selector中,被selector管理。一个selector可以管理多个Channel

当Channel的数据准备好了 selector就会接到通知。

可以看到一个线程管理一个selector处理多个channel,也就是多个客户端连接,这就能用少量的线程处理大量的客户端连接。

server实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@Slf4j
public class SocketServer {

/**
* 建立连接
*
* @param selectionKey
*/
private static void doAccept(SelectionKey selectionKey, Selector selector) {
try {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
//注册下一个感兴趣的状态为读
SelectionKey clientKey = socketChannel.register(selector, SelectionKey.OP_READ);
//绑定需要需要回复给客户端的数据队列
clientKey.attach(new LinkedBlockingDeque<ByteBuffer>());
selector.wakeup();

} catch (IOException e) {
throw new IllegalStateException(e);
}

}

/**
* 读取数据
*
* @param selectionKey
*/
private static void doRead(SelectionKey selectionKey, Selector selector) {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int len = socketChannel.read(byteBuffer);
if (len < 0) {
// log.warn("读取到非法数据断开连接");
socketChannel.close();
return;
}
byteBuffer.flip();
LinkedBlockingDeque<ByteBuffer> byteBuffers = (LinkedBlockingDeque<ByteBuffer>) selectionKey.attachment();
byteBuffers.offer(byteBuffer);
//注册下一个感兴趣的状态为读或者写
selectionKey.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
//唤醒阻塞在 select() 方法的线程
selector.wakeup();

} catch (IOException e) {
throw new IllegalStateException(e);
}

}

/**
* 写数据
*
* @param selectionKey
*/
private static void doWrite(SelectionKey selectionKey, Selector selector) {
try {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
LinkedBlockingDeque<ByteBuffer> byteBuffers = (LinkedBlockingDeque<ByteBuffer>) selectionKey.attachment();
ByteBuffer byteBuffer = byteBuffers.take();
int len = socketChannel.write(byteBuffer);
if (len < 0) {
// log.warn("读取到非法数据断开连接");
socketChannel.close();
selectionKey.cancel();
return;
}

//注册下一个感兴趣的状态为读或者写
if (byteBuffers.size() == 0) {
selectionKey.interestOps(SelectionKey.OP_READ);
}
//唤醒阻塞在 select() 方法的线程
selector.wakeup();

} catch (Exception e) {
throw new IllegalStateException(e);
}

}

public static void main(String[] args) throws Exception {
Selector selector = SelectorProvider.provider().openSelector();
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
Map<SelectionKey, Stopwatch> map = Maps.newHashMap();
channel.bind(new InetSocketAddress(8000));
//向selector注册感兴趣的状态
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 阻塞直到有数据准备好(状态处于注册时的状态)
selector.select();
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//处理了需要删除
iterator.remove();
if (selectionKey.isAcceptable()) {
doAccept(selectionKey, selector);
}
if (selectionKey.isValid() && selectionKey.isReadable()) {
map.putIfAbsent(selectionKey, Stopwatch.createStarted());
doRead(selectionKey, selector);
}
if (selectionKey.isValid() && selectionKey.isWritable()) {
doWrite(selectionKey, selector);
log.info("处理耗时:{}", map.get(selectionKey).stop());
}
}
}
}
}

使用前面bio 的client连接nio的server,可以看到及时客户端迟钝或者出现网络延迟,对服务端影响也不大。

这里的server还是只用了一个main线程来处理请求,也可以像上面bio那样使用线程池来处理请求,将读写请求读写注册到不同的selector中,一个主selector负责监控连接请求

多个子selector负责监控处理读写请求

多selector实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
public class ThreadPoolSocketServer {

private static int readWriteProccessCount = 5;
private static int totalCount = 0;
private static ReadWriteProccess[] proccesses = new ReadWriteProccess[readWriteProccessCount];

static {
for (int i = 0; i < readWriteProccessCount; i++) {
proccesses[i] = new ReadWriteProccess();
}
}

public static class ReadWriteProccess {
private volatile boolean isRun = false;
private Selector selector;

public ReadWriteProccess() {
try {
selector = Selector.open();
// run();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}

public void register(SocketChannel socketChannel) {
try {
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
selectionKey.attach(new LinkedBlockingDeque<ByteBuffer>());
System.out.println("attach" + System.currentTimeMillis());
run();
} catch (ClosedChannelException e) {
throw new IllegalStateException(e);
}
}

public void wakeup() {
selector.wakeup();
}

public void run() {
if (!isRun) {
isRun = true;
new Thread(() -> {
while (true) {
// 阻塞直到有数据准备好(状态处于注册时的状态)
try {

//由于select方法与register 方法都需要获取相同的监视器,使用select会一直阻塞在register方法上所以采用 selectNow
// selector.select();
if (selector.selectNow() <= 0) {
continue;
}
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//处理了需要删除
iterator.remove();
if (selectionKey.isValid() && selectionKey.isReadable()) {
doRead(selectionKey, selector);
}
if (selectionKey.isValid() && selectionKey.isWritable()) {
doWrite(selectionKey, selector);
}
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}).start();
}
}

}

/**
* 建立连接
*
* @param selectionKey
*/
private static void doAccept(SelectionKey selectionKey) {
try {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
ReadWriteProccess proccess = proccesses[totalCount++%readWriteProccessCount];
proccess.register(socketChannel);
proccess.wakeup();

} catch (IOException e) {
throw new IllegalStateException(e);
}

}

/**
* 读取数据
*
* @param selectionKey
*/
private static void doRead(SelectionKey selectionKey, Selector selector) {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int len = socketChannel.read(byteBuffer);
if (len < 0) {
log.warn("读取到非法数据断开连接");
socketChannel.close();
selectionKey.cancel();
return;
}
byteBuffer.flip();
LinkedBlockingDeque<ByteBuffer> byteBuffers = (LinkedBlockingDeque<ByteBuffer>) selectionKey.attachment();
byteBuffers.offer(byteBuffer);
//注册下一个感兴趣的状态为读或者写
selectionKey.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
//唤醒阻塞在 select() 方法的线程
selector.wakeup();

} catch (IOException e) {
throw new IllegalStateException(e);
}

}

/**
* 写数据
*
* @param selectionKey
*/
private static void doWrite(SelectionKey selectionKey, Selector selector) {
try {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
LinkedBlockingDeque<ByteBuffer> byteBuffers = (LinkedBlockingDeque<ByteBuffer>) selectionKey.attachment();
ByteBuffer byteBuffer = byteBuffers.take();
int len = socketChannel.write(byteBuffer);
if (len < 0) {
log.warn("--读取到非法数据断开连接");
socketChannel.close();
selectionKey.cancel();
return;
}

//注册下一个感兴趣的状态为读或者写
if (byteBuffers.size() == 0) {
selectionKey.interestOps(SelectionKey.OP_READ);
}
//唤醒阻塞在 select() 方法的线程
selector.wakeup();

} catch (Exception e) {
throw new IllegalStateException(e);
}

}

public static void main(String[] args) throws Exception {
Selector selector = SelectorProvider.provider().openSelector();
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.bind(new InetSocketAddress(8000));
//向selector注册感兴趣的状态
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 阻塞直到有数据准备好(状态处于注册时的状态)
selector.select();
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//处理了需要删除
iterator.remove();
if (selectionKey.isAcceptable()) {
doAccept(selectionKey);
}
}
}
}
}

这里采用了5个ReadWriteProccess来处理读写请求,主线程只用建立连接,建立好连接之后就交给ReadWriteProccess来处理降低了主线程的压力。netty中也是类似的多Reactor模式。

当然客户端也能使用nio来实现

client实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class SocketClient {

private static void connect(SelectionKey selectionKey, Selector selector) {
try {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//正在连接完成连接
if (socketChannel.isConnectionPending()) {
socketChannel.finishConnect();
}
socketChannel.write(ByteBuffer.wrap("hello world\n".getBytes()));
selectionKey.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}

private static void read(SelectionKey selectionKey, Selector selector) {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.read(byteBuffer);
log.info("得到服务端结果:{}", new String(byteBuffer.array()));
socketChannel.close();
selector.close();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}

private static void sendRequest() {
try {
Selector selector = SelectorProvider.provider().openSelector();
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(8000));
channel.register(selector, SelectionKey.OP_CONNECT);
while (true) {
if (!selector.isOpen()) {
return;
}
selector.select();
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//处理了需要删除
iterator.remove();
if (selectionKey.isConnectable()) {
connect(selectionKey, selector);
} else if (selectionKey.isValid() && selectionKey.isReadable()) {
read(selectionKey, selector);
}
}

}
} catch (IOException e) {
throw new IllegalStateException(e);
}

}

public static void main(String[] args) throws Exception {
CompletableFuture[] futures = new CompletableFuture[10];
for (int i = 0; i < 10; i++) {
futures[i] = CompletableFuture.runAsync(SocketClient::sendRequest);
}
CompletableFuture.allOf(futures).join();

}
}

AIO简单实现及分析

aio比nio更进一步,它不是在io准备好时通知线程,而是在io完成时,再通过回调函数的方式给线程发送通知

可以看到下面的实现全是异步回调函数。

server实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class SocketServer {
public static void start() throws Exception{
AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8000));
//注册 回调
socketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);

// 连接成功回调
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
Future<Integer> writeResult = null;
try {
byteBuffer.clear();
//这里读写都是异步的
result.read(byteBuffer).get(100, TimeUnit.SECONDS);
byteBuffer.flip();
writeResult = result.write(byteBuffer);
} catch (Exception e) {
log.error("错误", e);
} finally {
try {
//服务器进行下一个客户端连接的准备
socketChannel.accept(null, this);
//等待数据写完
writeResult.get();
result.close();
} catch (Exception e) {
log.error("错误", e);
}
}
}

//连接失败回调
@Override
public void failed(Throwable exc, Object attachment) {
log.error("错误", exc);
}
});


}

public static void main(String[] args) throws Exception {
start();
//主线程可以继续操作
TimeUnit.SECONDS.sleep(200);
}
}

client实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class SocketClient {
public static void sendRequest() {
try {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
//注册 回调
// 这里必须加 localhost 不知道为啥
channel.connect(new InetSocketAddress("localhost",8000), null, new CompletionHandler<Void, Object>() {
// 连接成功回调
@Override
public void completed(Void result, Object attachment) {
//写完数据回调 从服务端读取数据
channel.write(ByteBuffer.wrap("hello world".getBytes()), null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
//读完数据回调
channel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
byteBuffer.flip();
log.info("服务器返回数据:{}", new String(attachment.array()));
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {

}
});
}

@Override
public void failed(Throwable exc, Object attachment) {

}
});
}

// 连接失败回调
@Override
public void failed(Throwable exc, Object attachment) {
log.error("错误", exc);
}
});

} catch (IOException e) {
e.printStackTrace();
}

}

public static void main(String[] args) throws Exception {
CompletableFuture[] futures = new CompletableFuture[10];
for (int i = 0; i < 10; i++) {
futures[i] = CompletableFuture.runAsync(SocketClient::sendRequest);
}
CompletableFuture.allOf(futures).join();
//由于方法全是异步的,不停一下main方法会马上退出
TimeUnit.SECONDS.sleep(20);

}
}