概述
1.概述
dubbo-remoting-api 模块, transport 包,网络传输层。
涉及的类图如下:
2.AbstractPeer
实现 Endpoint、ChannelHandler 接口,Peer 抽象类。
/**
* AbstractPeer
*
* Peer 抽象类
*/
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
/**
* 通道处理器
*/
private final ChannelHandler handler;
/**
* URL
*/
private volatile URL url;
/**
* 正在关闭
*
* {@link #startClose()}
*/
// closing closed means the process is being closed and close is finished
private volatile boolean closing;
/**
* 关闭完成
*
* {@link #close()}
*/
private volatile boolean closed;
public AbstractPeer(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
}
使用装饰模式
3.AbstractEndpoint
实现 Resetable 接口,继承 AbstractPeer 抽象类,端点抽象类。
/**
* AbstractEndpoint
*
* Endpoint 抽象类
*/
public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {
private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class);
/**
* 编解码器
*/
private Codec2 codec;
/**
* 超时时间
*/
private int timeout;
/**
* 连接超时时间
*/
private int connectTimeout;
public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
this.codec = getChannelCodec(url);
this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
}
4.Client
4.1 AbstractClient
实现 Client 接口,继承 AbstractEndpoint 抽象类,客户端抽象类,重点实现了公用的重连逻辑,同时抽象了连接等模板方法,供子类实现。抽象方法如下:
/**
* 重连定时任务执行器
*/
private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
/**
* 连接锁,用于实现发起连接和断开连接互斥,避免并发。
*/
private final Lock connectLock = new ReentrantLock();
/**
* 发送消息时,若断开,是否重连
*/
private final boolean send_reconnect;
/**
* 重连次数
*/
private final AtomicInteger reconnect_count = new AtomicInteger(0);
/**
* 重连时,是否已经打印过错误日志。
*/
// Reconnection error log has been called before?
private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
/**
* 重连 warning 的间隔.(waring多少次之后,warning一次) //for test
*/
// reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
private final int reconnect_warning_period;
/**
* 关闭超时时间
*/
private final long shutdown_timeout;
/**
* 线程池
*
* 在调用 {@link #wrapChannelHandler(URL, ChannelHandler)} 时,会调用 {@link com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler} 创建
*/
protected volatile ExecutorService executor;
/**
* 重连执行任务 Future
*/
private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
/**
* 最后成功连接时间
*/
// the last successed connected time
private long lastConnectedTime = System.currentTimeMillis();
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// 从 URL 中,获得重连相关配置项
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
// The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
// 初始化客户端
try {
doOpen();
} catch (Throwable t) {
close(); // 失败,则关闭
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
// 连接服务器
try {
// connect.
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close(); // 失败,则关闭
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close(); // 失败,则关闭
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
// 获得线程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension()
.get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension()
.remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
}
连接服务器:
protected void connect() throws RemotingException {
// 获得锁
connectLock.lock();
try {
// 已连接,
if (isConnected()) {
return;
}
// 初始化重连线程
initConnectStatusCheckCommand();
// 执行连接
doConnect();
// 连接失败,抛出异常
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms.");
// 连接成功,打印日志
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
// 设置重连次数归零
reconnect_count.set(0);
// 设置未打印过错误日志
reconnect_error_log_flag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
// 释放锁
connectLock.unlock();
}
}
初始化重连线程:
/**
* init reconnect thread
*
* 初始化重连线程
*/
private synchronized void initConnectStatusCheckCommand() {
//reconnect=false to close reconnect
// 获得获得重连频率,默认开启。
int reconnect = getReconnectParam(getUrl());
// 若开启重连功能,创建重连线程
if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
// 创建 Runnable 对象
Runnable connectStatusCheckCommand = new Runnable() {
public void run() {
try {
// 未连接,重连
if (!isConnected()) {
connect();
// 已连接,记录最后连接时间
} else {
lastConnectedTime = System.currentTimeMillis();
}
} catch (Throwable t) {
// 超过一定时间未连接上,才打印异常日志。并且,仅打印一次。默认,15 分钟。
String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
// wait registry sync provider list
if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
if (!reconnect_error_log_flag.get()) {
reconnect_error_log_flag.set(true);
logger.error(errorMsg, t);
return;
}
}
// 每一定次发现未重连,才打印告警日志。默认,1800 次,1 小时。
if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
logger.warn(errorMsg, t);
}
}
}
};
// 发起定时任务
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
}
}
发送消息
@Override
public void send(Object message, boolean sent) throws RemotingException {
// 未连接时,开启重连功能,则先发起连接
if (send_reconnect && !isConnected()) {
connect();
}
// 发送消息
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
channel.send(message, sent);
}
4.2 ClientDelegate
实现 Client 接口,客户端装饰者实现类。在每个实现的方法里,直接调用被装饰的 client 属性的方法。
/**
* ClientDelegate
*
* 客户端装饰者
*/
public class ClientDelegate implements Client {
/**
* 客户端
*/
private transient Client client;
public ClientDelegate() {
}
}.
5. Server
5.1 AbstractServer
实现 Server 接口,继承 AbstractEndpoint 抽象类,服务器抽象类,重点实现了公用的逻辑
public abstract class AbstractServer extends AbstractEndpoint implements Server {
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
/**
* 线程池
*/
ExecutorService executor;
/**
* 服务地址
*/
private InetSocketAddress localAddress;
/**
* 绑定地址
*/
private InetSocketAddress bindAddress;
/**
* 服务器最大可接受连接数
*/
private int accepts;
/**
* 空闲超时时间,单位:毫秒
*/
private int idleTimeout; //600 seconds
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// 服务地址
localAddress = getUrl().toInetSocketAddress();
// 绑定地址
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 服务器最大可接受连接数
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
// 空闲超时时间
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
// 开启服务器
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
// 获得线程池
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}}
子类类图
5.2 ServerDelegate
实现 Client 接口,客户端装饰者实现类。在每个实现的方法里,直接调用被装饰的 server 属性的方法。
/**
* ServerDelegate
*
* 服务器装饰者
*/
public class ServerDelegate implements Server {
/**
* 服务器
*/
private transient Server server;
public ServerDelegate() {
}
}
6. Channel
6.1 AbstractChannel
实现 Channel 接口,实现 AbstractPeer 抽象类,通道抽象类。
/**
* AbstractChannel
*
* Channel 抽象类
*/
public abstract class AbstractChannel extends AbstractPeer implements Channel {
public AbstractChannel(URL url, ChannelHandler handler) {
super(url, handler);
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (isClosed()) {
throw new RemotingException(this, "Failed to send message "
+ (message == null ? "" : message.getClass().getName()) + ":" + message
+ ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
}
}
@Override
public String toString() {
return getLocalAddress() + " -> " + getRemoteAddress();
}
}
具体的发送方法,子类实现。在 AbstractChannel 中,目前只做状态检查。
6.2 ChannelDelegate
现 Channel 接口,通道装饰者实现类。在每个实现的方法里,直接调用被装饰的 channel 属性的方法。
/**
* ChannelDelegate
*
*/
public class ChannelDelegate implements Channel {
private transient Channel channel;
public ChannelDelegate() {
}
}
7. ChannelHandler
7.1 ChannelHandlerAdapter
实现 ChannelHandler 接口,通道处理器适配器,每个方法为空实现
7.2 ChannelHandlerDelegate
实现 ChannelHandler 接口,通道处理器装饰者接口。
7.2.1 AbstractChannelHandlerDelegate
实现 ChannelHandlerDelegate 接口,通道处理器装饰者抽象实现类。在每个实现的方法里,直接调用被装饰的 handler 属性的方法。
/**
* 通道处理器装饰者抽象实现类
*/
public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate {
/**
* 通道处理器
*/
protected ChannelHandler handler;
protected AbstractChannelHandlerDelegate(ChannelHandler handler) {
Assert.notNull(handler, "handler == null");
this.handler = handler;
}
@Override
public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate) handler).getHandler();
}
return handler;
}
@Override
public void connected(Channel channel) throws RemotingException {
handler.connected(channel);
}
}
7.2.2 DecodeHandler
实现 AbstractChannelHandlerDelegate 抽象类,解码处理器,处理接收到的消息,实现了 Decodeable 接口的情况。
/**
* 解码处理器,处理接收到的消息,实现了 Decodeable 接口的情况。
*/
public class DecodeHandler extends AbstractChannelHandlerDelegate {
private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);
public DecodeHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
private void decode(Object message) {
if (message != null && message instanceof Decodeable) {
try {
((Decodeable) message).decode(); // 解析消息
if (log.isDebugEnabled()) {
log.debug(new StringBuilder(32).append("Decode decodeable message ").append(message.getClass().getName()).toString());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn(new StringBuilder(32).append("Call Decodeable.decode failed: ").append(e.getMessage()).toString(), e);
}
} // ~ end of catch
} // ~ end of if
} // ~ end of method decode
}
8. Dispacher
8.1 ChannelHandlers
通道处理器工厂。
/**
* 通道处理器工厂
*/
public class ChannelHandlers {
/**
* 单例
*/
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) { // for testing
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(
new HeartbeatHandler(
ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
)
);
}
}
8.2 Dispatcher 实现类
Dispatcher
- all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
- direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
- message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
- execution 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
- connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
8.2.1 AllDispatcher
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}
8.2.1 AllDispatcher
实现 WrappedChannelHandler 抽象类。覆写 #connected(channel) 方法如下:
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}
8.2.1 ChannelEventRunnable
实现 Runnable 接口。
@Override
public void run() {
switch (state) {
case CONNECTED: handler.connected(channel); break;
case DISCONNECTED:handler.disconnected(channel); break;
case SENT:handler.sent(channel, message);break;
case RECEIVED:handler.received(channel, message);break;
case CAUGHT:handler.caught(channel, exception);break;
default: logger.warn("unknown state: " + state + ", message is " + message);
}
}
9. Codec
9.1 CodecSupport
编解码工具类,提供查询 Serialization 的功能。
初始化:
/**
* 序列化对象集合
* key:序列化类型编号 {@link Serialization#getContentTypeId()}
*/
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
/**
* 序列化名集合
* key:序列化类型编号 {@link Serialization#getContentTypeId()}
* value: 序列化拓展名
*/
private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
static {
// 基于 Dubbo SPI ,初始化
Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
for (String name : supportedExtensions) {
Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
byte idByte = serialization.getContentTypeId();
if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
logger.error("Serialization extension " + serialization.getClass().getName()
+ " has duplicate id to Serialization extension "
+ ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
+ ", ignore this Serialization extension");
continue;
}
ID_SERIALIZATION_MAP.put(idByte, serialization);
ID_SERIALIZATIONNAME_MAP.put(idByte, name);
}
}
获取所有序列化方式
查找 Serialization 对象
public static Serialization getSerialization(URL url, Byte id) throws IOException {
Serialization serialization = getSerializationById(id);
String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION); // 默认,hessian2
// 出于安全的目的,针对 JDK 的序列化方式(对应编号为 3、4、7),检查连接到服务器的 URL 和实际传输的数据,协议是否一致。
// https://github.com/apache/incubator-dubbo/issues/1138
// Check if "serialization id" passed from network matches the id on this side(only take effect for JDK serialization), for security purpose.
if (serialization == null
|| ((id == 3 || id == 7 || id == 4) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
}
return serialization;
}
9.2 AbstractCodec
实现 Codec2 接口,提供公用方法
子类类图:
编解码器的实现,通过继承的方式,获得更多的功能。每一个 Codec2 类实现对不同消息的编解码。通过协议头来判断,具体使用哪个编解码逻辑
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) { // 请求
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) { // 响应
encodeResponse(channel, buffer, (Response) msg);
} else { // 提交给父类( Telnet ) 处理,目前是 Telnet 命令的结果。
super.encode(channel, buffer, msg);
}
}
最后
以上就是香蕉鸭子为你收集整理的精尽 Dubbo 源码分析 —— NIO 服务器(二)之 Transport 层1.概述2.AbstractPeer3.AbstractEndpoint4.Client5. Server6. Channel7. ChannelHandler8. Dispacher9. Codec的全部内容,希望文章能够帮你解决精尽 Dubbo 源码分析 —— NIO 服务器(二)之 Transport 层1.概述2.AbstractPeer3.AbstractEndpoint4.Client5. Server6. Channel7. ChannelHandler8. Dispacher9. Codec所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复