netty之数据读写源码阅读

2023-09-21 09:10:14

数据读写

write

从client端的写开始看

client与服务端建立完connect后可以从future里拿到连接的channel对象。这里的channel是io.netty.channel.Channel对象。

调用其channel.writeAndFlush(msg);方法可以进行数据发送。

writeAndFlush会调用pipeline的writeAndFlush方法

public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}

pipeline实现是DefaultChannelPipeline类,其writeAndFlush方法如下

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

我们回顾下pipeline的初始化,默认会设置两个handler,tail和head。tail是Inbound类型的handler。head既是outbound又是inbound类型的handler

protected DefaultChannelPipeline(Channel channel) {
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

写数据是从handler的tail开始的。

tail里的write方法会先创建一个promise方法,然后调用write方法,最后返回promise。

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);
    return promise;
}

write方法在父类AbstractChannelHandlerContext的默认实现。这里也是handler责任链式递归调用主要方法。每一个handler都有该write方法(都包装成HandlerContext),当自身invokeWriteAndFlush自行完后会继续调用write方法获取next handler。

private void write(Object msg, boolean flush, ChannelPromise promise) {
    //从tail往前找outBound类型的handler
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    //拿出来handler绑定的线程executor
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) { //判断是不是当前线程是不是eventLoop绑定的线程
        if (flush) {//需要flush调用writeflush方法
            next.invokeWriteAndFlush(m, promise);
        } else {//不需要flush
            next.invokeWrite(m, promise);
        }
    } else {//不是同一个线程,构造task放入线程执行队列里
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

write方法首先从tail往前找下一个outBound类型的handler。如果我们在初始化client连接的时候没有往pipeline里新加入outBound类的handler,那么这里找到的就是head。

再往下拿出的executor这里是channel绑定的nioEvenLoop对象。在前面channel启动过程我们知道,Bootstrap会绑定一个EventLoopGroup。新一个channel,EventLoopGroup会拿出一个child与之进行绑定,child是单线程的executor实现。需要执行的task会先加入taskQueue。这里的executor就是一个child,NioEventLoop类型。

由于我们当前调用write的线程是业务线程,executor.inEventLoop()这一步判断(判断当前线程和NioEventLoop线程池中的线程是否是同一个线程)是不成立的,所以会走else,构造一个task添加到taskQueue里。然后wakeup NioEventLoop里的监听线程执行任务。这些都是前面分析server启动代码流程。

再来看WriteTask里的run方法

public void run() {
    try {
        decrementPendingOutboundBytes();
        if (size >= 0) {
            ctx.invokeWrite(msg, promise);
        } else {
            ctx.invokeWriteAndFlush(msg, promise);
        }
    } finally {
        recycle();
    }
}

也是调用当前handler的invokeWrite或invokeWriteAndFlush方法。和上面if成立是逻辑一致。

HeadContext-write

假如我们这里没有往pipeline里添加任何handler。按照逻辑找到的next就是head。会调用head的invokeWriteAndFlush方法。

void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

这里invokehandler()是成立的,主要判断当前handler的状态。

invokeWrite0方法就是调用Context对应的handler的write方法。

headhandlerwrite方法

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

unsafe的write

public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    int size;
    try {//过滤消息
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            safeSetFailure(promise, t);
        }
        return;
    }
    outboundBuffer.addMessage(msg, size, promise);
}

断点跟踪发现,在执行filterOutboundMessage()方法这里就异常终止了。

protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        return newDirectBuffer(buf);
    }
    if (msg instanceof FileRegion) {
        return msg;
    }
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

这个方法主要对msg类型进行了判断。在最开始调用channel发送数据的时候传入的一个字符串,不符合可以传输的两个类型,抛出了UnsupportedOperationException。

数据包装

看来在初始化pipeline的时候还是需要搞一个outboudhandler进行数据的包装。这里我们使用netty自带的StringEncoder进行数据包装

channel.pipeline()
        .addLast(new StringEncoder())
        .addLast(new StringDecoder());

那这样从tail找outbound就找到了StringEncoder。StringEncoder继承自MessageToMessageEncoder。

StringEncoder extends MessageToMessageEncoder<CharSequence>{}

