我是靠谱客的博主 重要毛豆,最近开发中收集的这篇文章主要介绍dubbo源码分析6-consumer请求响应,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

consumer同步实现分析

send执行链中HeaderExchangeChannel-request() 创建了 DefaultFuture,依赖 DefaultFuture实现同步功能。

DefaultFuture 通过await,signal();实现请求和响应同步

HeaderExchangeChannel

public ResponseFuture request(Object request, int timeout) throws RemotingException {

//......

//返回future ,future 肯定是 channel.send(req);执行后生成某些东西。

// future是怎么与channel.send(req);建立关系的?

DefaultFuture future = new DefaultFuture(channel, req, timeout);

try {

channel.send(req);//A

} catch (RemotingException e) {

future.cancel();

throw e;

}

return future;

}
new DefaultFuture(channel, req, timeout);

public DefaultFuture(Channel channel, Request request, int timeout) {

this.channel = channel;

this.request = request;

this.id = request.getId();

this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

// put into waiting map.

FUTURES.put(id, this);//注意这里。关联 send()的关键

CHANNELS.put(id, channel);

}

futrue.get()调用链

public Object get() throws RemotingException {

return get(timeout);

}

// 阻塞直到 response获取到对象

public Object get(int timeout) throws RemotingException {

if (timeout <= 0) {

timeout = Constants.DEFAULT_TIMEOUT;

}

if (!isDone()) {

long start = System.currentTimeMillis();

lock.lock();

try {

while (!isDone()) {

done.await(timeout, TimeUnit.MILLISECONDS);

if (isDone() || System.currentTimeMillis() - start > timeout) {

break;

}

}

} catch (InterruptedException e) {

throw new RuntimeException(e);

} finally {

lock.unlock();

}

if (!isDone()) {

throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));

}

}

return returnFromResponse();

}

response是从哪里获取的呢?

DefaultFuture 中有个received方法

public static void received(Channel channel, Response response) {

try {

//注意这里,response.getId() 猜测是send(req)时req传过去的。

DefaultFuture future = FUTURES.remove(response.getId());

if (future != null) {

future.doReceived(response);

} else {

logger.warn("The timeout response finally returned at "

+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))

+ ", response " + response

+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()

+ " -> " + channel.getRemoteAddress()));

}

} finally {

CHANNELS.remove(response.getId());

}

}

被调跟踪如下

从图中还分析不出具体什么时候触发,但是可以推断出是 Netty readIO事件触发.

 

HeaderExchangeHandler

static void handleResponse(Channel channel, Response response) throws RemotingException {

if (response != null && !response.isHeartbeat()) {

//如果不是心跳响应,转发响应数据给DefaultFuture

DefaultFuture.received(channel, response);

}

}

HeaderExchangeHandler是什么时候关联netty的?

连接的时候:

public class HeaderExchanger implements Exchanger {

 

public static final String NAME = "header";

 

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {

return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);

}

。。。。

}

建立关联的过程

使用静态变量 关联 request实例,id 和DefaultFuture,channel.send(req)(高亮A);将req.id传给netty, netty 服务端发送响应时执行response.set(req.id),客户端接受请求后根据 response.id获取DefaultFuture, 然后调用 DefaultFuture.received(channel, response); 将返回结果传入。

响应处理:DefaultFuture.get()。

 

read 事件回调handler的分析

netty read事件 发生后,handler的调用顺序

AllChannelHandler:将handler封装成task,提交到线程池执行。

 

HeartbeatHandler:处理心跳响应,如果是心跳请求 发送心跳响应,如果是心跳响应,什么也不做。

MultiMessageHandler:处理批量消息

NettyClient(AbstractPeer):无处理,调用handler.received()

NettyHandler:netty事件处理,不同的时间对应handler的不同方法

 

创建task 使用线程池执行后handler顺序

 

