我是靠谱客的博主 爱撒娇烧鹅,最近开发中收集的这篇文章主要介绍Netty学习二-核心类简介1 ByteBuf2 ChannelPipeline3 ChannelHandler,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1 ByteBuf

netty中的ByteBuf主要是为了替换JDK自带的 ByteBuffer。 ByteBuffer有如下缺点:

  • ByteBuffer 长度固定,一旦分配完成不能动态扩容缩容。
  • ByteBuffer 只有一个标识位置的指针position,读写的时候需要手动调用flip()和rewind()等。开发者必须小心地使用这些API,否则很容易出现程序处理失败
  • ByteBuffer 功能有限,一些高级特性不支持,需要开发者自己实现

1.1 工作原理

ByteBuf维护了读指针(readerIndex) 和写指针(writerIndex) 来维护读读写标识位。readerIndex 和 writerIndex 初始值都是0,随着数据的写入,writerIndex 会增加。读数据会使 readerIndex增加。读取数据后,0~readerIndex 这部分数据被视为 discard 的,调用 discardReadBytes 方法可以释放这部分空间。readerIndex 和 writerIndex之间的数据是可读的,writerIndex 和 capacity 之间的数据是可写的。通过读写指针的设计避免了频繁的调用 flip(),简化了缓冲区的读写操作。

  • 初始化后的ByteBuf
    在这里插入图片描述
  • 写入N个字节后的 ByteBuf
    在这里插入图片描述
  • 读取M个字节后的 ByteBuf
    在这里插入图片描述
  • 调用 discardReadBytes 后的ByteBuf
    在这里插入图片描述
  • 调用 clear 后的 ByteBuf
    在这里插入图片描述

1.2 ByteBuf功能介绍

1.2.1 顺序读操作

方法名称返回值方法说明
readBooleanboolean从 readIndex 开始获取 boolean 值,readIndex + 1
readByte/readUnsignedBytebyte从 readIndex 开始获取字节值/无符号字节值,readIndex + 1
readShort/readUnsignedShortshort从 readIndex 开始获取短整型值/无符号短整型值,readIndex + 2
readInt/readUnsignedIntint从 readIndex 开始获取整型值/无符号整型值,readIndex + 4
readLonglong从 readIndex 开始获取长整型值,readIndex + 8
readCharchar从 readIndex 开始获取字符值,readIndex + 2
readFloatfloat从 readIndex 开始获取浮点值,readIndex + 4
readDoubledouble从 readIndex 开始获取双精度浮点值,readIndex + 8
readBytes(int length)ByteBuf从 readIndex 开始读取 length长度到一个新的 ByteBuf 中
readSlice(int length)ByteBuf从 readIndex 开始读取 length长度到一个新的 ByteBuf 中,子区域与原区域共享缓冲区,但是维护自己的 readIndex 和 writeIndex
readBytes(byte[] dest)ByteBuf从 readIndex 开始读取数据到目标byte数组中,读取字节数长度dest.length

1.2.2 顺序写操作

方法名称返回值方法说明
writeBoolean(boolean value)ByteBuf将value写入到 ByteBuf 中,writerIndex + 1
writeByte(byte value)ByteBufByteBuf
readShort(short value)ByteBuf将 value 写入到 ByteBuf 中,writerIndex + 2
writeInt(int value)ByteBuf将 value 写入到 ByteBuf 中,writerIndex + 4
writeLong(long value)ByteBuf将 value 写入到 ByteBuf 中,writerIndex + 8
writeChar(char value)ByteBuf将 value 写入到 ByteBuf 中,writerIndex + 2
writeFloat(float value)ByteBuf将 value 写入到 ByteBuf 中,writerIndex + 4
writeDouble(double value)ByteBuf将 value 写入到 ByteBuf 中,writerIndex + 8
writeBytes(ByteBuf src)ByteBuf将 src 中所有可读字节写入到当前 ByteBuf writerIndex + src.readableBytes
writeBytes(byte[] src)ByteBuf将 src 中字节写入到当前 ByteBuf writerIndex + src.length

1.2.3 mark 和 reset

mark 操作会将当前指针备份到 positionIndex 中,reset 会将指针恢复到备份值。ByteBuf 中主要有四个相关的方法:markReaderIndex、markWriterIndex、resetReaderIndex、resetWriterIndex

1.2.4 随机读写

除了顺序读写外,ByteBuf还支持随机读写(getXXX、setXXX),可以任意指定读写位置。随机读写同样会校验索引、长度合法性,但是不会修改 readerIndex 和 writerIndex 的值

1.3 ByteBuf分类

