BlazeMaple BlazeMaple
首页
  • 基础知识

    • Java的基本数据类型
    • Java中的常用类String
    • Java中的异常
    • Java中的注解
    • Java中的反射机制
    • Java中的泛型
    • Java为什么是值传递
  • 集合框架

    • Java集合核心知识总结
    • HashMap的7种遍历方式
    • 源码分析
  • Java新特性

    • Java8新特性
  • IO流

    • Java基础IO总结
    • Java IO中的设计模式
    • Java IO模型
    • IO多路复用详解
  • 并发编程

    • 并发编程基础总结
  • JVM

    • JVM基础总结
  • MySQL

    • MySQL核心知识小结
    • MySQL 45讲
  • Redis

    • Redis核心入门知识简记
  • Spring
  • SpringCloud Alibaba
  • 开发工具

    • Git详解
    • Maven详解
    • Docker详解
    • Linux常用命令
  • 在线工具

    • json (opens new window)
    • base64编解码 (opens new window)
    • 时间戳转换 (opens new window)
    • unicode转换 (opens new window)
    • 正则表达式 (opens new window)
    • md5加密 (opens new window)
    • 二维码 (opens new window)
    • 文本比对 (opens new window)
  • 学习资源

    • 计算机经典电子书PDF
    • hot120
GitHub (opens new window)
首页
  • 基础知识

    • Java的基本数据类型
    • Java中的常用类String
    • Java中的异常
    • Java中的注解
    • Java中的反射机制
    • Java中的泛型
    • Java为什么是值传递
  • 集合框架

    • Java集合核心知识总结
    • HashMap的7种遍历方式
    • 源码分析
  • Java新特性

    • Java8新特性
  • IO流

    • Java基础IO总结
    • Java IO中的设计模式
    • Java IO模型
    • IO多路复用详解
  • 并发编程

    • 并发编程基础总结
  • JVM

    • JVM基础总结
  • MySQL

    • MySQL核心知识小结
    • MySQL 45讲
  • Redis

    • Redis核心入门知识简记
  • Spring
  • SpringCloud Alibaba
  • 开发工具

    • Git详解
    • Maven详解
    • Docker详解
    • Linux常用命令
  • 在线工具

    • json (opens new window)
    • base64编解码 (opens new window)
    • 时间戳转换 (opens new window)
    • unicode转换 (opens new window)
    • 正则表达式 (opens new window)
    • md5加密 (opens new window)
    • 二维码 (opens new window)
    • 文本比对 (opens new window)
  • 学习资源

    • 计算机经典电子书PDF
    • hot120
GitHub (opens new window)
  • IO流

    • Java基础IO总结
    • Java IO中的设计模式
    • Java IO模型
    • IO多路复用详解
      • 多路复用的NIO实现
      • Reactor模型
        • 传统IO模型
        • Reactor事件驱动模型(业务处理与IO分离)
        • 图解
        • 代码示例
        • Reactor事件驱动模型并发读写
        • 图解
      • Java对多路IO复用的支持
        • 核心概念介绍
        • 代码示例
        • 服务端代码
        • 客户端代码
      • 小结
      • Netty
        • 简介
        • 使用示例
        • 导入pom
        • 服务端代码
        • 客户端代码
  • 并发编程

    • 并发编程基础总结
    • 深入理解volatile关键字
    • 深入理解synchronized关键字
  • JVM

    • JVM基础总结
  • Java进阶
  • IO流
BlazeMaple
2023-12-22
目录

IO多路复用详解

# 多路复用的NIO实现

  1. poll:性能较高,也是基于Reactor模型,仅Linux系统支持,Linux下kernels 2.6内核版本之前使用poll来支持Java的NIO框架的。
  2. epoll:性能高,仅仅Linux下支持,Linux的kernels 2.6之后的内核版本都是基于epoll支持Java的NIO框架,但Linux没有windows系统的IOCP技术,所以也都是用epoll来模拟异步IO技术。
  3. select (重点):性能较高,Linux和Windows系统都支持,Linux的kernels 2.6之前默认都是使用select模型,Windows的同步IO也都是既有select模型。
  4. kqueue:性能高,基于Proactor,目前的Java版本不支持。

# Reactor模型

# 传统IO模型