这里有个泛型类型,下面会校验当前消息类型是该泛型的子类

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    CodecOutputList out = null;
    try {
        if (acceptOutboundMessage(msg)) {//判断消息类型是否可处理,这里判断需要是CharSequence
            out = CodecOutputList.newInstance();
            //I是泛型CharSequence
            I cast = (I) msg;
            //encode编码转换
            encode(ctx, cast, out);   
            }
        } else {
            ctx.write(msg, promise);
        }
    } finally {
        if (out != null) {
            try {//out的size是1,sizeMinusOne = 0
                final int sizeMinusOne = out.size() - 1;
                if (sizeMinusOne == 0) { //走if
                    //这里write方法就会递归的调用下一个handler的write方法
                    ctx.write(out.getUnsafe(0), promise);
                } else if (sizeMinusOne > 0) {
                    if (promise == ctx.voidPromise()) {
                        writeVoidPromise(ctx, out);
                    } else {
                        writePromiseCombiner(ctx, out, promise);
                    }
                }
            } finally {
                out.recycle();
            }
        }
    }
}

encode方法只是将msg转成bytebuf类型,放到out里

protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
    out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}

被包装成ByteBuf后,调用ctx.write进行调用下一个handler。这里StringEncoder只是对msg进行了封装,就又回到了head handler里。

数据缓存

在回到head handler里。这时候filterOutboundMessage()过滤消息就不会报错了。然后会调用outboundBuffer.addMessage(msg, size, promise);

这里addMessage就是构造一个entry,然后将entry放到链表尾部。到这里整个write方法就执行完了,从头到尾只是把数据组装,并没有数据流的操作.

数据发送

context的invokeWriteAndFlush方法有两步。上面看完了invokeWrite0方法只是组装数据,发送其实在invokeFlush0方法里

invokeWriteAndFlush(){
  invokeWrite0(msg, promise);
  invokeFlush0();
}

这里invokeWriteAndFlush的开始,是从tail开始找的next,也就是对应我们这里设定的StringEncoder。invokeWrite0()和invokeFlush0()方法都是递归往后调,直到head。write看完了,下面看invokeFlush0方法。

invokeFlush0()方法会调用当前Context对应handler的flush方法。

((ChannelOutboundHandler) handler()).flush(this);

StringEncoder的flush方法

public void flush(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
}

这里什么也没做,只是调ctx.flush方法调起下一个handler。

flush实现在里AbstractChannelHandlerContext里

public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeFlush();
    } else {
        Tasks tasks = next.invokeTasks;
        if (tasks == null) {
            next.invokeTasks = tasks = new Tasks(next);
        }
        safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
    }

    return this;
}

这里和write方法是相似的,也是MASK_FLUSH匹配的outbound然后递归调用invokeFlush方法,这里最终会调到handler的flush方法。

下一个HeadContext的flush方法

public void flush(ChannelHandlerContext ctx) {
    unsafe.flush();
}

最后还是unsafe的flush方法

public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    //好像只是做了一些标记
    outboundBuffer.addFlush();
    flush0();
}

outboundBuffer.addFlush()这里标记了flushedEntry位置。将flushedEntry指向unflushedEntry。并修改每个entry对应的promise为不可取消

再往下调用调用unsafe.flush0()

flush0()方法

protected void flush0() {
    //...
    try {//dowrite方法了
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        handleWriteError(t);
    } finally {
        inFlush0 = false;
    }
}

然后doWrite方法:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    //获取写循环的次数,默认16
    int writeSpinCount = config().getWriteSpinCount();
    do {//循环处理待写数据
        if (in.isEmpty()) {
            // All written so clear OP_WRITE
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        // 获取每个ByteBuf最大字节数
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        //转换成ByteBuffer
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();

        // Always use nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // Only one ByteBuf so use non-gathering write
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                //channel写数据
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                // We limit the max amount to int above so cast is safe
                long attemptedBytes = in.nioBufferSize();
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);
    //循环16次还未写完生成task执行
    incompleteWrite(writeSpinCount < 0);
}

这里终于看到了channel.write()方法进行写数据。数据都存储在ChannelOutboundBuffer中。通过其nioBuffers()方法将缓冲数据转换成ByteBuffer[] buffers。将buffers写出到channel。最后调用removeBytes()方法将已写出数据从缓冲区刷出清理。

数据写出整个流程

在这里插入图片描述

read

读和写类似,selector监听到read事件后最终调用unsafe.read()进行读数据操作。这里unsafe的实例是NioByteUnsafe类型。首先读取数据到ByteBuf,然后从head开始递归调用pipleine里的handler进行消息的处理。

流程如下:

在这里插入图片描述

更多推荐

第74步 时间序列建模实战:多步滚动预测 vol-2(以决策树回归为例)

