概述
本文基于dubbo v2.6.x
1.Client
Client接口是dubbo 网络传输层客户端抽象,主要抽象了reconnect重连方法,同时继承Endpoint ,Channel, Resetable接口,先来看看Client接口定义:
Endpoint主要是代表一个端点,可以理解为网络的一端,它抽象了send发送方法,端点关闭方法以及获取该端点的地址信息与channelhandler
Channel是dubbo框架对通道的抽象,它主要是抽象了对通道属性的操作,可以设置,获取,移除属性,同时还可以获取远端网络信息与判断连接状态
Resetable接口主要是抽象了reset重置方法。
接下来我们来看下它的UML类关系图
2.AbstractClient
AbstractClient 是Client接口的抽象实现,主要提供了 创建连接,重新连接,关闭连接功能,同时它还继承AbstractEndpoint抽象方法,该抽象类主要是提供编解码器。
首先来看下它的class 类定义:
public abstract class AbstractClient extends AbstractEndpoint implements Client
接下来看下它的成员信息
private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();
// 重连定时线程池
private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
// 连接锁
private final Lock connectLock = new ReentrantLock();
// 发送消息的时候若断开是否重连
private final boolean send_reconnect;
//重连次数
private final AtomicInteger reconnect_count = new AtomicInteger(0);
// Reconnection error log has been called before?
private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
// reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
private final int reconnect_warning_period; //重连警告间隔
private final long shutdown_timeout;
protected volatile ExecutorService executor;
// 重连执行任务
private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
// 最后一次连接成功的时间戳
private long lastConnectedTime = System.currentTimeMillis();
其实通过它的成员就能看出来它的一些功能,提供了一个定时重连线程池,重连任务,还有一堆连接需要的参数。接下来我们来看下它的构造方法
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// 发送消息的时候断开了是否重连接 默认是false的
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
// shutdown 超时时间 15分钟
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
// The default reconnection interval is 2s, . 重连间隔是2s
// 重连警告间隔1800 也就是1小时 1800 means warning interval is 1 hour
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
try {
// 具体打开代码交给 子类实现
doOpen(); // 调用具体实现类的doOpen方法 。 模板方法 , 初始化客户端
} catch (Throwable t) {
close();// 异常 关闭
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect. 进行连接
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
//是否检查,默认是true,在没有提供者的时候会抛出异常 报错说没有提供者
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else { // 不检查的时候就会发出警告信息
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close(); // 关闭
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
//从datastore中获取线程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
// 移除
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
构造方法还是比较繁琐的,首先是获取send.reconnect ,shutdown.timeout ,reconnect.waring.period 参数值,send.reconnect 代表的是发送消息的时候发现断开连接是否进行重连,这个缺省是false 的,shutdown.timeout 是断开连接超时时间,用来打印重连错误日志,这个参数后面就能看到了,缺省是15分钟,reconnect.waring.period 是重连警告间隔 ,缺省是1800ms。接着调用doOpen 方法打开,这doOpen方法是个抽象方法,需要子类具体实现,protected abstract void doOpen() throws Throwable;
, 再往下就是调用 connect()方法进行连接操作,出现异常会判断 check参数是否打开,缺省是打开的,打开就会抛出异常信息。最后就是从DataStore 中获取线程池了。
接下来我们看下 connect 连接方法。
protected void connect() throws RemotingException {
connectLock.lock();// 连接锁
try {
if (isConnected()) { // 如果已经连接
return;
}
// 初始化重连线程
initConnectStatusCheckCommand();
doConnect();// 进行连接 交给子类来处理
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
reconnect_count.set(0);
reconnect_error_log_flag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
}
}
connect 方法中先是获取连接锁,然后判断是否已经连接,如果已经连接则抛出异常,接下来调用initConnectStatusCheckCommand 方法,该方法主要是启动定时重连任务,再往后就是调用doConnect 方法了,doConnect 方法是个抽象方法,需要子类进行具体实现,如果没有连接成功则会抛出异常。接下来就是设置reconnect_count重连次数为0 ,reconnect_error_log_flag 重连错误日志 为false,最后释放锁。
接下来看下 disconnect 断开连接的实现:
public void disconnect() {
connectLock.lock();// 获取连接锁
try {
destroyConnectStatusCheckCommand(); // 销毁重连定时器
try {
Channel channel = getChannel();
if (channel != null) {
channel.close();// 关闭channel
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
doDisConnect();// 进行销毁操作 实际上是子类来做
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
} finally {
connectLock.unlock();
}
}
在断开连接disconnect 方法中,先是获取连接锁,接着调用 destroyConnectStatusCheckCommand 方法来停掉重连任务,获取channel 调用channel的close方法关闭channel,接着调用doDisConnect 方法,该方法是个抽象方法,需要子类来具体实现,最后是释放连接锁。
接下来看下initConnectStatusCheckCommand 这个初始化连接状态检查定时任务的方法:
private synchronized void initConnectStatusCheckCommand() {
//reconnect=false to close reconnect
// 获取重连频率 ,默认是开启的 默认是2s
int reconnect = getReconnectParam(getUrl());
if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
Runnable connectStatusCheckCommand = new Runnable() {
@Override
public void run() {
try {
if (!isConnected()) {//没有连接就重新连接
connect();
} else {// 已经连接了 ,就记录一下时间
lastConnectedTime = System.currentTimeMillis();// 上一次连接成功的时间
}
} catch (Throwable t) {
String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
// wait registry sync provider list
// 距上次连接上的时间超过15分钟
if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
if (!reconnect_error_log_flag.get()) {
reconnect_error_log_flag.set(true);
logger.error(errorMsg, t);
return;
}
}
// 每重连1800次就会触发一次警告, 2s重连一次的话就是1个小时
if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
logger.warn(errorMsg, t);
}
}
}
};
// 设置定时器
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
}
}
在initConnectStatusCheckCommand方法中,首先是调用getReconnectParam获取重连频率,这个重连频率其实就是获取用户配置reconnect参数,如果没有配置则使用默认2s,配置了就使用用户配置的重连频率。接下来就是创建重连任务connectStatusCheckCommand ,在connectStatusCheckCommand中会判断 是否处于连接状态,如果是的话,就更新下lastConnectedTime 最后连接时间戳,如果没有的就调用connect 方法进行重连。如果发生异常先判断 断开连接的时间是否超过shutdown_timeout 这个时间,如果超过就打印错误日志。加下来就是判断重连次数,每重连reconnect_warning_period (缺省是1800)次就会打印下警告日志。
接下来看下 destroyConnectStatusCheckCommand 销毁定时检查连接状态任务的方法,该方法就是判断任务不是null并且没有结束,就会调用任务cancel方法取消,然后调度线程池进行清理那种取消的定时任务。
private synchronized void destroyConnectStatusCheckCommand() {
try {
if (reconnectExecutorFuture != null && !reconnectExecutorFuture.isDone()) {
reconnectExecutorFuture.cancel(true);
reconnectExecutorService.purge();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
3.NettyClient
咱们这里NettyClient是介绍的netty4的客户端实现,它继承AbstractClient抽象客户端。首先来看下它的成员
// 线程数是cup核心数+1 与32 比较最小的
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
private Bootstrap bootstrap;
private volatile Channel channel; // volatile, please copy reference to use 有 volatile 修饰符。因为客户端可能会断开重连,需要保证多线程的可见性。
我们可以看到它这个NioEventLoopGroup 是个静态成员,线程数 你cpu核心数+1 与32做比较去小的,大于32核心的也用32。这个channel 是netty 的一个channel。
接下来看下构造方法:
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
这里主要是需要注意下wrapChannelHandler方法,它把handler 又包装了几层。
接下来看下doOpen方法:
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)// 不使用这个算法,防止延迟
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
//连接超时时间最小是3000 ,如果用户设置了的这个时间小于3000 设置成3000
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
// 使用用户设置的超时时间
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler);
}
});
}
该方法就是调用netty的api 来创建 bootstrap ,设置一些属性,ChannelOption.SO_KEEPALIVE
这个参数 是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活,ChannelOption.TCP_NODELAY
这里是true,也就是不使用Nagle算法,能够保证实时性,使用Nagle算法的发小包的时候会有延迟。ChannelOption.ALLOCATOR
使用对象池,重用缓冲区。 我们可以看到这个方法并没有进行连接,连接动作是在doConnect方法中实现的
接下来看下doConnect 连接方法:
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
// 连接
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {// 连接成功的时候
Channel newChannel = future.channel();/// 获取channel
try {
// Close old channel 如果有以前的channel存在,
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) { //如果老的channel不是null,然后就会将老得channel关闭
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();// 关闭老得连接
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {// 判断netty client 关闭状态值
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();//关闭新连接
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
// 将新生成的channel 设置成channel
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
doConnect 方法总的来说就是调用bootstrap的connect方法进行连接,然后获取newChannel,如果存在老的channel,会关闭,将channel 赋值newChannel。如果连接出现异常则抛出。
接下来看下doDisConnect 销毁连接的实现:
@Override
protected void doDisConnect() throws Throwable {
try {
NettyChannel.removeChannelIfDisconnected(channel);
} catch (Throwable t) {
logger.warn(t.getMessage());
}
}
就是从NettyChannel中移除对应channel的缓存。该缓存 维护了 netty channel 与dubbo channel的对应关系。
最后看下getChannel获取channel的方法:
@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isActive())
return null;
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
该方法获取的channel是dubbo channel, 在方法中,如果netty channel 是空或者断开了,就会返回空,否则调用NettyChannel 的getOrAddChannel方法获取返回,getOrAddChannel方法其实就是从 netty channel 与dubbo channel的对应关系缓存map中 查找该netty channel 对应的dubbo chennel ,如果没有的话就创建一个NettyChannel ,然后塞到缓存map中,并且返回。
最后
以上就是自由机器猫为你收集整理的深度解析dubbo网络传输层Client的全部内容,希望文章能够帮你解决深度解析dubbo网络传输层Client所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复