ChannelEventRunnable.run()  :线程任务类,状态不同执行不同的handller方法
DecodeHandler.received(Channel, Object)      :解码 转发
HeaderExchangeHandler.received(Channel, Object)  :根据message类型,转发不同的方法执行(message不是Response 什么时候用?)
HeaderExchangeHandler.handleResponse(Channel, Response) :如果不是心跳响应,转发响应数据给DefaultFuture
DefaultFuture.received(Channel, Response):找到 response.getId 对应的future,解除阻塞。

NettyClient创建 handler包装多层分析

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {

// 创建 NettyClient ,将原来的handler包装了多层

super(url, wrapChannelHandler(url, handler));

}

包装handler的调用链

NettyClient.<init>(URL, ChannelHandler) :创建NettyClient    
AbstractClient.wrapChannelHandler(URL, ChannelHandler) :设置线程名,添加默认线程类型为cached
ChannelHandlers.wrap(ChannelHandler, URL) :转发
ChannelHandlers.wrapInternal(ChannelHandler, URL)://将需要的handler按照需求层层封装。

 

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {

执行handler包装 由外到内:MultiMessageHandler->HeartbeatHandler->AllChannelHandler

return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)

.getAdaptiveExtension().dispatch(handler, url)));

}

粗体部分 调用 AllDispatcher-dispatch 创建了AllChannelHandler

获取response的交互过程

DefaultFuture-received: 放入response, 执行 done.signal(),停止阻塞。

DefaultFuture-get:进入阻塞 直到执行 received后,被唤醒,获取response 返回请求结果。

 

AllChannelHandler创建跟踪

AllChannelHandler是什么时候创建,前面提到是创建 NettyClient的时候创建的。

创建 NettyClient调用了wrapChannelHandler()

protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {

url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);

url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);//指定 threadpool=cached

return ChannelHandlers.wrap(handler, url);

}

ChannelHandlers.wrap调用下面方法。

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {

return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)

.getAdaptiveExtension().dispatch(handler, url)));

}

//粗体的部分 调用了AllDispatcher.dispatch()

 

public class AllDispatcher implements Dispatcher {

public static final String NAME = "all";

@Override

public ChannelHandler dispatch(ChannelHandler handler, URL url) {

return new AllChannelHandler(handler, url);

}

}

AllDispatcher.dispatch() 直接创建了AllChannelHandler。

 

继续跟踪,它调用了父类的构造方法

public AllChannelHandler(ChannelHandler handler, URL url) {

super(handler, url);

}

 

public WrappedChannelHandler(ChannelHandler handler, URL url) {

this.handler = handler;// type is DecodeHandler

this.url = url;

executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

//executor 是CachedThreadPool 类型的

//。。。

}

AllChannelHandler的父类 保存了hanler和url,并根据配置获取了线程池实例(默认是cached类型的CachedThreadPool )。

 

CachedThreadPool 线程池分析。

public class CachedThreadPool implements ThreadPool {



public Executor getExecutor(URL url) {

String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);

int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);

int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

// alive=6000,queues=0,threads= Integer.MAX_VALUE,cores=0,name='DubboClientHandler-10.103.11.210:20880' 其中 alive,queues,threads,cores都是默认值



return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,

queues == 0 ? new SynchronousQueue<Runnable>() :

(queues < 0 ? new LinkedBlockingQueue<Runnable>()

: new LinkedBlockingQueue<Runnable>(queues)),

new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));

//NamedThreadFactory 组装线程名

}

}

逻辑比较简单,依赖ThreadPoolExecutor,根据配置创建线程池。

这个线程池是可以根据参数配置成不同类型的,并使用具体的参数进行调优。

 

总结:本文以数据响应为切入点,分析了 发送和响应同步机制,整个接受数据的流程。

 

最后

以上就是重要毛豆为你收集整理的dubbo源码分析6-consumer请求响应的全部内容,希望文章能够帮你解决dubbo源码分析6-consumer请求响应所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(49)

评论列表共有 0 条评论

立即
投稿
返回
顶部