概述
客户端接口-Client
- 基本介绍
- 关系类图
- ClientDelegate-Client代表类
- AbstractClient
- 属性
- 构造方法
- initExecutor-初始化线程池
- 方法
- connect-连接通道
- disconnect-断开通道
- 抽象方法
- 实现Endpoint接口
- send
- close
- 其他方法
- 实现Channel接口
- isConnected
- 其他方法
- 实现Client#reconnect
- NettyClient(待完善)
基本介绍
- Client接口 是 Dubbo传输层-Transport 客户端 的 抽象,主要抽象了reconnect重连方法,同时继承Endpoint ,Channel, Resetable接口,Endpoint提供了发送和关闭功能,Channel提供了操作属性功能(这里就是操作Client的属性),所以Client也拥有这两类功能
- 继承Channel 是因为 客户端 跟 通道 是 1-1对应的,也就是一个Client对应一个Channel,所以做了这样的设计
- 还继承了Resetable接口是为了实现reset方法,该方法,不过已经打上@Deprecated注解,不推荐使用
public interface Client extends Endpoint, Channel, Resetable, IdleSensible {
/**
* reconnect.
* 重新连接
*/
void reconnect() throws RemotingException;
@Deprecated
void reset(org.apache.dubbo.common.Parameters parameters);
}
关系类图
- 实现Client的有ClientDelegate 和 AbstractClient
- 继承Client的有ExchangeClient,ExchangeClient是 在交换层 对Client的 继承,ExchangeClient同时还继承了ExchangeChannel。ExchangeClient会放在交换层内容整理中。
ClientDelegate-Client代表类
ClientDelegate就是对Client的装饰,ClientDelegate就是装饰模式中的Decorator装饰角色(继承ClientDelegate的有内部类ChannelWrapper)
public class ClientDelegate implements Client {
private transient Client client;
public ClientDelegate() {
}
// 通过 构造函数 传递给 被修饰者(Client)
public ClientDelegate(Client client) {
setClient(client);
}
public Client getClient() {
return client;
}
public void setClient(Client client) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
}
// 下面的具体实现 都是 委托给被修饰者执行
// ....
}
AbstractClient
AbstractClient是 在传输层-Transport 对Client的 抽象实现,提供了创建连接,重新连接,关闭连接功能,同时它还继承AbstractEndpoint抽象类,AbstractEndpoint主要是提供编解码器
属性
// 线程池名称
protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
// 连接锁
private final Lock connectLock = new ReentrantLock();
// 发送消息的时候若断开是否重连
private final boolean needReconnect;
//issue-7054:Consumer's executor is sharing globally.
// 线程池
protected volatile ExecutorService executor;
// ExecutorRepository是一个可以获得线程池的SPI
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
构造方法
由于AbstractClient继承了AbstractEndpoint,AbstractEndpoint继承了AbstractPeer,AbstractPeer维护者URL和ChannelHandler的关系,所以AbstractClient的构造方法入参也必须有这两个参数
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
// 这里会先执行AbstractPeer的构造方法,再执行AbstractEndpoint的构造方法
super(url, handler);
// set default needReconnect true when channel is not connected
// 根据url中的send.reconnect参数值设置是否需要重联
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
// 初始化线程池
initExecutor(url);
try {
// 打开客户端,具体代码交给4个子类分别实现
doOpen();
} catch (Throwable t) {
// 关闭客户端
close();
// ...
}
try {
// 连接通道
connect();
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
// 关闭通道
close();
throw t;
} else {
// ...
}
} catch (Throwable t) {
close();
// ...
}
}
initExecutor-初始化线程池
String THREAD_NAME_KEY = "threadname";
String DEFAULT_CLIENT_THREADPOOL = "cached";
String THREADPOOL_KEY = "threadpool";
private void initExecutor(URL url) {
//issue-7054:Consumer's executor is sharing globally, thread name not require provider ip.
// 向url中添加参数
url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
executor = executorRepository.createExecutorIfAbsent(url);
}
接下来看一下ExecutorRepository扩展点的默认实现DefaultExecutorRepository中的createExecutorIfAbsent方法
private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();
String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();
String CONSUMER_SIDE = "consumer";
String SIDE_KEY = "side";
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
// 从data映射集合中获取
Map<Integer, ExecutorService> executors = data.computeIfAbsent(EXECUTOR_SERVICE_COMPONENT_KEY, k -> new ConcurrentHashMap<>());
//issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
// If executor has been shut down, create a new one
if (executor.isShutdown() || executor.isTerminated()) {
executors.remove(portKey);
executor = createExecutor(url);
// 放入executors映射集合中
executors.put(portKey, executor);
}
return executor;
}
String THREADPOOL_KEY = "threadpool";
// 获取ThreadPool扩展点的自适应扩展(FixedThreadPool),然后通过url中参数来new不同队列类型的线程池
private ExecutorService createExecutor(URL url) {
return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
String THREAD_NAME_KEY = "threadname";
String DEFAULT_THREAD_NAME = "Dubbo";
String THREADS_KEY = "threads";
int DEFAULT_THREADS = 200;
String QUEUES_KEY = "queues";
int DEFAULT_QUEUES = 0;
// FixedThreadPool
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
关于线程池扩展点-ThreadPool的相关内容可以在Dubbo-SPI(六)-各层扩展点中找到
方法
connect-连接通道
protected void connect() throws RemotingException {
// 上锁
connectLock.lock();
try {
// 如果已连接,则直接返回
// AbstractClient实现了Channel#isConnected
if (isConnected()) {
return;
}
// 如果已关闭 或 正在关闭 ,则打印报警日志并返回
// AbstractPeer#isClosed和isClosing
if (isClosed() || isClosing()) {
// ...
return;
}
// 子类实现
doConnect();
// 如果没有连接成功,则抛出异常
if (!isConnected()) {
// ...
} else {
// ...
}
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
// ...
} finally {
// 解锁
connectLock.unlock();
}
}
disconnect-断开通道
public void disconnect() {
connectLock.lock();
try {
try {
Channel channel = getChannel();
if (channel != null) {
// 关闭通道
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
// 子类实现
doDisConnect();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
} finally {
connectLock.unlock();
}
}
抽象方法
Client有5个抽象方法,分别是doOpen、doClose、doConnect、doDisConnect、getChannel,他们具体由子类实现
实现Endpoint接口
send
@Override
public void send(Object message, boolean sent) throws RemotingException {
// 如果需要重连 并且 当前是未连接状态,则先进行连接
// needReconnect见构造方法
if (needReconnect && !isConnected()) {
connect();
}
// 获取通道
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
// ...
}
// 通过通道进行发送
channel.send(message, sent);
}
close
@Override
public void close() {
// 如果已关闭,则直接返回
// AbstractPeer#isClosed
if (isClosed()) {
// ...
return;
}
connectLock.lock();
try {
if (isClosed()) {
// ...
return;
}
try {
// 把close字段置为true
// AbstractPeer#close
super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
// 断开通道连接
disconnect();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
// 子类实现
doClose();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
} finally {
connectLock.unlock();
}
}
@Override
public void close(int timeout) {
close();
}
其他方法
@Override
public InetSocketAddress getLocalAddress() {
Channel channel = getChannel();
if (channel == null) {
return InetSocketAddress.createUnresolved(NetUtils.getLocalHost(), 0);
}
return channel.getLocalAddress();
}
实现Channel接口
isConnected
@Override
public boolean isConnected() {
Channel channel = getChannel();
if (channel == null) {
return false;
}
return channel.isConnected();
}
其他方法
几乎都是对Channel的属性做操作
@Override
public Object getAttribute(String key) {
Channel channel = getChannel();
if (channel == null) {
return null;
}
return channel.getAttribute(key);
}
@Override
public void setAttribute(String key, Object value) {
Channel channel = getChannel();
if (channel == null) {
return;
}
channel.setAttribute(key, value);
}
@Override
public void removeAttribute(String key) {
Channel channel = getChannel();
if (channel == null) {
return;
}
channel.removeAttribute(key);
}
@Override
public boolean hasAttribute(String key) {
Channel channel = getChannel();
if (channel == null) {
return false;
}
return channel.hasAttribute(key);
}
@Override
public InetSocketAddress getRemoteAddress() {
Channel channel = getChannel();
if (channel == null) {
return getUrl().toInetSocketAddress();
}
return channel.getRemoteAddress();
}
实现Client#reconnect
@Override
public void reconnect() throws RemotingException {
if (!isConnected()) {
connectLock.lock();
try {
// 如果通道没有关闭,则先断开,再重新连接
if (!isConnected()) {
disconnect();
connect();
}
} finally {
connectLock.unlock();
}
}
}
NettyClient(待完善)
最后
以上就是陶醉嚓茶为你收集整理的Dubbo-Client基本介绍关系类图ClientDelegate-Client代表类AbstractClientNettyClient(待完善)的全部内容,希望文章能够帮你解决Dubbo-Client基本介绍关系类图ClientDelegate-Client代表类AbstractClientNettyClient(待完善)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复