从内存分配来看,ByteBuf可以分为两类

  • 堆内存字节缓冲区:优点是内存分配和回收速度快,可以被 JVM 自动回收。缺点是如果进行 IO 操作需要额外做一次内存复制,将堆内存对应的缓冲区复制到内核 channel 中,性能会有一定程度的下降
  • 直接内存字节缓冲区:利用堆外内存分配空间,优点是 IO 操作的时候少了一次内存复制。缺点是分配和回收速度会慢一些。

实践表明 IO 通信线程中使用直接内存缓冲区,后端业务消息编解码模块儿使用堆内存字节缓冲区,这样组合可以达到性能最优

从内存回收角度,ByteBuf 也可以分为两类

  • 基于对象池的 ByteBuf:优点是维护了一个内存池,可以循环利用创建的 ByteBuf,提升内存使用效率。缺点是内存池管理比较复杂
  • 普通 ByteBuf:优点是使用比较简单。缺点是不能循环利用

2 ChannelPipeline

2.1 概述

ChannelPipeline 是 ChannelHandler 的容器,它负责 ChannelHandler 的管理和事件拦截。用户不用自己创建,ServerBootStrap 或 BootStrap 启动的时候会为每个 Channel创建一个独立的 pipeline,使用者只需要将自定义拦截器添加到pipeline中即可

        serverBootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        List<ChannelHandler> handlers = supplier.get();
                        for (ChannelHandler handler: handlers) {
                            socketChannel.pipeline().addLast(handler);
                        }
                    }
                });

ChannelPipeline 是线程安全的,N 个业务线可以并发操作而不会出现线程安全问题。但是 ChannelHandler 不是线程安全的,需要用户自己保证

2.1 事件处理流程

  • 底层 SocketChannel read 方法读取 ByteBuf,触发 channelRead 事件,由 IO 线程 NioEventLoop 调用 ChannelPipeline 的 fireChannelRead 方法,将消息传到 ChannelPipeline 中。
  • 消息依次经过 HeadHandler、ChannelHandler1、ChannelHandler2……TailHandler 处理。任何一个处理有异常都会导致消息中断
  • 调用 ChannelHandlerContext write 方法发送消息,从 TailHandler 开始到 HeadHandler 最终被添加到消息缓冲区等待消息发送

在这里插入图片描述

3 ChannelHandler

ChannelHandler 类似于 Servlet 中的 Filter 过滤器,负责对 IO 事件或者 IO 操作进行拦截处理。可以选择自己关注的事件进行处理,也可以透传和终止事件传递。常用的编解码器都是一个个 ChannelHandler

3.1 ByteToMessageDecoder

将 ByteBuf 解码成 POJO 对象,核心代码如下

3.1.1 channelRead入口

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
        	// 只解析 ByteBuf 数据
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                	// 如果 cumulation 为空则说明是第一个消息或者上个消息已经解码完毕
                    cumulation = data;
                } else {
                	// 如果 cumulation 不为空则表示上个消息没有解析完全,将新收到的消息拼接到后面
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 解码
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                	// 如果消息已经读取完毕则释放
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // 如果消息没有读取完成且读取次数超过阈值(默认16)则释放已经度过的数据,避免 OOM
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

3.1.2 callDecode解码

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
        	// 循环读取,直到消息读取完毕
            while (in.isReadable()) {
                int outSize = out.size();

                if (outSize > 0) {
                	// 将解析后的消息向后传
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
				
				// 记录旧的读指针,用于后面判断是否继续解析
                int oldInputLength = in.readableBytes();

				// 调用 decode 方法,由不同的子类来实现
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                	// 如果 outSize 大小没有变化则根据不同的情况判断是否继续解析
                    if (oldInputLength == in.readableBytes()) {
                    	// 经过解码后如果读指针仍未发生变化则直接 break,中断解析
                        break;
                    } else {
                    	// 经过解码后如果读指针发生变化则仍需要解析
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

3.2 LengthFieldBaseFreamDecoder

3.2.1 核心参数简介

LengthFieldBaseFreamDecoder参数比较多,核心参数如下

  • maxFrameLength:最大消息长度
  • lengthFieldOffset:表示长度信息的数据偏移量
  • lengthFieldLength:表示长度信息的数据长度
  • lengthAdjustment:长度调整值。计算公式:lengthAdjustment = [整个消息长度(包含长度位) - 长度位表示长度] - 长度位长度 - 长度位偏移量

通常情况下,长度位表示长度不包含长度位长度,这个时候 整个消息长度(包含长度位) - 长度位表示长度 = 长度位长度,如果长度位偏移量为0,则 lengthAdjustment = 0。该公式可以进一步简化:

  • 长度位表示长度不包含长度位长度:- 长度位偏移量
  • 长度位表示长度包含长度位长度:- 长度位长度 - 长度位偏移量
  • initialBytesToStrip:接收到的发送数据包,去除前initialBytesToStrip位

3.2.2 decode 代码

LengthFieldBaseFreamDecoder 继承了 ByteToMessageDecoder,这里看下 decode 逻辑即可

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (discardingTooLongFrame) {
            discardingTooLongFrame(in);
        }

        if (in.readableBytes() < lengthFieldEndOffset) {
        	// 如果可读字节量 < 长度字节截止位置则直接返回
            return null;
        }
		
		// 根据长度偏移量计算实际长度偏移量
        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;

		// 获取消息长度值
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

        if (frameLength < 0) {
            failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
        }
        
		// 长度值调整
        frameLength += lengthAdjustment + lengthFieldEndOffset;

        if (frameLength < lengthFieldEndOffset) {
            failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
        }

        if (frameLength > maxFrameLength) {
            exceededFrameLength(in, frameLength);
            return null;
        }

        // never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        if (in.readableBytes() < frameLengthInt) {
            return null;
        }

        if (initialBytesToStrip > frameLengthInt) {
            failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
        }

		// 跳过一些字节不解析
        in.skipBytes(initialBytesToStrip);

        // 解析字节片段
        int readerIndex = in.readerIndex();
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }

3.3 MessageToMessageDecoder

将一个 POJO 转换为另一个POJO

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                	// 解码
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
        	// 将解码后的对象传到后面的 Handler
            int size = out.size();
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.getUnsafe(i));
            }
            out.recycle();
        }
    }

