IO多路复用详解
# 多路复用的NIO实现
- poll:性能较高,也是基于
Reactor
模型,仅Linux
系统支持,Linux
下kernels 2.6内核版本之前使用poll来支持Java的NIO框架的。 - epoll:性能高,仅仅
Linux
下支持,Linux
的kernels 2.6之后的内核版本都是基于epoll
支持Java
的NIO
框架,但Linux
没有windows
系统的IOCP
技术,所以也都是用epoll
来模拟异步IO技术。 - select (重点):性能较高,
Linux
和Windows
系统都支持,Linux
的kernels 2.6之前默认都是使用select
模型,Windows
的同步IO也都是既有select
模型。 - kqueue:性能高,基于
Proactor
,目前的Java
版本不支持。
# Reactor模型
# 传统IO模型
如下图每个客户端连接到达时,服务端都会分配一个线程进行各种处理,所以这种模型线程数和用户访问量呈线性关系。这种模型在访问量不大的情况下还是可以扛住的,但是在高并发场景下,会有以下几个问题:
- 操作系统可以创建的线程数是有限制的。可以使用这条命令查看Linux可以创建的最大线程数:
cat /proc/sys/kernel/threads-max
。 - 无论用户是何种的请求我们都会专门分配一个线程让他处理数据接受、解码、业务处理、发送数据等操作,在网络质量不好的情况下,这就会大量线程阻塞,进而导致服务器效率降低,从而降低了服务器的吞吐量。
# Reactor事件驱动模型(业务处理与IO分离)
# 图解
如下图,相较于传统IO模型来说,将处理用户请求和网络请求拆分,而且该模型还是非阻塞IO模型,所以当网络事件需要处理时,程序还可以处理其他任务,处理效率有显著提高。
# 代码示例
首先是Reactor ,可以看到该代码就是服务端的入口,他做的事情就是初始通道,一旦监听到客户端的请求就分发给Accept代码
public class Reactor implements Runnable {
private static Logger logger = LoggerFactory.getLogger(Reactor.class);
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException {
//创建一个频道 并注册到selector
serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(port));
selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor(serverSocketChannel));
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
selector.select();
} catch (IOException e) {
logger.error("服务端使用一个线程不断等待客户端的连接到达失败,失败原因{}", e.getMessage(), e);
}
// 客户端请求到达,使用迭代器遍历处理
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
logger.info("监听到客户端连接事件后将其分发给Acceptor");
dispatch(iterator.next());
//因为我们注册的通道是accept,这里已经调用dispatch了,所以需要将其删除掉,避免迭代器二次使用
iterator.remove();
}
try {
int count = selector.selectNow();
logger.info(
"此方法执行非阻塞 选择操作。如果自上一次 ,选择操作以来没有通道成为可选择的,则此方法立即返回零。,返回结果[{}]",
count);
} catch (IOException e) {
logger.error("非阻塞选择操作失败,失败原因[{}]", e.getMessage(), e);
}
}
}
private void dispatch(SelectionKey selectionKey) {
//将客户端的请求交给accept处理
Runnable accept = (Runnable) selectionKey.attachment();
accept.run();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class Acceptor implements Runnable {
private static Logger logger = LoggerFactory.getLogger(Acceptor.class);
private final ServerSocketChannel serverSocketChannel;
private static final ExecutorService threadPool = Executors.newFixedThreadPool(20);
public Acceptor(ServerSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
//将任务分发到线程池中处理
try {
threadPool.execute(new Handler(socketChannel));
} catch (Exception e) {
logger.error("任务分发失败,失败原因:[{}]", e.getMessage(), e);
}
}
} catch (IOException e) {
logger.error("Acceptor分发任务到线程池失败,失败原因:[{}]", e.getMessage(), e);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Handler implements Runnable {
private static Logger logger = LoggerFactory.getLogger(Handler.class);
private volatile static Selector selector;
private final SocketChannel serverSocketChannel;
private SelectionKey selectionKey;
private volatile ByteBuffer input = ByteBuffer.allocate(1024);
private volatile ByteBuffer output = ByteBuffer.allocate(1024);
public Handler(SocketChannel channel) throws Exception {
selector = Selector.open();
serverSocketChannel = channel;
channel.configureBlocking(false);
selectionKey = channel.register(selector, SelectionKey.OP_READ);
}
@Override
public void run() {
while (selector.isOpen() && serverSocketChannel.isOpen()) {
try {
Set<SelectionKey> keys = select();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void write(SelectionKey key) throws IOException {
output.flip();
if (serverSocketChannel != null) {
serverSocketChannel.write(output);
selectionKey.channel();
serverSocketChannel.close();
output.clear();
}
}
private void read(SelectionKey key) throws IOException {
serverSocketChannel.read(input);
if (input.position() == 0) {
return;
}
input.flip();
process();
input.clear();
// 完成读取后就监听写入事件
key.interestOps(SelectionKey.OP_WRITE);
}
private void process() throws UnsupportedEncodingException {
byte[] bytes = new byte[input.remaining()];
input.get(bytes);
String str = new String(bytes, CharsetUtil.UTF_8);
logger.info("读取到的数据为[{}]", str);
output.put("receive u msg".getBytes());
}
// 这里处理的主要目的是处理Jdk的一个bug,该bug会导致Selector被意外触发,但是实际上没有任何事件到达,
// 此时的处理方式是新建一个Selector,然后重新将当前Channel注册到该Selector上
private Set<SelectionKey> select() throws IOException {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
while (keys.isEmpty()) {
int interestOps = selectionKey.interestOps();
selector = Selector.open();
selectionKey = serverSocketChannel.register(selector, interestOps);
return select();
}
return keys;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# Reactor事件驱动模型并发读写
# 图解
上述仅仅实现网络处理和网络读写业务处理的拆分,在高并发场景下,如果将网络读写和业务处理耦合在一起的情况下,很可能因为网络质量问题导致线程阻塞,进而导致服务器吞吐量降低,所以我们对Reactor进行一番改造,如下图,可以看到我们将Reactor拆分成来个,一个处理网络请求,另一个负责网络读写和业务计算,但是网络请求我们也会专门开个线程池进行处理,处理完成的分发的业务处理的线程池中,这样性能就一下子提高了。
# Java对多路IO复用的支持
# 核心概念介绍
多路IO复用工作机制整体如下图所示,以下为里面几个核心概念:
channel
:与操作系统内核交互、传递数据的通道,一个通道通常有一个专属的文件描述符,常见的通道有**ServerSocketChannel
、ScoketChannel
、DatagramChannel
**,其中应用程序只有通过ServerSocketChannel
向选择器注册时才能实现多路IO复用的事件监听(这种监听支持TCP
、UDP
),而ScoketChannel
则是TCP
套接字监听通道,DatagramChannel
则是UDP
报文监听通道。buffer
:Java
的NIO
框架为每种类型的读写都设置的缓存buffer
,buffer
缓冲区有3个比较重要的概念,position
在读状态下时代表读取到的位置,在写状态代表写的起始位置。limit在读状态代表数据最多可以写到位置,在写状态代表还可以写capacity-limit
个位置。capacity
则代表buffer
容量。Selector
(重要):Java
NIO
中比较重要的概念,你可以将其理解为轮询代理器、channel
容器管理器,有了selector
,应用程序不再通过阻塞或者非阻塞模式询问操作系统是否有事件发生,取而待之的是selector
去代理轮询。
# 代码示例
我们现在不妨就编写一个支持多路IO复用的Java NIO框架
# 服务端代码
代码如下所示,为该应用程序注册感兴趣的通道后,即可启动选择器不断轮询,如果有读请求进来则对读数据进行处理,我们会将这个频道的hash值算出来去concurrentHashMap 查看这个用户之前是否有发送消息,若有则且用户本次发送了over,我们则将信息拼接起来输出。若没有用户本次发送的文本没有over,我们则将本次的消息拼接起来存到concurrentHashMap 中。
public class SocketServer1 {
/**
* 日志
*/
private static Logger logger = LoggerFactory.getLogger(SocketServer1.class);
private static ConcurrentHashMap<Integer, StringBuffer> concurrentHashMap = new ConcurrentHashMap<>();
public static void main(String[] args) throws Exception {
// 创建服务端的ServerSocketChannel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置为非阻塞模式,即可read、write没有结果返回一个标志即直接结束了
serverChannel.configureBlocking(false);
// 获得与此通道相关的服务套接字
ServerSocket serverSocket = serverChannel.socket();
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(83));
// 获得选择器
Selector selector = Selector.open();
//将我们上面感兴趣的频道注册到selector中,注意:服务端的通道只能注册SelectionKey.OP_ACCEPT事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
try {
while (true) {
//如果条件成立,说明本次询问selector,并没有获取到任何准备好的、感兴趣的事件
//java程序对多路复用IO的支持也包括了阻塞模式 和非阻塞模式两种。
if (selector.select(100) == 0) {
logger.info("100ms毫秒没有没有频道事件,可以再次处理一些业务逻辑");
continue;
}
//这里就是本次询问操作系统,所获取到的“所关心的事件”的事件类型(每一个通道都是独立的)
Iterator<SelectionKey> selecionKeys = selector.selectedKeys().iterator();
while (selecionKeys.hasNext()) {
SelectionKey readyKey = selecionKeys.next();
//这个已经处理的readyKey一定要移除。如果不移除,就会一直存在在selector.selectedKeys集合中
//待到下一次selector.select() > 0时,这个readyKey又会被处理一次
selecionKeys.remove();
SelectableChannel selectableChannel = readyKey.channel();
if (readyKey.isValid() && readyKey.isAcceptable()) {
logger.info("======accept =======");
/*
* 当server socket channel通道已经准备好,就可以从server socket channel中获取socketchannel了
* 拿到socket channel后,要做的事情就是马上到selector注册这个socket channel感兴趣的事情。
* 否则无法监听到这个socket channel到达的数据
* */
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectableChannel;
SocketChannel socketChannel = serverSocketChannel.accept();
registerSocketChannel(socketChannel, selector);
} else if (readyKey.isValid() && readyKey.isConnectable()) {
logger.info("======socket channel 建立连接=======");
} else if (readyKey.isValid() && readyKey.isReadable()) {
logger.info("======socket channel 数据准备完成,可以进行数据读取");
readSocketChannel(readyKey);
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
serverSocket.close();
}
}
/**
* 在server socket channel接收到/准备好 一个新的 TCP连接后。
* 就会向程序返回一个新的socketChannel。<br>
* 但是这个新的socket channel并没有在selector“选择器/代理器”中注册,
* 所以程序还没法通过selector通知这个socket channel的事件。
* 于是我们拿到新的socket channel后,要做的第一个事情就是到selector“选择器/代理器”中注册这个
* socket channel感兴趣的事件
*
* @param socketChannel 新的socket channel
* @param selector selector“选择器/代理器”
* @throws Exception
*/
private static void registerSocketChannel(SocketChannel socketChannel, Selector selector) throws Exception {
socketChannel.configureBlocking(false);
//socket通道可以且只可以注册三种事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(2048));
}
/**
* 这个方法用于读取从客户端传来的信息。
* 并且观察从客户端过来的socket channel在经过多次传输后,是否完成传输。
* 如果传输完成,则返回一个true的标记。
*
* @throws Exception
*/
private static void readSocketChannel(SelectionKey readyKey) throws Exception {
logger.info("开始阅读客户端传入的数据**************************");
SocketChannel clientSocketChannel = (SocketChannel) readyKey.channel();
//获取客户端使用的端口
InetSocketAddress sourceSocketAddress = (InetSocketAddress) clientSocketChannel.getRemoteAddress();
Integer resoucePort = sourceSocketAddress.getPort();
ByteBuffer contextBytes = (ByteBuffer) readyKey.attachment();
StringBuffer stringBuffer = new StringBuffer();
int realLen = -1;
try {
// 若有读取到数据则皆有缓存将数据到String
while ((realLen = clientSocketChannel.read(contextBytes)) != -1) {
// 将limit到position的位置,position调整到其实位置,为读数据做好准备
contextBytes.flip();
int position = contextBytes.position();
int length = contextBytes.limit();
byte[] bytes = new byte[length];
contextBytes.get(bytes, position, length);
String messageEncode = new String(bytes, 0, length, "UTF-8");
stringBuffer.append(messageEncode);
//position变为0,limit回去,即切换为写模式
contextBytes.clear();
}
} catch (Exception e) {
//这里抛出了异常,一般就是客户端因为某种原因终止了。所以关闭channel就行了
logger.error("读取客户端数据异常,异常原因:[{}]", e.getMessage(), e);
clientSocketChannel.close();
return;
}
//如果收到了“over”关键字,才会清空buffer,并回发数据;
//否则不清空缓存,还要还原buffer的“写状态”
if (URLDecoder.decode(stringBuffer.toString(), "UTF-8").indexOf("over") != -1) {
Integer hashCode = clientSocketChannel.hashCode();
String message = null;
if (concurrentHashMap.containsKey(hashCode)) {
StringBuffer str = concurrentHashMap.get(hashCode);
str.append(stringBuffer);
message = str.toString();
}else{
message = stringBuffer.toString();
}
logger.info("端口:" + resoucePort + "客户端发来的信息======message : " + message);
clientSocketChannel.close();
concurrentHashMap.remove(hashCode);
} else {
logger.info("端口:" + resoucePort + "客户端信息还未接受完,继续接受======message : " + URLDecoder.decode(stringBuffer.toString(), "UTF-8"));
Integer hashCode = clientSocketChannel.hashCode();
String message = null;
if (concurrentHashMap.containsKey(hashCode)) {
StringBuffer str = concurrentHashMap.get(hashCode);
str.append(stringBuffer);
concurrentHashMap.put(hashCode, str);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# 客户端代码
public class Client {
public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 83);
OutputStream out = socket.getOutputStream();
String s = "hello world over";
out.write(s.getBytes());
out.close();
}
}
2
3
4
5
6
7
8
9
# 小结
- Java NIO多路复用不再使用多线程进行IO处理,当然在业务处理中,我们仍然可以通过线程池进行处理。
- 同一个端口可以处理TCP或者UDP,例如你的应用程序向选择器注册ServerSocketChannel。
- Java NIO是操作系统级别的优化,能够同时处理多个客户端IO事件,同时具有阻塞式同步IO和非阻塞式同步IO的所有特点。当然多路复用IO还是属于操作系统级别的同步IO。
# Netty
# 简介
Netty是一款高性能、异步事件驱动的强大的NIO框架,业界主流的RPC框架、zookeeper等都是基于其构建的。其特点为:
1. api简单,开发门槛低 功能强大
2. 内置了多种编码、解码功能
3. 与其它业界主流的NIO框架对比,netty的综合性能最优 社区活跃,使用广泛,经历过很多商业应用项目的考验
4. 定制能力强,可以对框架进行灵活的扩展
2
3
4
# 使用示例
# 导入pom
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.5.Final</version>
</dependency>
2
3
4
5
# 服务端代码
入口
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
public class NettyServer {
private static Logger logger = LoggerFactory.getLogger(NettyServer.class);
public void bind(int port) throws Exception {
ServerBootstrap serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
serverBootstrap.setPipelineFactory(() -> {
ChannelPipeline channelPipeline = Channels.pipeline();
channelPipeline.addLast(MessageHandler.class.getName(), new MessageHandler());
return channelPipeline;
});
serverBootstrap.bind(new InetSocketAddress(port));
}
public static void main(String[] args) {
try {
new NettyServer().bind(83);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
消息处理器
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
/**
* 收到客户端请求的消息处理器
*
*/
public class MessageHandler extends SimpleChannelHandler {
private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
ChannelBuffer message = (ChannelBuffer) messageEvent.getMessage();
byte[] bytes = message.readBytes(message.readableBytes()).array();
logger.info("服务端收到消息:[{}]", new String(bytes, "UTF-8"));
byte[] body = "服务端已收到".getBytes();
byte[] header = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(body.length).array();
Channels.write(channelHandlerContext.getChannel(), ChannelBuffers.wrappedBuffer(header, body));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 客户端代码
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
public class NettyClient {
private static Logger logger = LoggerFactory.getLogger(NettyClient.class);
private final ByteBuffer readHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
private final ByteBuffer writeHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
private SocketChannel socketChannel;
public void sendMessage(byte[] body) throws Exception {
this.socketChannel = SocketChannel.open();
this.socketChannel.socket().setSoTimeout(60000);
this.socketChannel.connect(new InetSocketAddress(83));
writeWithHeader(this.socketChannel, body);
readHeader.clear();
read(this.socketChannel, readHeader);
int bodyLen = readHeader.getInt(0);
ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLen).order(ByteOrder.BIG_ENDIAN);
read(this.socketChannel, bodyBuf);
logger.info("客户端收到的响应内容:[{}]", new String(bodyBuf.array(), "UTF-8"));
}
private void writeWithHeader(SocketChannel channel, byte[] body) throws IOException {
writeHeader.clear();
writeHeader.putInt(body.length);
writeHeader.flip();
channel.write(ByteBuffer.wrap(body));
}
private void read(SocketChannel channel, ByteBuffer buffer) throws IOException {
while (buffer.hasRemaining()) {
int len = channel.read(buffer);
if (len == -1) {
throw new IOException("end of stream when reading header");
}
}
}
public static void main(String[] args) throws Exception {
String body = "客户发的测试请求!";
new NettyClient().sendMessage(body.getBytes());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60