我是靠谱客的博主 老实舞蹈,最近开发中收集的这篇文章主要介绍Netty源码分析-数据处理器ChannelInboundHandler和ChannelOutboundHandler,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

概述

Channel接口针对Channel的读入和写出IO事件的处理,定义了两个拓展接口:ChannelInboundHandler用于定义对读入IO事件的处理,ChannelOutboundHandler用于定义写出IO事件的处理。

ChannelInboundHandler

ChannelInboundHandler接口定义

/**
 * {@link ChannelHandler} which adds callbacks for state changes. This allows the user
 * to hook in to state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Invoked when the current {@link Channel} has read a message from the peer.
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

ChannelInboundHandlerAdapter

  • ChannelInboundHandler的默认实现类,方便使用,用户通过拓展该类,只重写与自身关注的IO事件的处理的方法。ChannelInboundHandlerAdapter的每个方法的默认实现都是通过ChannelHandlerContext将IO事件或接收到的数据,传给所在的ChannelPipeline的下一个ChannelInboundHandler:
    在这里插入图片描述

  • 业务处理逻辑处理:用户可以通过拓展ChannelInboundHandlerAdapter,重写相应的方法来,生成新的子类的方式来定义业务需要的处理逻辑,Netty默认针对特定功能的处理,提供了一些ChannelInboundHandler的实现类,详见下面分析。

  • 对于从Channel读入的数据,在调用channelRead方法处理时,默认实现也是传给下一个ChannelInboundHandler处理,不会销毁该数据对象,释放掉该数据所占用的空间的:
    在这里插入图片描述
    如果数据只需在当前ChannelInboundHandler处理,而不需要继续往下传输,则可以调用ReferenceCountUtil.release(msg)手动释放掉,或者拓展SimpleChannelInboundHandler类。

  • SimpleChannelInboundHandler
    在这里插入图片描述
    SimpleChannelInboundHandler为ChannelInboundHandlerAdapter的一个拓展实现,重写了channelRead方法,并提供了一个抽象方法channelRead0(在netty 5.0之后channelRead0方法名称变成了messageReceived)供用户实现自身的数据处理逻辑:
    在这里插入图片描述
    针对Channel读入的数据的处理,提供了两个功能:

    1. 特定类型数据处理:用户通过拓展实现SimpleChannelInboundHandler来定义业务处理的ChannelInboundHandler时,通过指定泛型参数I,限制该ChannelInboundHandler只处理Channel读入的类型为I的数据;
    2. 自动释放数据:如果Channel读入的数据的类型与泛型参数I匹配,则在该ChannelInboundHandler处理掉了,在finally中,根据调用构造函数创建ChannelInboundHandler时,是否需要自动释放数据(默认为true),来自动进行数据空间的释放。
    3. 数据的保留:如果用户在实现channelRead0方法自定义数据处理逻辑时,需要将该数据传给下一个ChannelInboundHandler,则需要调用ReferenceCountUtil.retain(msg)方法,原理是将msg的引用计数加1,因为ReferenceCountUtil.release(msg)是将msg的引用计数减1,同时当引用计数变成0时,释放该数据:
 public class StringHandler extends SimpleChannelInboundHandler<String> {

     @Override
     protected void channelRead0(ChannelHandlerContext ctx, String message)
             throws Exception {
         System.out.println(message);
         // 不释放数据,交给下一个ChannelInboundHandler继续处理
         ReferenceCountUtil.retain(message);
         ctx.fireChannelRead(message);
     }
 }

ChannelOutboundHandler

ChannelOutboundHandler接口定义

/**
 * {@link ChannelHandler} which will get notified for IO-outbound-operations.
 */
public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * Called once a bind operation is made.
     *
     * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
     * @param localAddress  the {@link SocketAddress} to which it should bound
     * @param promise       the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception    thrown if an error occurs
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a connect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
     * @param remoteAddress     the {@link SocketAddress} to which it should connect
     * @param localAddress      the {@link SocketAddress} which is used as source on connect
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a disconnect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a close operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a deregister operation is made from the current registered {@link EventLoop}.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Intercepts {@link ChannelHandlerContext#read()}.
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
    * Called once a write operation is made. The write operation will write the messages through the
     * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
     * {@link Channel#flush()} is called
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
     * @param msg               the message to write
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
     * that are pending.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
     * @throws Exception        thrown if an error occurs
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

ChannelOutboundHandlerAdapter

在这里插入图片描述
与ChannelInboundHandlerAdapter的作用类似,方法默认实现也是通过ChannelHandlerContext将写出的数据,交给下一个ChannelOutboundHandler处理。

ChannelDuplexHandler

在这里插入图片描述
同时具有ChannelInboundHandler和ChannelInboundHandler的功能,可以实现相应方法,对Channel的读入数据和写出数据都进行处理,默认实现也是交给下一个ChannelHandler处理。

Netty提供的ChannelHandler实现

Netty在handler子模块,针对不同的功能,包括流量控制,数据flush,ip过滤,日志,ssl,大数据流处理,超时心跳检测,拥塞控制,提供了相应的ChannelHandler实现类,如图:
在这里插入图片描述

  • 示例:心跳检测与长连接维持
    Netty源码分析-基于Netty的心跳检测机制实现长连接

最后

以上就是老实舞蹈为你收集整理的Netty源码分析-数据处理器ChannelInboundHandler和ChannelOutboundHandler的全部内容,希望文章能够帮你解决Netty源码分析-数据处理器ChannelInboundHandler和ChannelOutboundHandler所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(62)

评论列表共有 0 条评论

立即
投稿
返回
顶部