我是靠谱客的博主 香蕉鸭子,最近开发中收集的这篇文章主要介绍精尽 Dubbo 源码分析 —— NIO 服务器(二)之 Transport 层1.概述2.AbstractPeer3.AbstractEndpoint4.Client5. Server6. Channel7. ChannelHandler8. Dispacher9. Codec,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1.概述

dubbo-remoting-api 模块, transport 包,网络传输层。
涉及的类图如下:
在这里插入图片描述

2.AbstractPeer

实现 Endpoint、ChannelHandler 接口,Peer 抽象类。

/**
 * AbstractPeer
 *
 * Peer 抽象类
 */
public abstract class AbstractPeer implements Endpoint, ChannelHandler {

    /**
     * 通道处理器
     */
    private final ChannelHandler handler;
    /**
     * URL
     */
    private volatile URL url;
    /**
     * 正在关闭
     *
     * {@link #startClose()}
     */
    // closing closed means the process is being closed and close is finished
    private volatile boolean closing;
    /**
     * 关闭完成
     *
     * {@link #close()}
     */
    private volatile boolean closed;

    public AbstractPeer(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }
    }

使用装饰模式

3.AbstractEndpoint

实现 Resetable 接口,继承 AbstractPeer 抽象类,端点抽象类。

/**
 * AbstractEndpoint
 *
 * Endpoint 抽象类
 */
public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

    private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class);

    /**
     * 编解码器
     */
    private Codec2 codec;
    /**
     * 超时时间
     */
    private int timeout;
    /**
     * 连接超时时间
     */
    private int connectTimeout;

    public AbstractEndpoint(URL url, ChannelHandler handler) {
        super(url, handler);
        this.codec = getChannelCodec(url);
        this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }
    }

4.Client

4.1 AbstractClient

实现 Client 接口,继承 AbstractEndpoint 抽象类,客户端抽象类,重点实现了公用的重连逻辑,同时抽象了连接等模板方法,供子类实现。抽象方法如下:

 /**
     * 重连定时任务执行器
     */
    private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
    /**
     * 连接锁,用于实现发起连接和断开连接互斥,避免并发。
     */
    private final Lock connectLock = new ReentrantLock();
    /**
     * 发送消息时,若断开,是否重连
     */
    private final boolean send_reconnect;
    /**
     * 重连次数
     */
    private final AtomicInteger reconnect_count = new AtomicInteger(0);
    /**
     * 重连时,是否已经打印过错误日志。
     */
    // Reconnection error log has been called before?
    private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
    /**
     * 重连 warning 的间隔.(waring多少次之后,warning一次) //for test
     */
    // reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
    private final int reconnect_warning_period;
    /**
     * 关闭超时时间
     */
    private final long shutdown_timeout;
    /**
     * 线程池
     *
     * 在调用 {@link #wrapChannelHandler(URL, ChannelHandler)} 时,会调用 {@link com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler} 创建
     */
    protected volatile ExecutorService executor;
    /**
     * 重连执行任务 Future
     */
    private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
    /**
     * 最后成功连接时间
     */
    // the last successed connected time
    private long lastConnectedTime = System.currentTimeMillis();

    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        // 从 URL 中,获得重连相关配置项
        send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
        shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
        // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
        reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

        // 初始化客户端
        try {
            doOpen();
        } catch (Throwable t) {
            close(); // 失败,则关闭
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }

        // 连接服务器
        try {
            // connect.
            connect();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
            }
        } catch (RemotingException t) {
            if (url.getParameter(Constants.CHECK_KEY, true)) {
                close(); // 失败,则关闭
                throw t;
            } else {
                logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
            }
        } catch (Throwable t) {
            close(); // 失败,则关闭
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }

        // 获得线程池
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension()
                .get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
        ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension()
                .remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    }
}

连接服务器:

    protected void connect() throws RemotingException {
        // 获得锁
        connectLock.lock();
        try {
            // 已连接,
            if (isConnected()) {
                return;
            }
            // 初始化重连线程
            initConnectStatusCheckCommand();
            // 执行连接
            doConnect();
            // 连接失败,抛出异常
            if (!isConnected()) {
                throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
            // 连接成功,打印日志
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                            + ", channel is " + this.getChannel());
                }
            }
            // 设置重连次数归零
            reconnect_count.set(0);
            // 设置未打印过错误日志
            reconnect_error_log_flag.set(false);
        } catch (RemotingException e) {
            throw e;
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: " + e.getMessage(), e);
        } finally {
            // 释放锁
            connectLock.unlock();
        }
    }