基于WIN10的64位系统演示一、写在前面上一期,我们讲了多步滚动预测的第一种策略:对于重复的预测值,取平均处理。例如,(1,2,3)预测出3.9和4.5,(2,3,4)预测出5.2和6.3,那么拼起来的结果就是3.9,(4.5+5.2)/2,6.3。这一期,我们来介绍第二种策略:删除一半的输入数据集。例如,4,5由(

【网络八股】TCP八股

网络八股请简述TCP/IP模型中每层的作用,典型协议和典型设备介绍一下三次握手的过程介绍一下四次挥手的过程必须三次握手吗,两次不行吗?为什么ACK数据包消耗TCP的序号吗三次握手中可以携带应用层数据吗四次挥手时,可以携带应用层数据吗?主动断开连接方,为什么要等待2MSL时间?如果服务端不进行accept,那么最多可以完

如何压缩图片大小?图片太大这样压缩

过大的图片文件不仅会占用大量的存储空间,还会影响图片传输和处理效率。为了解决这个问题,下面给大家分享几个图片压缩的方法,帮助你轻松解决图片过大带来的困扰。一、使用嗨格式压缩大师这是一款专业的图片压缩工具,采用先进的压缩算法,能够在保证图片质量的同时,快速高效地降低图片的文件大小。1、在电脑上打开软件后,选择“图片压缩”

激活函数总结(四十):激活函数补充(AHAF、SERLU)

激活函数总结(四十):激活函数补充1引言2激活函数2.1AHAF激活函数2.2SERLU激活函数3.总结1引言在前面的文章中已经介绍了介绍了一系列激活函数(Sigmoid、Tanh、ReLU、LeakyReLU、PReLU、Swish、ELU、SELU、GELU、Softmax、Softplus、Mish、Maxout

MySQL使用Xtrabackup备份到AWS存储桶

1.安装Xtrabackupcd/tmpwgethttps://downloads.percona.com/downloads/Percona-XtraBackup-8.0/Percona-XtraBackup-8.0.33-28/binary/redhat/7/x86_64/percona-xtrabackup-80

【山河送书第十二期】:《巧用ChatGPT快速搞定数据分析》参与活动,送书两本!!

【山河送书第十二期】:《巧用ChatGPT快速搞定数据分析》参与活动,送书两本!!关键亮点内容简介作者简介购买链接参与方式往期赠书回顾关键亮点用ChatGPT颠覆数据分析,1分钟生成数据分析结果!30多个精心挑选的ChatGPT数据分析案例+50多种ChatGPT数据分析策略涵盖从数据预处理到高级分析的全过程助你在竞争

【Python】pyecharts 模块 ② ( 命令行安装 pyecharts 模块 | PyCharm 安装 pyecharts 模块 )

文章目录一、命令行安装pyecharts模块1、安装过程2、命令行验证pyecharts模块是否安装成功二、PyCharm安装pyecharts模块1、通过错误提示安装2、在Settings设置界面安装pyecharts画廊网站:https://gallery.pyecharts.org/#/在该网站可查看官方示例一、

SAP服务器文件管理

SAP服务器文件管理文件说明:对于SAP服务器的文件管理,系统给出3个事物码,分别是显示目录的AL11,下载文件的CG3Y和上传文件的CG3Z。AL11显示目录:以查找系统参数文件为例,在前台执行事物码AL11进入,如图所示,这里显示的是根目录列表,在此找到DIR_SETUPS条目。右边显示相应目录,用鼠标点击进入。此

操作系统之——中断

一.概念铺垫1.操作系统在了解中断之前,我们先来了解一下操作系统.🫡首先,操作系统是一种软件,它控制和管理计算机硬件和软件资源,为用户和应用程序提供统一的接口和环境。它负责管理计算机的内存、文件系统、输入输出设备、进程调度、网络通信等功能。操作系统使得计算机可以高效地运行各种应用程序,并提供了用户与计算机之间的交互界

《C++标准库第2版》3.2 虽旧犹新的语言特性 笔记

3.2虽旧犹新的语言特性非类型模板参数1.除了类型参数之外,我们也可以为template使用nontypeparamatter.2.非类型参数看作是template类型的一部分bitset<32>flags32;bitset<50>flags50;//这两个看作是两个不同类型的template模板参数默认值classt

详解junit

目录1.概述2.断言3.常用注解3.1.@Test3.2.@Before3.3.@After3.4.@BeforeClass3.5.@AfterClass4.异常测试5.超时测试6.参数化测试1.概述什么是单元测试:单元测试,是针对最小的功能单元编写测试代码,在JAVA中最小的功能单元是方法,单元测试针对JAVA就是单

热文推荐