如下图每个客户端连接到达时,服务端都会分配一个线程进行各种处理,所以这种模型线程数和用户访问量呈线性关系。这种模型在访问量不大的情况下还是可以扛住的,但是在高并发场景下,会有以下几个问题:

  1. 操作系统可以创建的线程数是有限制的。可以使用这条命令查看Linux可以创建的最大线程数:cat /proc/sys/kernel/threads-max。
  2. 无论用户是何种的请求我们都会专门分配一个线程让他处理数据接受、解码、业务处理、发送数据等操作,在网络质量不好的情况下,这就会大量线程阻塞,进而导致服务器效率降低,从而降低了服务器的吞吐量。

image-20240327092355394

# Reactor事件驱动模型(业务处理与IO分离)

# 图解

如下图,相较于传统IO模型来说,将处理用户请求和网络请求拆分,而且该模型还是非阻塞IO模型,所以当网络事件需要处理时,程序还可以处理其他任务,处理效率有显著提高。

image-20240327092335594

# 代码示例

首先是Reactor ,可以看到该代码就是服务端的入口,他做的事情就是初始通道,一旦监听到客户端的请求就分发给Accept代码

public class Reactor implements Runnable {


    private static Logger logger = LoggerFactory.getLogger(Reactor.class);

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;


    public Reactor(int port) throws IOException {
        //创建一个频道 并注册到selector
        serverSocketChannel = ServerSocketChannel.open();
        //设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(port));

        selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        selectionKey.attach(new Acceptor(serverSocketChannel));

    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
            } catch (IOException e) {
                logger.error("服务端使用一个线程不断等待客户端的连接到达失败,失败原因{}", e.getMessage(), e);
            }

