我是靠谱客的博主 陶醉嚓茶,最近开发中收集的这篇文章主要介绍Dubbo-Client基本介绍关系类图ClientDelegate-Client代表类AbstractClientNettyClient(待完善),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

客户端接口-Client

  • 基本介绍
  • 关系类图
  • ClientDelegate-Client代表类
  • AbstractClient
    • 属性
    • 构造方法
      • initExecutor-初始化线程池
    • 方法
      • connect-连接通道
      • disconnect-断开通道
      • 抽象方法
    • 实现Endpoint接口
      • send
      • close
      • 其他方法
    • 实现Channel接口
      • isConnected
      • 其他方法
    • 实现Client#reconnect
  • NettyClient(待完善)

基本介绍

  • Client接口 是 Dubbo传输层-Transport 客户端 的 抽象,主要抽象了reconnect重连方法,同时继承Endpoint ,Channel, Resetable接口,Endpoint提供了发送和关闭功能Channel提供了操作属性功能(这里就是操作Client的属性),所以Client也拥有这两类功能
  • 继承Channel 是因为 客户端 跟 通道 是 1-1对应的,也就是一个Client对应一个Channel,所以做了这样的设计
  • 还继承了Resetable接口是为了实现reset方法,该方法,不过已经打上@Deprecated注解,不推荐使用
public interface Client extends Endpoint, Channel, Resetable, IdleSensible {

    /**
     * reconnect.
     * 重新连接
     */
    void reconnect() throws RemotingException;

    @Deprecated
    void reset(org.apache.dubbo.common.Parameters parameters);

}

关系类图

在这里插入图片描述

  • 实现Client的有ClientDelegate 和 AbstractClient
  • 继承Client的有ExchangeClient,ExchangeClient是 在交换层 对Client的 继承,ExchangeClient同时还继承了ExchangeChannel。ExchangeClient会放在交换层内容整理中。

ClientDelegate-Client代表类

ClientDelegate就是对Client的装饰,ClientDelegate就是装饰模式中的Decorator装饰角色(继承ClientDelegate的有内部类ChannelWrapper)

public class ClientDelegate implements Client {

    private transient Client client;

    public ClientDelegate() {
    }

	// 通过 构造函数 传递给 被修饰者(Client)
    public ClientDelegate(Client client) {
        setClient(client);
    }

    public Client getClient() {
        return client;
    }

    public void setClient(Client client) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
    }
    
   	// 下面的具体实现 都是 委托给被修饰者执行
	// ....
}

AbstractClient

AbstractClient是 在传输层-Transport 对Client的 抽象实现,提供了创建连接,重新连接,关闭连接功能,同时它还继承AbstractEndpoint抽象类,AbstractEndpoint主要是提供编解码器

属性

// 线程池名称
protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
// 连接锁
private final Lock connectLock = new ReentrantLock();
// 发送消息的时候若断开是否重连
private final boolean needReconnect;
//issue-7054:Consumer's executor is sharing globally.
// 线程池
protected volatile ExecutorService executor;
// ExecutorRepository是一个可以获得线程池的SPI
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

构造方法

由于AbstractClient继承了AbstractEndpoint,AbstractEndpoint继承了AbstractPeer,AbstractPeer维护者URL和ChannelHandler的关系,所以AbstractClient的构造方法入参也必须有这两个参数

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
	// 这里会先执行AbstractPeer的构造方法,再执行AbstractEndpoint的构造方法
	super(url, handler);
	// set default needReconnect true when channel is not connected
	// 根据url中的send.reconnect参数值设置是否需要重联
	needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);

	// 初始化线程池
	initExecutor(url);

	try {
		// 打开客户端,具体代码交给4个子类分别实现
		doOpen();
	} catch (Throwable t) {
		// 关闭客户端
		close();
        // ...
	}

	try {
		// 连接通道
		connect();
	} catch (RemotingException t) {
		if (url.getParameter(Constants.CHECK_KEY, true)) {
			// 关闭通道
			close();
			throw t;
		} else {
			// ...
		}
	} catch (Throwable t) {
		close();
		// ...
	}
}

initExecutor-初始化线程池

String THREAD_NAME_KEY = "threadname";
String DEFAULT_CLIENT_THREADPOOL = "cached";
String THREADPOOL_KEY = "threadpool";

private void initExecutor(URL url) {
	//issue-7054:Consumer's executor is sharing globally, thread name not require provider ip.
	// 向url中添加参数
	url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME);
	url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
	executor = executorRepository.createExecutorIfAbsent(url);
}

接下来看一下ExecutorRepository扩展点的默认实现DefaultExecutorRepository中的createExecutorIfAbsent方法

private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();

String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();
String CONSUMER_SIDE = "consumer";
String SIDE_KEY = "side";

public synchronized ExecutorService createExecutorIfAbsent(URL url) {
	// 从data映射集合中获取
	Map<Integer, ExecutorService> executors = data.computeIfAbsent(EXECUTOR_SERVICE_COMPONENT_KEY, k -> new ConcurrentHashMap<>());
	//issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
	Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
	ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
	// If executor has been shut down, create a new one
	if (executor.isShutdown() || executor.isTerminated()) {
		executors.remove(portKey);
		executor = createExecutor(url);
		// 放入executors映射集合中
		executors.put(portKey, executor);
	}
	return executor;
}

String THREADPOOL_KEY = "threadpool";
// 获取ThreadPool扩展点的自适应扩展(FixedThreadPool),然后通过url中参数来new不同队列类型的线程池
private ExecutorService createExecutor(URL url) {
	return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}

