概述
本篇幅主要分析DubboProtocol.refer方法创建invoker
DubboProtocol.refer(Class<T> serviceType, URL url)
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
//序列化优化
optimizeSerialization(url);
//创建invoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
上述方法创建了一个DubboInvoker,Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。这里有个方法的调用getClients(url)方法节目组金瓶梅挂牌,创建ExchangeClient实例(单个或者多个),ExchangeClient基于NettyClient与NettyServer进行通信。
getClients(URL url)
private ExchangeClient[] getClients(URL url) {
// 是否共享连接
boolean service_share_connect = false;
// 获取连接数,默认为0
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果未配置,则共享连接,否则,一项服务的一个连接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
//
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
//如果共享连接
if (service_share_connect) {
//获取共享的ExchangeClient
clients[i] = getSharedClient(url);
} else {
//初始化新的客户端
clients[i] = initClient(url);
}
}
return clients;
}
根据url中的sessions数量判断是否创建共享连接,如果需要创建共享连接则通过getSharedClient获取共享连接,如果客户端未建立,则创建客户端。
getSharedClient(URL url)
private ExchangeClient getSharedClient(URL url) {
String key = url.getAddress();
//获取带有引用计数功能的ExchangeClient
ReferenceCountExchangeClient client = referenceClientMap.get(key);
//获取到了并且client未被关闭增加引用计数
if (client != null) {
if (!client.isClosed()) {
client.incrementAndGetCount();
return client;
} else {
//客户端被关闭了,根据key删除掉吧
referenceClientMap.remove(key);
}
}
//放入该key对应的锁对象
locks.putIfAbsent(key, new Object());
//加锁重新获取
synchronized (locks.get(key)) {
if (referenceClientMap.containsKey(key)) {
return referenceClientMap.get(key);
}
//通过url初始化exchangeClient
ExchangeClient exchangeClient = initClient(url);
//包装exchangeClient为带有引用计数功能的
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
//加入缓存
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
//锁删除
locks.remove(key);
return client;
}
}
该方法首先从referenceClientMap缓存中获取ReferenceCountExchangeClient即带有引用计数功能的ExchangeClient实例,否则通过调用initClient方法初始化ExchangeClient,然后通过装饰器将创建的exchangeClient包装为ReferenceCountExchangeClient对象加入缓存中,并返回ReferenceCountExchangeClient。
initClient(URL url)
private ExchangeClient initClient(URL url) {
// 获取客户端的类型默认值为netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// 添加编解码器与心跳配置
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
//检测客户端类型是否存在,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
//
ExchangeClient client;
try {
// 获取 lazy 配置,并根据配置值决定创建的客户端类型
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 创建普通 ExchangeClient 实例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
initClient 方法首先获取用户配置的客户端类型,默认为 netty。然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。
Exchangers.connect(URL url, ExchangeHandler handler)
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//获取到HeaderExchageClient
return getExchanger(url).connect(url, handler);
}
public static Exchanger getExchanger(URL url) {
//获取url中的exchanger属性,没有使用默认的header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
//dubbo的spi机制
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例
HeaderExchanger.connect(URL url,ExchangeHandler handler)
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
handler的包装处理:DubboProtocol.ExchangeHandler->HeaderExchangeHandler->DecodeHandler,通过Transporters.connect方法创建ExchangeClient对象实例,包装ExchangeClient对象为HeaderExchangeClient对象实例
Transporters.connect(URL url,ChannelHandler... handlers)
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
通过dubbo的spi机制获取到NettyTransporter,然后调用connect方法,创建NettyClient对象
NettyTransporter.connect(URL url, ChannelHandler listener)
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
NettyClient.java
- ChannelHandler:主要是处理channel的,比如channel关闭、连接,发送消息,接收消息等操作
- Resetable:该接口实现类可以实现例如Client相关属性的重置
- AbstractPeer:主要是保存了服务提供这协议的url和通过委托成员变量ChannelHandler实现了ChannelHandler接口的方法和EndPoint接口的方法
/**
* channel事件处理器
*/
private final ChannelHandler handler;
/**
* 第一个服务提供者协议的url地址
*/
private volatile URL url;
- Channel:Netty为了统一实现不同类型NIO框架的框架对Channel处理而做出的抽象接口
- Client:约定客户端必须要实现Client接口的reconnect方法
- AbstractEndpoint:继承AbstractPeer并实现了Resetable接口reset方法,使得我们可以AbstractPeer的Url属性重置编码器、超时时间、连接超时时间
/**
* 编码解码器
*/
private Codec2 codec;
/**
* 超时时间
*/
private int timeout;
/**
* 连接超时时间
*/
private int connectTimeout;
- AbstractClient:该类实现了Client接口和Channel接口的方法,Mina、Netty、Grizzly类型的客户端统一实现,并且该类重写了AbstractEndPoint的reset方法
/**
* 客户端线程池ID自增器
*/
private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();
/**
* 客户端连接重连线程池
*/
private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
/**
* 客户端连接服务端独占锁,保证一个客户端同时只会一个线程在执行连接动作
*/
private final Lock connectLock = new ReentrantLock();
/**
* 消息发送时,如果当前客户端未连接,是否发起重连操作
*/
private final boolean send_reconnect;
/**
* 记录重连的次数
*/
private final AtomicInteger reconnect_count = new AtomicInteger(0);
// Reconnection error log has been called before?
/**
* 连接出错后是否打印过ERROR日志
*/
private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
// reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
/**
* 对连接异常,以WARN级别日志输出的频率,默认第一次是以Error日志,然后每出现reconnect_warning_period次后,就打印一次warn级别日志
*/
private final int reconnect_warning_period;
/**
* 关闭服务的超时时间
*/
private final long shutdown_timeout;
/**
* 客户端线程池
*/
protected volatile ExecutorService executor;
/**
* 重连的Future
*/
private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
// the last successed connected time
/**
* 上一次重连时间戳
*/
private long lastConnectedTime = System.currentTimeMillis();
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
//调用父类方法初始化url、handler
super(url, handler);
/**
* 初始化send_reconnect 、shutdown_timeout、reconnect_warning_period(默认1小时打印一次日志)
*/
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
try {
//模板方法,委托子类实现,此方法还未真正的连接到服务端
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// 这里会连接nettyServer服务器
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
//从DataStore中获取线程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
//从DataStore中移除线程池
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
- a.调用父类构造函数初始化handler、和url
- b.初始化send_reconnect 、shutdown_timeout、reconnect_warning_period(默认1小时打印一次日志)
- c.委托子类实现doOpen方法初始化客户端
- d.connect()方法,真正建立TCP连接,其主要逻辑是开启重连任务,然后委托不同的客户端实现类型实现doConnect()方法打开TCP连接,设置重连次数和重连标识。
protected void connect() throws RemotingException {
//加锁
connectLock.lock();
try {
//已经连接成功返回
if (isConnected()) {
return;
}
//重连任务调度开启
initConnectStatusCheckCommand();
//委托子类实现真正的连接逻辑
doConnect();
//还未连接到服务器
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
//设置重连次数为0
reconnect_count.set(0);
//设置重连error log标识为false
reconnect_error_log_flag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
}
}
- e.从DataStore中获取线程池,然后从DataStore中移除该线程池
- NettyClient:继承AbstractClient,真正的客户端类,主要负责与服务端通信。
/**
* IO线程组,同一个JVM中所有的客户端公用一个IO线程组,且线程数固定为(32与CPU核数+1的最小值)。
*/
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
/**
* Netty客户端启动实例
*/
private Bootstrap bootstrap;
/**
* 客户端连接,请copy其引用使用。
*/
private volatile Channel channel;
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
不难看出,NettyClient的构造函数主要是调用父类AbstractClient的构造函数进行属性初始化
AbstractClient.wrapChannelHandler(URL url,ChannelHandler handler)
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
//在url添加threadname属性
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
//在url添加threadpool属性值为cached
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
由Dubbo源码学习07可知我们的chandler的整体包装流程: MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> DubboProtocol.requestHandler
- MultiMessageHandler:检查消息是否为MutiMessage ,如果 是,分开单条调用后续handler
- HeartbeatHanlder:1.在每个channel动作,对channel标记时间属性, 2. 检查是否心跳请求,是则直接返回心跳,不继续后续请求。
- AllChannelHandler : 1. 将后续handler 包装成 ChannelEventRunnable,捕获后续执行的异常,记录日志 。 2. 包装的runnable 放到独立线程池运行, 达到全流程异步化效果。
- DecodeHandler:判断message的种类然后解码
NettyClient.doOpen()
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler);
}
});
}
标准的NettyClient的启动模板,创建NettyClientHandler,配置客户端启动实例的属性,设置连接超时时间,最小连接超时时间为3s,初始化ChannelInitializer添加编解码器,NettyClientHandler
NettyClient.doConnect()
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
//连接到远程服务
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// Close old channel
//关闭oldchannel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
//如果当前客户端被关闭了,关闭newChannel
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
//将newChannel赋值给client的channel
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
调用doOpen方法创建的bootstrap实例发起连接,等待个connecTimeout超时时间去连接到服务端,如果连接成功:关闭以前的channel,判断客户端是否已经关闭,如果已经关闭,调用channel.close()方法关闭channel。
最后
以上就是殷勤衬衫为你收集整理的Dubbo源码学习14的全部内容,希望文章能够帮你解决Dubbo源码学习14所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复