MessageToMessageDecoder 逻辑比较简单,但是要注意拆包、粘包的问题,进入该解码器的消息一定要是整包消息

3.4 MessageToByteEncoder

把 POJO 对象转换为 ByteBuf 以便进行网络传输。

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                // 分配一个字节缓冲区
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                	// 把对象进行编码
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                	// 写入字节缓冲区
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }

3.5 MessageToMessageEncoder

将一个编码对象转换为另一个编码对象

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        CodecOutputList out = null;
        try {
            if (acceptOutboundMessage(msg)) {
                out = CodecOutputList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                	// 把对象进行编码
                    encode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (out.isEmpty()) {
                    out.recycle();
                    out = null;

                    throw new EncoderException(
                            StringUtil.simpleClassName(this) + " must produce at least one message.");
                }
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new EncoderException(t);
        } finally {
            if (out != null) {
            	// 将 list 循环写出,交给后面的handler处理
                final int sizeMinusOne = out.size() - 1;
                if (sizeMinusOne == 0) {
                    ctx.write(out.getUnsafe(0), promise);
                } else if (sizeMinusOne > 0) {
                    if (promise == ctx.voidPromise()) {
                        writeVoidPromise(ctx, out);
                    } else {
                        writePromiseCombiner(ctx, out, promise);
                    }
                }
                out.recycle();
            }
        }
    }

3.6 LengthFieldPrepender

LengthFieldPrepender 解码器用于在消息前面添加一个长度字节。该解码器继承了MessageToMessageEncoder,流到该编码器的消息一定要是整包消息

    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
    	// 计算消息长度
        int length = msg.readableBytes() + lengthAdjustment;
        if (lengthIncludesLengthFieldLength) {
        	// 如果长度包括长度字段的长度则增加长度字段的长度
            length += lengthFieldLength;
        }

        checkPositiveOrZero(length, "length");
		
		// 写入长度信息
        switch (lengthFieldLength) {
        case 1:
            if (length >= 256) {
                throw new IllegalArgumentException(
                        "length does not fit into a byte: " + length);
            }
            out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
            break;
        case 2:
            if (length >= 65536) {
                throw new IllegalArgumentException(
                        "length does not fit into a short integer: " + length);
            }
            out.add(ctx.alloc().buffer(2).order(byteOrder).writeShort((short) length));
            break;
        case 3:
            if (length >= 16777216) {
                throw new IllegalArgumentException(
                        "length does not fit into a medium integer: " + length);
            }
            out.add(ctx.alloc().buffer(3).order(byteOrder).writeMedium(length));
            break;
        case 4:
            out.add(ctx.alloc().buffer(4).order(byteOrder).writeInt(length));
            break;
        case 8:
            out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
            break;
        default:
            throw new Error("should not reach here");
        }
        // 添加消息
        out.add(msg.retain());
    }

最后

以上就是爱撒娇烧鹅为你收集整理的Netty学习二-核心类简介1 ByteBuf2 ChannelPipeline3 ChannelHandler的全部内容,希望文章能够帮你解决Netty学习二-核心类简介1 ByteBuf2 ChannelPipeline3 ChannelHandler所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部