概述
consumer同步实现分析
send执行链中HeaderExchangeChannel-request() 创建了 DefaultFuture,依赖 DefaultFuture实现同步功能。
DefaultFuture 通过await,signal();实现请求和响应同步
HeaderExchangeChannel
public ResponseFuture request(Object request, int timeout) throws RemotingException {
//......
//返回future ,future 肯定是 channel.send(req);执行后生成某些东西。
// future是怎么与channel.send(req);建立关系的?
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);//A
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
new DefaultFuture(channel, req, timeout);
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);//注意这里。关联 send()的关键
CHANNELS.put(id, channel);
}
futrue.get()调用链
public Object get() throws RemotingException {
return get(timeout);
}
// 阻塞直到 response获取到对象
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
response是从哪里获取的呢?
DefaultFuture 中有个received方法
public static void received(Channel channel, Response response) {
try {
//注意这里,response.getId() 猜测是send(req)时req传过去的。
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
被调跟踪如下
从图中还分析不出具体什么时候触发,但是可以推断出是 Netty readIO事件触发.
HeaderExchangeHandler
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
//如果不是心跳响应,转发响应数据给DefaultFuture
DefaultFuture.received(channel, response);
}
}
HeaderExchangeHandler是什么时候关联netty的?
连接的时候:
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
。。。。
}
建立关联的过程
使用静态变量 关联 request实例,id 和DefaultFuture,channel.send(req)(高亮A);将req.id传给netty, netty 服务端发送响应时执行response.set(req.id),客户端接受请求后根据 response.id获取DefaultFuture, 然后调用 DefaultFuture.received(channel, response); 将返回结果传入。
响应处理:DefaultFuture.get()。
read 事件回调handler的分析
netty read事件 发生后,handler的调用顺序
AllChannelHandler:将handler封装成task,提交到线程池执行。
HeartbeatHandler:处理心跳响应,如果是心跳请求 发送心跳响应,如果是心跳响应,什么也不做。
MultiMessageHandler:处理批量消息
NettyClient(AbstractPeer):无处理,调用handler.received()
NettyHandler:netty事件处理,不同的时间对应handler的不同方法
创建task 使用线程池执行后handler顺序
ChannelEventRunnable.run() :线程任务类,状态不同执行不同的handller方法
DecodeHandler.received(Channel, Object) :解码 转发
HeaderExchangeHandler.received(Channel, Object) :根据message类型,转发不同的方法执行(message不是Response 什么时候用?)
HeaderExchangeHandler.handleResponse(Channel, Response) :如果不是心跳响应,转发响应数据给DefaultFuture
DefaultFuture.received(Channel, Response):找到 response.getId 对应的future,解除阻塞。
NettyClient创建 handler包装多层分析
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// 创建 NettyClient ,将原来的handler包装了多层
super(url, wrapChannelHandler(url, handler));
}
包装handler的调用链
NettyClient.<init>(URL, ChannelHandler) :创建NettyClient
AbstractClient.wrapChannelHandler(URL, ChannelHandler) :设置线程名,添加默认线程类型为cached
ChannelHandlers.wrap(ChannelHandler, URL) :转发
ChannelHandlers.wrapInternal(ChannelHandler, URL)://将需要的handler按照需求层层封装。
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
执行handler包装 由外到内:MultiMessageHandler->HeartbeatHandler->AllChannelHandler
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
粗体部分 调用 AllDispatcher-dispatch 创建了AllChannelHandler
获取response的交互过程
DefaultFuture-received: 放入response, 执行 done.signal(),停止阻塞。
DefaultFuture-get:进入阻塞 直到执行 received后,被唤醒,获取response 返回请求结果。
AllChannelHandler创建跟踪
AllChannelHandler是什么时候创建,前面提到是创建 NettyClient的时候创建的。
创建 NettyClient调用了wrapChannelHandler()
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);//指定 threadpool=cached
return ChannelHandlers.wrap(handler, url);
}
ChannelHandlers.wrap调用下面方法。
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
//粗体的部分 调用了AllDispatcher.dispatch()
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}
AllDispatcher.dispatch() 直接创建了AllChannelHandler。
继续跟踪,它调用了父类的构造方法
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;// type is DecodeHandler
this.url = url;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
//executor 是CachedThreadPool 类型的
//。。。
}
AllChannelHandler的父类 保存了hanler和url,并根据配置获取了线程池实例(默认是cached类型的CachedThreadPool )。
CachedThreadPool 线程池分析。
public class CachedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// alive=6000,queues=0,threads= Integer.MAX_VALUE,cores=0,name='DubboClientHandler-10.103.11.210:20880' 其中 alive,queues,threads,cores都是默认值
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
//NamedThreadFactory 组装线程名
}
}
逻辑比较简单,依赖ThreadPoolExecutor,根据配置创建线程池。
这个线程池是可以根据参数配置成不同类型的,并使用具体的参数进行调优。
总结:本文以数据响应为切入点,分析了 发送和响应同步机制,整个接受数据的流程。
最后
以上就是重要毛豆为你收集整理的dubbo源码分析6-consumer请求响应的全部内容,希望文章能够帮你解决dubbo源码分析6-consumer请求响应所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复