目录
非阻塞通信的流程
示例代码:
实现非阻塞通信的方式
缓冲区
通道
Selector类:
SelectionKey类
首页 Java java教程 Java网络编程由浅入深三 一文了解非阻塞通信的图文代码示例详解

Java网络编程由浅入深三 一文了解非阻塞通信的图文代码示例详解

Mar 04, 2017 am 10:10 AM

本文详细介绍组成非阻塞通信的几大类:BufferChannelSelectorSelectionKey

非阻塞通信的流程

  1. ServerSocketChannel通过open方法获取ServerSocketChannel,通过ServerSocketChannel设置为非阻塞模式,再通过ServerSocketChannel获取socket,绑定服务进程监听端口。服务启动成功。

  2. 然后就是非阻塞通信的精髓了,Selector通过静态的open()方法获取到Selector,然后ServerSocketChannel注册Selection.OP_ACCEPT事件到Selector上。

  3. Selector就会监控事件发生,Selector通过select()监控已发生的SelectionKey对象的数目,通过selectKeys()方法返回对应的selectionKey对象集合。遍历该集合得到相应的selectionKey对象,通过该对象的channel()方法获取关联的ServerSocketChannel对象, 通过selector()方法就可以获取关联的Selector对象。

  4. 通过上面获取的ServerSocketChannel执行accept()方法获取SocketChannel,再通过SocketChannel设置为非阻塞模式,在将SocketChannel注册到上面创建的Selector上,注册SelectionKey.OP_READ |SelectionKey.OP_WRITE 事件。

  5. Selector将在监控对应上面绑定的事件,监控到对应的事件的话执行读和写的操作。

示例代码:

上面描述了服务端非阻塞方式通信的一个流程,下面通过具体代码实现:

/**
 * 非阻塞模式
 * 
 */public class EchoServer2 {
    private Selector selector = null;    
    private ServerSocketChannel serverSocketChannel = null;    
    private int port = 8001;    
    private Charset charset = Charset.forName("UTF-8");    
    public EchoServer2() throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();        //服务器重启的时候,重用端口
        serverSocketChannel.socket().setReuseAddress(true);        //设置非阻塞模式
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        System.out.println("服务器启动成功");
    }    /**
     * 服务方法
     */
    public void service() throws IOException {
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);        
        while (selector.select() > 0) {
            Set readyKes = selector.selectedKeys();
            Iterator it = readyKes.iterator();            
            while (it.hasNext()) {
                SelectionKey key = null;                
                try {
                    key = (SelectionKey) it.next();
                    it.remove();                    
                    if (key.isAcceptable()) {
                        System.out.println("连接事件");                        
                        //连接事件
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = ssc.accept();
                        System.out.println("接收到客户连接,来自:" + socketChannel.socket().getInetAddress() +                                " : " + socketChannel.socket().getPort());
                        socketChannel.configureBlocking(false);
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        socketChannel.register(selector, SelectionKey.OP_READ |
                                SelectionKey.OP_WRITE, buffer);
                    } else if (key.isReadable()) {                        
                    //接收数据
                        receive(key);
                    } else if (key.isWritable()) {                        
                    //发送数据
                        send(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();                    
                    try {                        
                    if (key != null) {
                            key.cancel();
                            key.channel().close();
                        }
                    }catch (IOException ex){
                        ex.printStackTrace();
                    }
                }
            }
        }
    }    private void send(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();

        SocketChannel channel = (SocketChannel) key.channel();
        buffer.flip(); //把极限设置为位置,把位置设置为0
        String data = decode(buffer);        
        if (data.indexOf("\r\n") == -1) {            
        return;
        }
        String outputData = data.substring(0, data.indexOf("\n") + 1);
        System.out.println("请求数据:" + outputData);

        ByteBuffer outputBuffer = encode("echo:" + outputData);        
        while (outputBuffer.hasRemaining()) {
            channel.write(outputBuffer);
        }
        ByteBuffer temp = encode(outputData);
        buffer.position(temp.limit());
        buffer.compact();        if (outputData.equals("bye\r\n")) {
            key.cancel();
            channel.close();
            System.out.println("关闭与客户的连接");
        }
    }    private String decode(ByteBuffer buffer) {
        CharBuffer charBuffer = charset.decode(buffer);        return charBuffer.toString();
    }    private ByteBuffer encode(String s) {        return charset.encode(s);
    }    private void receive(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();

        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer readBuff = ByteBuffer.allocate(32);
        socketChannel.read(readBuff);
        readBuff.flip();

        buffer.limit(buffer.capacity());
        buffer.put(readBuff);
    }    public static void main(String[] args) throws IOException {        new EchoServer2().service();
    }
}/**
 * 创建非阻塞客户端
 * 
 */public class EchoClient2 {

    private SocketChannel socketChannel;    private int port = 8001;    private Selector selector;    private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);    private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);    private Charset charset = Charset.forName("UTF-8");    public EchoClient2() throws IOException {
        socketChannel = SocketChannel.open();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
        socketChannel.connect(inetSocketAddress);//
        socketChannel.configureBlocking(false);//设置为非阻塞模式
        System.out.println("与服务器连接成功");
        selector = Selector.open();
    }    public static void main(String[] args) throws IOException {        final EchoClient2 client = new EchoClient2();
        Thread receiver = new Thread(new Runnable() {            @Override
            public void run() {
                client.receiveFromUser();
            }
        });
        receiver.start();
        client.talk();
    }    private void receiveFromUser() {        try {
            System.out.println("请输入数据:");
            BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
            String msg = null;            while ((msg = localReader.readLine()) != null) {
                System.out.println("用户输入的数据:" + msg);                synchronized (sendBuffer) {
                    sendBuffer.put(encode(msg + "\r\n"));
                }                if (msg.equalsIgnoreCase("bye")) {                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }    private ByteBuffer encode(String s) {        return charset.encode(s);
    }    private void talk() throws IOException {
        socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);        while (selector.select() > 0) {
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();            while (it.hasNext()) {
                SelectionKey key = null;                try {
                    key = it.next();
                    it.remove();                    if (key.isReadable()) {                        //System.out.println("读事件");
                        //读事件
                        receive(key);
                    }                    if (key.isWritable()) {                       // System.out.println("写事件");
                        //写事件
                        send(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();                    if (key != null) {
                        key.cancel();
                        key.channel().close();
                    }
                }
            }
        }
    }    private void send(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();        synchronized (sendBuffer) {
            sendBuffer.flip();//把极限设为位置,把位置设为零
            channel.write(sendBuffer);
            sendBuffer.compact();//删除已经发送的数据。
        }
    }    private void receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.read(receiveBuffer);
        receiveBuffer.flip();//将limit的值设置为position的值,将position的值设置为0
        String receiveData = decode(receiveBuffer);        if (receiveData.indexOf("\n") == -1) {            return;
        }
        String outputData = receiveData.substring(0, receiveData.indexOf("\n") + 1);
        System.out.println("响应数据:" + outputData);        if (outputData.equalsIgnoreCase("echo:bye\r\n")) {
            key.cancel();
            socketChannel.close();
            ;
            System.out.println("关闭与服务器的连接");
            selector.close();
            System.exit(0);
        }

        ByteBuffer temp = encode(outputData);
        receiveBuffer.position(temp.limit());
        receiveBuffer.compact();//删除已经打印的数据
    }    private String decode(ByteBuffer receiveBuffer) {
        CharBuffer buffer = charset.decode(receiveBuffer);        return buffer.toString();
    }
}
登录后复制

实现非阻塞通信的方式

  • 缓冲区

  • 通道

  • Selector

    缓冲区

作用:减少物理读写次数,减少内存创建和销毁次数。 缓冲区的属性:capacity(最大容量)、limit(实际容量)、position(当前位置)。PS:其他地方是翻译成capacity(容量)、limit(极限)、position位置),我个人觉得翻译成上面的更好理解,为啥通过下面的方法解析和图解就可明白。当然最好通过英文表达这样最清楚。
三个属性的关系为:capacity≥limit≥position≥0

图解关系如下:
这里写图片描述
缓冲区类结构:
java.nio.ByteBuffer类是一个抽象类,不能被实例化。但是提供了8个具体的实现类,其中最基本的的缓冲区是ByteBuffer,它存放的数据单元是字节。
这里写图片描述

常用方法:

clear():把limit设置为capacity,再把位置设为0
flip():把limit设置为position,再把位置设置为0。
rewind():不改变limit,把位置设为0。
allocate():创建一个缓冲中,方法参数指定缓冲区大小
compact():将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。

测试上述方法:
测试clear()方法

    @Test
    public void testClear() {        //创建一个10chars大小的缓冲区,默认情况下limit和capacity是相等的
        CharBuffer buffer = CharBuffer.allocate(10);
        System.out.println("创建默认情况");
        printBufferInfo(buffer);
        buffer.limit(8);//修改limit的值
        System.out.println("修改limit后");
        printBufferInfo(buffer);        // clear():把limit设置为capacity,再把位置设为0
        buffer.clear();
        System.out.println("执行clear()方法后");
        printBufferInfo(buffer);
    }
登录后复制

执行结果如下:
这里写图片描述

测试flip()方法:

    @Test
    public void testFlip() {
        CharBuffer buffer = CharBuffer.allocate(10);
        System.out.println("创建默认情况");
        printBufferInfo(buffer);        //put的方法会修改position的值
        buffer.put(&#39;H&#39;);
        buffer.put(&#39;E&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;O&#39;);
        System.out.println("调用put方法后:");
        printBufferInfo(buffer);        //flip():把limit设置为position,再把位置设置为0。
        buffer.flip();
        System.out.println("调用flip方法后:");
        printBufferInfo(buffer);
    }
登录后复制

执行结果如下:
这里写图片描述

测试rewind()方法

        @Test
    public void testRewind() {
        CharBuffer buffer = CharBuffer.allocate(10);
        System.out.println("创建默认情况");
        printBufferInfo(buffer);        //put的方法会修改position的值
        buffer.put(&#39;H&#39;);
        buffer.put(&#39;E&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;O&#39;);
        buffer.limit(8);
        System.out.println("调用put、limit方法后:");
        printBufferInfo(buffer);        //rewind():不改变limit,把位置设为0。
        buffer.rewind();
        System.out.println("调用rewind方法后:");
        printBufferInfo(buffer);
    }
登录后复制

执行结果如下:
这里写图片描述

测试compact()方法

    @Test
    public void testCompact(){
        CharBuffer buffer = CharBuffer.allocate(10);
        System.out.println("创建默认情况");
        printBufferInfo(buffer);        //put的方法会修改position的值
        buffer.put(&#39;H&#39;);
        buffer.put(&#39;E&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;O&#39;);
        buffer.limit(8);//修改limit的值
        System.out.println("调用put和limit方法后:");
        printBufferInfo(buffer);
        System.out.println("调用compact方法后:");        //将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。
        buffer.compact();
        printBufferInfo(buffer);
    }
登录后复制

这里写图片描述

这是JDK中介绍该方法的作用:

将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。即将索引 p = position() 处的字节复制到索引 0 处,将索引 p + 1 处的字节复制到索引 1 处,依此类推,直到将索引 limit() - 1 处的字节复制到索引 n = limit() - 1 - p 处。然后将缓冲区的位置设置为 n+1,并将其界限设置为其容量。如果已定义了标记,则丢弃它。

官方表示的太难理解了:

将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。并将limit(实际容量)设置为 capacity(最大容量)。执行compact()方法前,limit的值是:8,position的值是:5。按照上面描述的执行完compact()后,position的值计算方式是:n+1;n=limit-1-p;所有n=8-1-5=2,最后position的值为:2+1=3。和程序运行的结果一致。
可以在这种情况:从缓冲区写入数据之后调用此方法,以防写入不完整。

buf.clear();          // Prepare buffer for use
  while (in.read(buf) >= 0 || buf.position != 0) {
     buf.flip();
     out.write(buf);
     buf.compact();    // In case of partial write
 }
登录后复制

如果out.write()方法没有将缓存中的数据读取完,这个时候的position位置指向的是剩余数据的位置。达到防止写入不完整。

通道

作用:  连接缓冲区与数据源或数据目的地。

常用类:

Channel
 接口有下面两个子接口ReadableByteChannel和WritableByteChannel和一个抽象实现类SelectableChannel。
 在ReadableByteChannel接口中申明了read(ByteBuffer
 dst)方法。在WritableByteChannel接口中申明了write(ByteBuffer[]
 srcs):方法。SelectableChannel抽象类中主要方法,configureBlocking(boolean
 block)、register();方法。 ByteChannel
 接口继承了ReadableChannel和WritableChannel。所以ByteChannel具有读和写的功能。

ServerSocketChannel继承了SelectableChannel类抽象类,所以SocketChannel具有设置是否是阻塞模式、向selector注册事件功能。

SocketChannel也继承了SelectableChannel类还实现ByteChannel接口,所以SocketChannel具有设置是否是阻塞模式、向selector注册事件、从缓冲区读写数据的功能。

通过类图展现:
这里写图片描述

Selector类:

作用:只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监听这些事件的发生。

流程:
 Selector通过静态的open()方法创建一个Selector对象,SelectableChannel类向Selector注册了特定的事件。Selector就会监控这些事件发生,Selector通过select()监控已发生的SelectionKey对象的数目,通过selectKeys()方法返回对应的selectionKey对象集合。遍历该集合得到相应的selectionKey对象,通过该对象的channel()方法获取关联的SelectableChannel对象,
 通过selector()方法就可以获取关联的Selector对象。

Note:
当Selector的select()方法还有一个重载方式:select(long timeout)。并且该方法采用阻塞的工作方式,如果相关事件的selectionKey对象的数目一个也没有,就进入阻塞状态。知道出现以下情况之一,才从select()方法中返回。

  • 至少有一个SelectionKey的相关事件已经发生。

  • 其他线程调用了Selector的wakeup()方法,导致执行select()方法的线程立即返回。

  • 当前执行的select()方法的线程被中断。

  • 超出了等待时间。仅限调用select(long timeout)方法时出现。如果没有设置超时时间,则永远不会超时。

Selector类有两个非常重要的方法: 静态方法open(),这是Selector的静态工厂方法,创建一个Selector对象。
 selectedKeys()方法返回被Selector捕获的SelectionKey的集合。

SelectionKey类

作用:
 ServerSocketChannel或SocketChannel通过register()方法向Selector注册事件时,register()方法会创建一个SelectionKey对象,该对象是用来跟踪注册事件的句柄。在SelectionKey对象的有效期间,Selector会一直监控与SelectionKey对象相关的事件,如果事件发生,就会把SelectionKey对象添加到Selected-keys集合中。

SelectionKey中定义的事件: 定义了4种事件:
 1、SelectionKey.OP_ACCEPT:接收连接就绪事件,表示服务器监听到了客户连接,服务器可以接收这个连接了。常量值为16.
 2、SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经建立成功。常量值为8.
 3、SelectionKey.OP_READ:读就绪事件,表示通道中已经有了可读数据可以执行读操作。常量值为1.
 4、SelectionKey.OP_WRITE:写就绪事件,表示已经可以向通道写数据了。常量值为4.

常用方法:
 channel()方法:返回与它关联的SelectedChannel(包括ServerSocketChannel和SocketChannel)。
 selector()方法:返回与它关联的Selector对象。
 它们之间的关系如下:
这里写图片描述

 以上就是Java网络编程由浅入深三 一文了解非阻塞通信的图文代码示例详解的内容,更多相关内容请关注PHP中文网(www.php.cn)!


本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

Java Spring 面试题 Java Spring 面试题 Aug 30, 2024 pm 04:29 PM

在本文中,我们保留了最常被问到的 Java Spring 面试问题及其详细答案。这样你就可以顺利通过面试。

突破或从Java 8流返回? 突破或从Java 8流返回? Feb 07, 2025 pm 12:09 PM

Java 8引入了Stream API,提供了一种强大且表达力丰富的处理数据集合的方式。然而,使用Stream时,一个常见问题是:如何从forEach操作中中断或返回? 传统循环允许提前中断或返回,但Stream的forEach方法并不直接支持这种方式。本文将解释原因,并探讨在Stream处理系统中实现提前终止的替代方法。 延伸阅读: Java Stream API改进 理解Stream forEach forEach方法是一个终端操作,它对Stream中的每个元素执行一个操作。它的设计意图是处

手机上如何将XML转换成PDF? 手机上如何将XML转换成PDF? Apr 02, 2025 pm 10:18 PM

直接在手机上将XML转换为PDF并不容易,但可以借助云端服务实现。推荐使用轻量级手机App上传XML文件并接收生成的PDF,配合云端API进行转换。云端API使用无服务器计算服务,选择合适的平台至关重要。处理XML解析和PDF生成时需要考虑复杂性、错误处理、安全性和优化策略。整个过程需要前端App与后端API协同工作,需要对多种技术有所了解。

Java程序查找胶囊的体积 Java程序查找胶囊的体积 Feb 07, 2025 am 11:37 AM

胶囊是一种三维几何图形,由一个圆柱体和两端各一个半球体组成。胶囊的体积可以通过将圆柱体的体积和两端半球体的体积相加来计算。本教程将讨论如何使用不同的方法在Java中计算给定胶囊的体积。 胶囊体积公式 胶囊体积的公式如下: 胶囊体积 = 圆柱体体积 两个半球体体积 其中, r: 半球体的半径。 h: 圆柱体的高度(不包括半球体)。 例子 1 输入 半径 = 5 单位 高度 = 10 单位 输出 体积 = 1570.8 立方单位 解释 使用公式计算体积: 体积 = π × r2 × h (4

PHP与Python:了解差异 PHP与Python:了解差异 Apr 11, 2025 am 12:15 AM

PHP和Python各有优势,选择应基于项目需求。1.PHP适合web开发,语法简单,执行效率高。2.Python适用于数据科学和机器学习,语法简洁,库丰富。

创造未来:面向零基础的 Java 编程 创造未来:面向零基础的 Java 编程 Oct 13, 2024 pm 01:32 PM

Java是热门编程语言,适合初学者和经验丰富的开发者学习。本教程从基础概念出发,逐步深入讲解高级主题。安装Java开发工具包后,可通过创建简单的“Hello,World!”程序实践编程。理解代码后,使用命令提示符编译并运行程序,控制台上将输出“Hello,World!”。学习Java开启了编程之旅,随着掌握程度加深,可创建更复杂的应用程序。

如何在Spring Tool Suite中运行第一个春季启动应用程序? 如何在Spring Tool Suite中运行第一个春季启动应用程序? Feb 07, 2025 pm 12:11 PM

Spring Boot简化了可靠,可扩展和生产就绪的Java应用的创建,从而彻底改变了Java开发。 它的“惯例惯例”方法(春季生态系统固有的惯例),最小化手动设置

PHP:网络开发的关键语言 PHP:网络开发的关键语言 Apr 13, 2025 am 12:08 AM

PHP是一种广泛应用于服务器端的脚本语言,特别适合web开发。1.PHP可以嵌入HTML,处理HTTP请求和响应,支持多种数据库。2.PHP用于生成动态网页内容,处理表单数据,访问数据库等,具有强大的社区支持和开源资源。3.PHP是解释型语言,执行过程包括词法分析、语法分析、编译和执行。4.PHP可以与MySQL结合用于用户注册系统等高级应用。5.调试PHP时,可使用error_reporting()和var_dump()等函数。6.优化PHP代码可通过缓存机制、优化数据库查询和使用内置函数。7

See all articles