初始化重连线程:

 /**
     * init reconnect thread
     *
     * 初始化重连线程
     */
    private synchronized void initConnectStatusCheckCommand() {
        //reconnect=false to close reconnect
        // 获得获得重连频率,默认开启。
        int reconnect = getReconnectParam(getUrl());
        // 若开启重连功能,创建重连线程
        if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
            // 创建 Runnable 对象
            Runnable connectStatusCheckCommand = new Runnable() {
                public void run() {
                    try {
                        // 未连接,重连
                        if (!isConnected()) {
                            connect();
                        // 已连接,记录最后连接时间
                        } else {
                            lastConnectedTime = System.currentTimeMillis();
                        }
                    } catch (Throwable t) {
                        // 超过一定时间未连接上,才打印异常日志。并且,仅打印一次。默认,15 分钟。
                        String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
                        // wait registry sync provider list
                        if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                            if (!reconnect_error_log_flag.get()) {
                                reconnect_error_log_flag.set(true);
                                logger.error(errorMsg, t);
                                return;
                            }
                        }
                        // 每一定次发现未重连,才打印告警日志。默认,1800 次,1 小时。
                        if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
                            logger.warn(errorMsg, t);
                        }
                    }
                }
            };
            // 发起定时任务
            reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
        }
    }

发送消息

@Override
public void send(Object message, boolean sent) throws RemotingException {
    // 未连接时,开启重连功能,则先发起连接
    if (send_reconnect && !isConnected()) {
        connect();
    }
    // 发送消息
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent);
}

4.2 ClientDelegate

实现 Client 接口,客户端装饰者实现类。在每个实现的方法里,直接调用被装饰的 client 属性的方法。

/**
 * ClientDelegate
 *
 * 客户端装饰者
 */
public class ClientDelegate implements Client {

    /**
     * 客户端
     */
    private transient Client client;

    public ClientDelegate() {
    }
   }.

5. Server

5.1 AbstractServer

实现 Server 接口,继承 AbstractEndpoint 抽象类,服务器抽象类,重点实现了公用的逻辑

public abstract class AbstractServer extends AbstractEndpoint implements Server {

    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";

    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);

    /**
     * 线程池
     */
    ExecutorService executor;
    /**
     * 服务地址
     */
    private InetSocketAddress localAddress;
    /**
     * 绑定地址
     */
    private InetSocketAddress bindAddress;
    /**
     * 服务器最大可接受连接数
     */
    private int accepts;
    /**
     * 空闲超时时间,单位:毫秒
     */
    private int idleTimeout; //600 seconds

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        // 服务地址
        localAddress = getUrl().toInetSocketAddress();
        // 绑定地址
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        // 服务器最大可接受连接数
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        // 空闲超时时间
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);

        // 开启服务器
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }

        // 获得线程池
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }}

子类类图

在这里插入图片描述

5.2 ServerDelegate

实现 Client 接口,客户端装饰者实现类。在每个实现的方法里,直接调用被装饰的 server 属性的方法。

/**
 * ServerDelegate
 *
 * 服务器装饰者
 */
public class ServerDelegate implements Server {

    /**
     * 服务器
     */
    private transient Server server;

    public ServerDelegate() {
    }
    }

6. Channel

6.1 AbstractChannel

实现 Channel 接口,实现 AbstractPeer 抽象类,通道抽象类。

/**
 * AbstractChannel
 *
 * Channel 抽象类
 */
public abstract class AbstractChannel extends AbstractPeer implements Channel {

    public AbstractChannel(URL url, ChannelHandler handler) {
        super(url, handler);
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (isClosed()) {
            throw new RemotingException(this, "Failed to send message "
                    + (message == null ? "" : message.getClass().getName()) + ":" + message
                    + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
        }
    }

    @Override
    public String toString() {
        return getLocalAddress() + " -> " + getRemoteAddress();
    }

}

具体的发送方法,子类实现。在 AbstractChannel 中,目前只做状态检查。

6.2 ChannelDelegate

现 Channel 接口,通道装饰者实现类。在每个实现的方法里,直接调用被装饰的 channel 属性的方法。


/**
 * ChannelDelegate
 *
 */
public class ChannelDelegate implements Channel {

    private transient Channel channel;

    public ChannelDelegate() {
    }
}

7. ChannelHandler

7.1 ChannelHandlerAdapter

实现 ChannelHandler 接口,通道处理器适配器,每个方法为空实现

7.2 ChannelHandlerDelegate

实现 ChannelHandler 接口,通道处理器装饰者接口。

7.2.1 AbstractChannelHandlerDelegate

实现 ChannelHandlerDelegate 接口,通道处理器装饰者抽象实现类。在每个实现的方法里,直接调用被装饰的 handler 属性的方法。

/**
 * 通道处理器装饰者抽象实现类
 */
public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate {

    /**
     * 通道处理器
     */
    protected ChannelHandler handler;

    protected AbstractChannelHandlerDelegate(ChannelHandler handler) {
        Assert.notNull(handler, "handler == null");
        this.handler = handler;
    }

    @Override
    public ChannelHandler getHandler() {
        if (handler instanceof ChannelHandlerDelegate) {
            return ((ChannelHandlerDelegate) handler).getHandler();
        }
        return handler;
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        handler.connected(channel);
    }
}
7.2.2 DecodeHandler

实现 AbstractChannelHandlerDelegate 抽象类,解码处理器,处理接收到的消息,实现了 Decodeable 接口的情况。


/**
 * 解码处理器,处理接收到的消息,实现了 Decodeable 接口的情况。
 */
public class DecodeHandler extends AbstractChannelHandlerDelegate {