String THREAD_NAME_KEY = "threadname";
String DEFAULT_THREAD_NAME = "Dubbo";

String THREADS_KEY = "threads";
int DEFAULT_THREADS = 200;

String QUEUES_KEY = "queues";
int DEFAULT_QUEUES = 0;

// FixedThreadPool
@Override
public Executor getExecutor(URL url) {
	String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
	int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
	int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
	return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
			queues == 0 ? new SynchronousQueue<Runnable>() :
					(queues < 0 ? new LinkedBlockingQueue<Runnable>()
							: new LinkedBlockingQueue<Runnable>(queues)),
			new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

关于线程池扩展点-ThreadPool的相关内容可以在Dubbo-SPI(六)-各层扩展点中找到

方法

connect-连接通道

protected void connect() throws RemotingException {
	// 上锁
	connectLock.lock();

 	try {
 		// 如果已连接,则直接返回
 		// AbstractClient实现了Channel#isConnected
		if (isConnected()) {
			return;
		}

		// 如果已关闭 或 正在关闭 ,则打印报警日志并返回
		// AbstractPeer#isClosed和isClosing
		if (isClosed() || isClosing()) {
			// ...
			return;
		}

		// 子类实现
		doConnect();

		// 如果没有连接成功,则抛出异常
		if (!isConnected()) {
			// ...

		} else {
			// ...
		}

	} catch (RemotingException e) {
		throw e;

	} catch (Throwable e) {
		// ...

	} finally {
		// 解锁
		connectLock.unlock();
	}
}

disconnect-断开通道

public void disconnect() {
	connectLock.lock();
	try {
		try {
			Channel channel = getChannel();
			if (channel != null) {
				// 关闭通道
				channel.close();
			}
		} catch (Throwable e) {
			logger.warn(e.getMessage(), e);
		}
		
		try {
			// 子类实现
			doDisConnect();
		} catch (Throwable e) {
			logger.warn(e.getMessage(), e);
		}
	} finally {
		connectLock.unlock();
	}
}

抽象方法

Client有5个抽象方法,分别是doOpen、doClose、doConnect、doDisConnect、getChannel,他们具体由子类实现

实现Endpoint接口

send

@Override
public void send(Object message, boolean sent) throws RemotingException {
	// 如果需要重连 并且 当前是未连接状态,则先进行连接
	// needReconnect见构造方法
	if (needReconnect && !isConnected()) {
		connect();
	}
	// 获取通道
	Channel channel = getChannel();
	//TODO Can the value returned by getChannel() be null? need improvement.
	if (channel == null || !channel.isConnected()) {
		// ...
	}
	// 通过通道进行发送
	channel.send(message, sent);
}

close

@Override
public void close() {
	// 如果已关闭,则直接返回
	// AbstractPeer#isClosed
	if (isClosed()) {
		// ...
		return;
	}
	 
	connectLock.lock();
	try {
		if (isClosed()) {
			// ...
			return;
		}

		try {
			// 把close字段置为true
			// AbstractPeer#close
			super.close();
		} catch (Throwable e) {
			logger.warn(e.getMessage(), e);
		}

		try {
			// 断开通道连接
			disconnect();
		} catch (Throwable e) {
			logger.warn(e.getMessage(), e);
		}

		try {
			// 子类实现
			doClose();
		} catch (Throwable e) {
			logger.warn(e.getMessage(), e);
		}

	} finally {
		connectLock.unlock();
	}
}

@Override
public void close(int timeout) {
	close();
}

其他方法

@Override
public InetSocketAddress getLocalAddress() {
	Channel channel = getChannel();
	if (channel == null) {
		return InetSocketAddress.createUnresolved(NetUtils.getLocalHost(), 0);
	}
	return channel.getLocalAddress();
}

实现Channel接口

isConnected

@Override
public boolean isConnected() {
	Channel channel = getChannel();
	if (channel == null) {
		return false;
	}
	return channel.isConnected();
}

其他方法

几乎都是对Channel的属性做操作

@Override
public Object getAttribute(String key) {
	Channel channel = getChannel();
	if (channel == null) {
		return null;
	}
	return channel.getAttribute(key);
}

@Override
public void setAttribute(String key, Object value) {
	Channel channel = getChannel();
	if (channel == null) {
		return;
	}
	channel.setAttribute(key, value);
}

@Override
public void removeAttribute(String key) {
	Channel channel = getChannel();
	if (channel == null) {
		return;
	}
	channel.removeAttribute(key);
}

@Override
public boolean hasAttribute(String key) {
	Channel channel = getChannel();
	if (channel == null) {
		return false;
	}
	return channel.hasAttribute(key);
}

@Override
public InetSocketAddress getRemoteAddress() {
	Channel channel = getChannel();
	if (channel == null) {
		return getUrl().toInetSocketAddress();
	}
	return channel.getRemoteAddress();
}

实现Client#reconnect

@Override
public void reconnect() throws RemotingException {
	if (!isConnected()) {
		connectLock.lock();
		try {
			// 如果通道没有关闭,则先断开,再重新连接
			if (!isConnected()) {
				disconnect();
				connect();
			}
		} finally {
			connectLock.unlock();
		}
	}
}

NettyClient(待完善)

最后

以上就是陶醉嚓茶为你收集整理的Dubbo-Client基本介绍关系类图ClientDelegate-Client代表类AbstractClientNettyClient(待完善)的全部内容,希望文章能够帮你解决Dubbo-Client基本介绍关系类图ClientDelegate-Client代表类AbstractClientNettyClient(待完善)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(93)

评论列表共有 0 条评论

立即
投稿
返回
顶部