            // 客户端请求到达,使用迭代器遍历处理
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                logger.info("监听到客户端连接事件后将其分发给Acceptor");
                dispatch(iterator.next());
                //因为我们注册的通道是accept,这里已经调用dispatch了,所以需要将其删除掉,避免迭代器二次使用
                iterator.remove();
            }

            try {
                int count = selector.selectNow();
                logger.info(
                    "此方法执行非阻塞 选择操作。如果自上一次 ,选择操作以来没有通道成为可选择的,则此方法立即返回零。,返回结果[{}]",
                    count);
            } catch (IOException e) {
                logger.error("非阻塞选择操作失败,失败原因[{}]", e.getMessage(), e);
            }
        }
    }

    private void dispatch(SelectionKey selectionKey) {
        //将客户端的请求交给accept处理
        Runnable accept = (Runnable) selectionKey.attachment();
        accept.run();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class Acceptor implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(Acceptor.class);

    private final ServerSocketChannel serverSocketChannel;

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(20);

    public Acceptor(ServerSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel;
    }


    @Override
    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel != null) {
                //将任务分发到线程池中处理
                try {
                    threadPool.execute(new Handler(socketChannel));
                } catch (Exception e) {
                    logger.error("任务分发失败,失败原因:[{}]", e.getMessage(), e);
                }
            }

        } catch (IOException e) {
            logger.error("Acceptor分发任务到线程池失败,失败原因:[{}]", e.getMessage(), e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Handler implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(Handler.class);

    private volatile static Selector selector;
    private final SocketChannel serverSocketChannel;
    private SelectionKey selectionKey;

    private volatile ByteBuffer input = ByteBuffer.allocate(1024);
    private volatile ByteBuffer output = ByteBuffer.allocate(1024);


    public Handler(SocketChannel channel) throws Exception {
        selector = Selector.open();
        serverSocketChannel = channel;
        channel.configureBlocking(false);
        selectionKey = channel.register(selector, SelectionKey.OP_READ);


    }


    @Override
    public void run() {
        while (selector.isOpen() && serverSocketChannel.isOpen()) {
            try {
                Set<SelectionKey> keys = select();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isReadable()) {
                        read(key);
                    } else if (key.isWritable()) {
                        write(key);
                    }

                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void write(SelectionKey key) throws IOException {
        output.flip();
        if (serverSocketChannel != null) {
            serverSocketChannel.write(output);
            selectionKey.channel();
            serverSocketChannel.close();
            output.clear();
        }

    }

    private void read(SelectionKey key) throws IOException {
        serverSocketChannel.read(input);
        if (input.position() == 0) {
            return;
        }

        input.flip();
        process();
        input.clear();
//        完成读取后就监听写入事件
        key.interestOps(SelectionKey.OP_WRITE);

    }

    private void process() throws UnsupportedEncodingException {
        byte[] bytes = new byte[input.remaining()];
        input.get(bytes);
        String str = new String(bytes, CharsetUtil.UTF_8);
        logger.info("读取到的数据为[{}]", str);
        output.put("receive u msg".getBytes());
    }


    // 这里处理的主要目的是处理Jdk的一个bug,该bug会导致Selector被意外触发,但是实际上没有任何事件到达,
    // 此时的处理方式是新建一个Selector,然后重新将当前Channel注册到该Selector上
    private Set<SelectionKey> select() throws IOException {
        selector.select();
        Set<SelectionKey> keys = selector.selectedKeys();

        while (keys.isEmpty()) {
            int interestOps = selectionKey.interestOps();
            selector = Selector.open();
            selectionKey = serverSocketChannel.register(selector, interestOps);
            return select();
        }

        return keys;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94

# Reactor事件驱动模型并发读写

# 图解

上述仅仅实现网络处理和网络读写业务处理的拆分,在高并发场景下,如果将网络读写和业务处理耦合在一起的情况下,很可能因为网络质量问题导致线程阻塞,进而导致服务器吞吐量降低,所以我们对Reactor进行一番改造,如下图,可以看到我们将Reactor拆分成来个,一个处理网络请求,另一个负责网络读写和业务计算,但是网络请求我们也会专门开个线程池进行处理,处理完成的分发的业务处理的线程池中,这样性能就一下子提高了。

image-20240327094552864

# Java对多路IO复用的支持

# 核心概念介绍

多路IO复用工作机制整体如下图所示,以下为里面几个核心概念:

  1. channel:与操作系统内核交互、传递数据的通道,一个通道通常有一个专属的文件描述符,常见的通道有**ServerSocketChannel、ScoketChannel、DatagramChannel**,其中应用程序只有通过ServerSocketChannel向选择器注册时才能实现多路IO复用的事件监听(这种监听支持TCP、UDP),而ScoketChannel则是TCP套接字监听通道,DatagramChannel则是UDP报文监听通道。
  2. buffer: Java的NIO框架为每种类型的读写都设置的缓存buffer,buffer缓冲区有3个比较重要的概念,position在读状态下时代表读取到的位置,在写状态代表写的起始位置。limit在读状态代表数据最多可以写到位置,在写状态代表还可以写capacity-limit个位置。capacity则代表buffer容量。
  3. Selector(重要):Java NIO中比较重要的概念,你可以将其理解为轮询代理器、channel容器管理器,有了selector,应用程序不再通过阻塞或者非阻塞模式询问操作系统是否有事件发生,取而待之的是selector去代理轮询。

image-20240329082043021

# 代码示例

我们现在不妨就编写一个支持多路IO复用的Java NIO框架

# 服务端代码

代码如下所示,为该应用程序注册感兴趣的通道后,即可启动选择器不断轮询,如果有读请求进来则对读数据进行处理,我们会将这个频道的hash值算出来去concurrentHashMap 查看这个用户之前是否有发送消息,若有则且用户本次发送了over,我们则将信息拼接起来输出。若没有用户本次发送的文本没有over,我们则将本次的消息拼接起来存到concurrentHashMap 中。

public class SocketServer1 {


    /**
     * 日志
     */
    private static Logger logger = LoggerFactory.getLogger(SocketServer1.class);


    private static ConcurrentHashMap<Integer, StringBuffer> concurrentHashMap = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        // 创建服务端的ServerSocketChannel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置为非阻塞模式,即可read、write没有结果返回一个标志即直接结束了
        serverChannel.configureBlocking(false);
        // 获得与此通道相关的服务套接字
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(83));

        // 获得选择器
        Selector selector = Selector.open();
        //将我们上面感兴趣的频道注册到selector中,注意:服务端的通道只能注册SelectionKey.OP_ACCEPT事件
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        try {
            while (true) {
                //如果条件成立,说明本次询问selector,并没有获取到任何准备好的、感兴趣的事件
                //java程序对多路复用IO的支持也包括了阻塞模式 和非阻塞模式两种。
                if (selector.select(100) == 0) {
                    logger.info("100ms毫秒没有没有频道事件,可以再次处理一些业务逻辑");
                    continue;
                }
                //这里就是本次询问操作系统,所获取到的“所关心的事件”的事件类型(每一个通道都是独立的)
                Iterator<SelectionKey> selecionKeys = selector.selectedKeys().iterator();

                while (selecionKeys.hasNext()) {
                    SelectionKey readyKey = selecionKeys.next();
                    //这个已经处理的readyKey一定要移除。如果不移除,就会一直存在在selector.selectedKeys集合中
                    //待到下一次selector.select() > 0时,这个readyKey又会被处理一次
                    selecionKeys.remove();

                    SelectableChannel selectableChannel = readyKey.channel();
                    if (readyKey.isValid() && readyKey.isAcceptable()) {
                        logger.info("======accept =======");
                        /*
                         * 当server socket channel通道已经准备好,就可以从server socket channel中获取socketchannel了
                         * 拿到socket channel后,要做的事情就是马上到selector注册这个socket channel感兴趣的事情。
                         * 否则无法监听到这个socket channel到达的数据
                         * */
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectableChannel;
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        registerSocketChannel(socketChannel, selector);

                    } else if (readyKey.isValid() && readyKey.isConnectable()) {
                        logger.info("======socket channel 建立连接=======");
                    } else if (readyKey.isValid() && readyKey.isReadable()) {
                        logger.info("======socket channel 数据准备完成,可以进行数据读取");
                        readSocketChannel(readyKey);
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            serverSocket.close();
        }
    }

    /**
     * 在server socket channel接收到/准备好 一个新的 TCP连接后。
     * 就会向程序返回一个新的socketChannel。<br>
     * 但是这个新的socket channel并没有在selector“选择器/代理器”中注册,
     * 所以程序还没法通过selector通知这个socket channel的事件。
     * 于是我们拿到新的socket channel后,要做的第一个事情就是到selector“选择器/代理器”中注册这个
     * socket channel感兴趣的事件
     *
     * @param socketChannel 新的socket channel
     * @param selector      selector“选择器/代理器”
     * @throws Exception
     */
    private static void registerSocketChannel(SocketChannel socketChannel, Selector selector) throws Exception {
        socketChannel.configureBlocking(false);
        //socket通道可以且只可以注册三种事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
        socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(2048));
    }

    /**
     * 这个方法用于读取从客户端传来的信息。
     * 并且观察从客户端过来的socket channel在经过多次传输后,是否完成传输。
     * 如果传输完成,则返回一个true的标记。
     *
     * @throws Exception
     */
    private static void readSocketChannel(SelectionKey readyKey) throws Exception {
        logger.info("开始阅读客户端传入的数据**************************");
        SocketChannel clientSocketChannel = (SocketChannel) readyKey.channel();
        //获取客户端使用的端口
        InetSocketAddress sourceSocketAddress = (InetSocketAddress) clientSocketChannel.getRemoteAddress();
        Integer resoucePort = sourceSocketAddress.getPort();


        ByteBuffer contextBytes = (ByteBuffer) readyKey.attachment();
        StringBuffer stringBuffer = new StringBuffer();
        int realLen = -1;
        try {
//            若有读取到数据则皆有缓存将数据到String
            while ((realLen = clientSocketChannel.read(contextBytes)) != -1) {
//                将limit到position的位置,position调整到其实位置,为读数据做好准备
                contextBytes.flip();
                int position = contextBytes.position();
                int length = contextBytes.limit();
                byte[] bytes = new byte[length];
                contextBytes.get(bytes, position, length);

                String messageEncode = new String(bytes, 0, length, "UTF-8");
                stringBuffer.append(messageEncode);

                //position变为0,limit回去,即切换为写模式
                contextBytes.clear();
            }

        } catch (Exception e) {
            //这里抛出了异常,一般就是客户端因为某种原因终止了。所以关闭channel就行了
            logger.error("读取客户端数据异常,异常原因:[{}]", e.getMessage(), e);
            clientSocketChannel.close();
            return;
        }


        //如果收到了“over”关键字,才会清空buffer,并回发数据;
        //否则不清空缓存,还要还原buffer的“写状态”
        if (URLDecoder.decode(stringBuffer.toString(), "UTF-8").indexOf("over") != -1) {
            Integer hashCode = clientSocketChannel.hashCode();
            String message = null;
            if (concurrentHashMap.containsKey(hashCode)) {
                StringBuffer str = concurrentHashMap.get(hashCode);
                str.append(stringBuffer);
                message = str.toString();
            }else{
                message = stringBuffer.toString();
            }

            logger.info("端口:" + resoucePort + "客户端发来的信息======message : " + message);

            clientSocketChannel.close();
            concurrentHashMap.remove(hashCode);
        } else {
            logger.info("端口:" + resoucePort + "客户端信息还未接受完,继续接受======message : " + URLDecoder.decode(stringBuffer.toString(), "UTF-8"));
            Integer hashCode = clientSocketChannel.hashCode();
            String message = null;
            if (concurrentHashMap.containsKey(hashCode)) {
                StringBuffer str = concurrentHashMap.get(hashCode);
                str.append(stringBuffer);
                concurrentHashMap.put(hashCode, str);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160

# 客户端代码

public class Client {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("127.0.0.1", 83);
        OutputStream out = socket.getOutputStream();
        String s = "hello world over";
        out.write(s.getBytes());
        out.close();
    }
}
1
2
3
4
5
6
7
8
9

# 小结

  1. Java NIO多路复用不再使用多线程进行IO处理,当然在业务处理中,我们仍然可以通过线程池进行处理。
  2. 同一个端口可以处理TCP或者UDP,例如你的应用程序向选择器注册ServerSocketChannel。
  3. Java NIO是操作系统级别的优化,能够同时处理多个客户端IO事件,同时具有阻塞式同步IO和非阻塞式同步IO的所有特点。当然多路复用IO还是属于操作系统级别的同步IO。

# Netty

# 简介

Netty是一款高性能、异步事件驱动的强大的NIO框架,业界主流的RPC框架、zookeeper等都是基于其构建的。其特点为:

1. api简单,开发门槛低 功能强大
2. 内置了多种编码、解码功能 
3. 与其它业界主流的NIO框架对比,netty的综合性能最优 社区活跃,使用广泛,经历过很多商业应用项目的考验 
4. 定制能力强,可以对框架进行灵活的扩展 
1
2
3
4

# 使用示例

# 导入pom

<dependency>
    <groupId>org.jboss.netty</groupId>
    <artifactId>netty</artifactId>
    <version>3.2.5.Final</version>
</dependency>
1
2
3
4
5

# 服务端代码

入口

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

public class NettyServer {




    private static Logger logger = LoggerFactory.getLogger(NettyServer.class);


    public void bind(int port) throws Exception {

        ServerBootstrap serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool()));

        serverBootstrap.setPipelineFactory(() -> {
            ChannelPipeline channelPipeline = Channels.pipeline();
            channelPipeline.addLast(MessageHandler.class.getName(), new MessageHandler());
            return channelPipeline;
        });

        serverBootstrap.bind(new InetSocketAddress(port));

    }


    public static void main(String[] args) {
        try {
            new NettyServer().bind(83);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

消息处理器

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
 * 收到客户端请求的消息处理器
 *
 */
public class MessageHandler extends SimpleChannelHandler {

    private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);

    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        ChannelBuffer message = (ChannelBuffer) messageEvent.getMessage();
        byte[] bytes = message.readBytes(message.readableBytes()).array();
        logger.info("服务端收到消息:[{}]", new String(bytes, "UTF-8"));

        byte[] body = "服务端已收到".getBytes();
        byte[] header = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(body.length).array();
        Channels.write(channelHandlerContext.getChannel(), ChannelBuffers.wrappedBuffer(header, body));


    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 客户端代码

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;

public class NettyClient {


    private static Logger logger = LoggerFactory.getLogger(NettyClient.class);

    private final ByteBuffer readHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
    private final ByteBuffer writeHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);

    private SocketChannel socketChannel;


    public void sendMessage(byte[] body) throws Exception {
        this.socketChannel = SocketChannel.open();
        this.socketChannel.socket().setSoTimeout(60000);
        this.socketChannel.connect(new InetSocketAddress(83));

        writeWithHeader(this.socketChannel, body);

        readHeader.clear();

        read(this.socketChannel, readHeader);
        int bodyLen = readHeader.getInt(0);
        ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLen).order(ByteOrder.BIG_ENDIAN);
        read(this.socketChannel, bodyBuf);

        logger.info("客户端收到的响应内容:[{}]", new String(bodyBuf.array(), "UTF-8"));
    }


    private void writeWithHeader(SocketChannel channel, byte[] body) throws IOException {
        writeHeader.clear();
        writeHeader.putInt(body.length);
        writeHeader.flip();
        channel.write(ByteBuffer.wrap(body));
    }


    private void read(SocketChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            int len = channel.read(buffer);
            if (len == -1) {
                throw new IOException("end of stream when reading header");
            }
        }
    }

    public static void main(String[] args) throws Exception {
        String body = "客户发的测试请求!";
        new NettyClient().sendMessage(body.getBytes());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
帮助我们改善此页面! (opens new window)
上次更新: 2024/08/13, 09:07:12
Java IO模型
并发编程基础总结

← Java IO模型 并发编程基础总结→

最近更新
01
SpringCloud Alibaba实战
08-22
02
SpringCloud Alibaba核心知识
08-22
03
两数之和
08-08
更多文章>
Theme by Vdoing | Copyright © 2023-2024 BlazeMaple
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式