    private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }

    private void decode(Object message) {
        if (message != null && message instanceof Decodeable) {
            try {
                ((Decodeable) message).decode(); // 解析消息
                if (log.isDebugEnabled()) {
                    log.debug(new StringBuilder(32).append("Decode decodeable message ").append(message.getClass().getName()).toString());
                }
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn(new StringBuilder(32).append("Call Decodeable.decode failed: ").append(e.getMessage()).toString(), e);
                }
            } // ~ end of catch
        } // ~ end of if
    } // ~ end of method decode

}

8. Dispacher

8.1 ChannelHandlers

通道处理器工厂。

/**
 * 通道处理器工厂
 */
public class ChannelHandlers {

    /**
     * 单例
     */
    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) { // for testing
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(
                new HeartbeatHandler(
                        ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
                )
        );
    }

}
8.2 Dispatcher 实现类

Dispatcher

  • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
8.2.1 AllDispatcher
public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}
8.2.1 AllDispatcher

实现 WrappedChannelHandler 抽象类。覆写 #connected(channel) 方法如下:

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}
8.2.1 ChannelEventRunnable

实现 Runnable 接口。

@Override
public void run() {
    switch (state) {
        case CONNECTED: handler.connected(channel); break;
        case DISCONNECTED:handler.disconnected(channel); break;
        case SENT:handler.sent(channel, message);break;
        case RECEIVED:handler.received(channel, message);break;
        case CAUGHT:handler.caught(channel, exception);break;
        default: logger.warn("unknown state: " + state + ", message is " + message);
    }
}

9. Codec

9.1 CodecSupport

编解码工具类,提供查询 Serialization 的功能。
初始化:


    /**
     * 序列化对象集合
     * key:序列化类型编号 {@link Serialization#getContentTypeId()}
     */
    private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
    /**
     * 序列化名集合
     * key:序列化类型编号 {@link Serialization#getContentTypeId()}
     * value: 序列化拓展名
     */
    private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
    
    static {
        // 基于 Dubbo SPI ,初始化
        Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
        for (String name : supportedExtensions) {
            Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
            byte idByte = serialization.getContentTypeId();
            if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
                logger.error("Serialization extension " + serialization.getClass().getName()
                        + " has duplicate id to Serialization extension "
                        + ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
                        + ", ignore this Serialization extension");
                continue;
            }
            ID_SERIALIZATION_MAP.put(idByte, serialization);
            ID_SERIALIZATIONNAME_MAP.put(idByte, name);
        }
    }

获取所有序列化方式

查找 Serialization 对象

public static Serialization getSerialization(URL url, Byte id) throws IOException {
    Serialization serialization = getSerializationById(id);
    String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION); // 默认,hessian2
    // 出于安全的目的,针对 JDK 的序列化方式(对应编号为 3、4、7),检查连接到服务器的 URL 和实际传输的数据,协议是否一致。
    // https://github.com/apache/incubator-dubbo/issues/1138
    // Check if "serialization id" passed from network matches the id on this side(only take effect for JDK serialization), for security purpose.
    if (serialization == null
            || ((id == 3 || id == 7 || id == 4) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
        throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
    }
    return serialization;
}
9.2 AbstractCodec

实现 Codec2 接口,提供公用方法
子类类图:

在这里插入图片描述
编解码器的实现,通过继承的方式,获得更多的功能。每一个 Codec2 类实现对不同消息的编解码。通过协议头来判断,具体使用哪个编解码逻辑

    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) { // 请求
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) { // 响应
            encodeResponse(channel, buffer, (Response) msg);
        } else { // 提交给父类( Telnet ) 处理,目前是 Telnet 命令的结果。
            super.encode(channel, buffer, msg);
        }
    }

最后

以上就是香蕉鸭子为你收集整理的精尽 Dubbo 源码分析 —— NIO 服务器(二)之 Transport 层1.概述2.AbstractPeer3.AbstractEndpoint4.Client5. Server6. Channel7. ChannelHandler8. Dispacher9. Codec的全部内容,希望文章能够帮你解决精尽 Dubbo 源码分析 —— NIO 服务器(二)之 Transport 层1.概述2.AbstractPeer3.AbstractEndpoint4.Client5. Server6. Channel7. ChannelHandler8. Dispacher9. Codec所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(33)

评论列表共有 0 条评论

立即
投稿
返回
顶部