概述
对于Dubbo的服务提供者,主要有两种线程池,一种是IO处理线程池,另一种是服务调用线程池。而作为IO处理线程池,由于Dubbo基于Mina、Grizzly和Netty框架做IO组件,IO线程池都是基于这些框架来配置,比如Netty中的boss和worker线程池,Dubbo选择的是“无边界”的CachedThreadPool,这意味着对所有服务请求先做到“来者不拒”,但它进一步限制了IO处理的线程数,默认是核数+1,本文拿Netty组件举例,代码见NettyServer#open:
ExecutorService boss = Executors.newCachedThreadPool(newNamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(newNamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss,worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
那么当请求到达Dubbo的提供者端,会进行哪些处理呢?用过Dubbo框架的同学哪怕没看过源码也能猜出个大概,请求处理会包含三部分:请求解析、服务调用和应答。请求解析需要确认请求的正确性,比如请求解码(比如协议是否正确)、请求是否合法(提供者端是否有该服务;该服务是否需要token验证来防止绕过注册中心直连);服务调用过程就是提供者作为服务端的一个服务处理过程,这个过程需要用到前面说到的第二种服务调用线程池来执行,该过程通过线程池来和请求解析过程分开,这样做的目的一是过程解耦,二是可以做到服务提供者超时返回,为了让用户能对该过程进行拦截,Dubbo特意通过SPI实现了Filter机制,用户可以通过自定义Filter来对服务调用进行日志记录和监控,当然前提是服务调用线程池还没被请求打满;应答过程主要是对结果进行编码并返回。
我们先来仔细看看请求解析过程,我们这里参照的是Netty,使用过Netty的同学都知道,如果想自定义流处理、协议的编解码功能,需要自己去实现一些适配类,比如Netty3.x中的SimpleChannelHandler和Netty4.x中的ChannelInboundHandlerAdapter,Dubbo的2.5.3版本依赖的是Netty3.x版本,我们这里直接可以从NettyServer#doOpen方法看出Dubbo向Netty中注册了哪些Handler,这些Handler是请求数据处理的第一道屏障:
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// 无界的Netty boss线程池,负责和消费者建立新的连接
ExecutorService boss = Executors.newCachedThreadPool(newNamedThreadFactory("NettyServerBoss", true));
// 无界的Netty worker线程池,负责连接的数据交换
ExecutorService worker = Executors.newCachedThreadPool(newNamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker,getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
// Netty服务启动类
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(newChannelPipelineFactory() {
publicChannelPipeline getPipeline(){
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
// 解码处理器,{@linkInternalDecoder}
pipeline.addLast("decoder", adapter.getDecoder());
// 编码处理器,{@linkInternalEncoder}
pipeline.addLast("encoder", adapter.getEncoder());
// 数据解析后流程处理的起点,{@linkNettyHandler}
pipeline.addLast("handler", nettyHandler);
returnpipeline;
}
});
// 绑定端口
channel= bootstrap.bind(getBindAddress());
}
从这里可以看出,如果我们在一个JVM进程只暴露一个Dubbo服务端口,那么一个JVM进程只会有一个NettyServer实例,也会只有一个NettyHandler实例,但如果应用即是消费者,也是提供者,那么将会存在多个NettyHandler。从上面代码也可以看出,Dubbo在Netty的Pipeline中只注册了三个Handler,而Dubbo内部也定义了一个ChannelHandler接口,用来将和Channel相关的处理串起来,而第一个ChannelHandler就是由NettyHandler来调用的。有趣的是NettyServer本身也是一个ChannelHandler。当Dubbo将Spring容器中的服务实例做了动态代理的处理后,就会通过NettyServer#doOpen来暴露服务端口,再接着将服务注册到注册中心。这些步骤做完后,Dubbo的消费者就可以来和提供者建立连接了,当然是消费者来主动建立连接,而提供者在初始化连接后会调用NettyHandler#channelConnected方法来创建一个NettyChannel:
@Override
public void channelConnected(ChannelHandlerContextctx, ChannelStateEvent e)throws Exception{
// 从channelMap中创建或获取一个NettyChannel
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
if (channel != null) {
//如果在channels中没有则创建,注意这里的key是远端消费者的地址,即IP+端口
channels.put(NetUtils.toAddressString((InetSocketAddress)ctx.getChannel().getRemoteAddress()), channel);
}
// 这里的handler正是创建此NettyHandler的NettyServer
handler.connected(channel);
} finally{
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
NettyHandler有两个重要的属性用来保存当前的Netty Channel和Netty Channel和Dubbo内部NettyChannel的映射关系:
private final org.jboss.netty.channel.Channel channel;
private static final ConcurrentMap<org.jboss.netty.channel.Channel,NettyChannel>channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel,NettyChannel>();
从上面的代码可以看出,就像Netty和Dubbo都有自己的ChannelHandler一样,Netty和Dubbo也有着自己的Channel。该方法最后会调用NettyServer#connected方法来检查新添加channel后是否会超出提供者配置的accepts配置,如果超出,则直接打印错误日志并关闭该Channel,这样的话消费者端自然会收到连接中断的异常信息,详细可以见AbstractServer#connected方法。这里我们也可以看出,消费者和提供者建立的每一个TCP连接都放到了NettyHandler的channels中。还记得我们在《Dubbo源代码实现五》中提到的,消费者和提供者之间默认只会建立一条TCP长连接,为了增加消费者调用服务提供者的吞吐量,可以在消费者的dubbo:reference中配置connections来单独增加消费者和服务提供者的TCP长连接吗?作为服务提供者,也同样可以限制所接收的连接数,例如:
<dubbo:protocolname="dubbo"port="8888"threads="500"accepts="200"/>
需要注意的是,这种配置的长连接不会像JDK中的线程池那样按需来建立,而是在消费者启动后就全部创建好,的如果消费者“太过分”的话,即消费者配置的连接数已经超过了服务提供者的accepts,那么多余连接的建立时会遭到提供者拒绝,于是消费者将收到如下异常:
[18/06/17 04:02:55:055 CST] DubboClientReconnectTimer-thread-2 WARN transport.AbstractClient: [DUBBO] client reconnect to 提供者IP+端口 find error .url: dubbo://提供者IP+端口/...略...
com.alibaba.dubbo.remoting.RemotingException: Failed connect toserver /提供者IP+端口 fromNettyClient 消费者IP using dubbo version 2.5.3, cause:Connect wait timeout: 1500000ms.
atcom.alibaba.dubbo.remoting.transport.AbstractClient.connect(AbstractClient.java:282)
atcom.alibaba.dubbo.remoting.transport.AbstractClient$1.run(AbstractClient.java:145)
atjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
atjava.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
atjava.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
atjava.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
atjava.lang.Thread.run(Thread.java:745)
当连接建立完毕后,消费者就可以请求提供者的服务了,当请求到来,提供者这边会依次经过如下Handler的处理:
NettyCodecAdapter$InternalDecoder#messageReceived:对请求进行解码。
NettyHandler#messageReceived:根据Netty Channel来获取Dubbo的Channel,并开始调用Dubbo的Handler。
AbstractPeer#received:如果服务已经关闭,则返回,否则调用下一个Handler来处理。
MultiMessageHandler#received:如果是批量请求,则依次对请求调用下一个Handler来处理。
AllChannelHandler#received:该Dubbo的Handler非常重要,因为从这里是IO线程池和服务调用线程池的边界线,该Handler将服务调用操作直接提交给服务调用线程池并返回。
我们这里仔细看一下AllChannelHandler#received:
public void received(Channel channel, Object message)throws RemotingException {
// 获取服务调用线程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel,handler, ChannelState.RECEIVED, message));
} catch(Throwable t) {
thrownew ExecutionException(message,channel, getClass() +" error when process received event .", t);
}
}
我们注意到这里对execute进行了异常捕获,这是因为IO线程池是无界的(0-Integer.MAX_VALUE),但服务调用线程池是有界的,所以进行execute提交可能会遇到RejectedExecutionException异常,这也是为什么我们会在dubbo输出的日志中看到如下片段:
2017-06-16 22:01:03-WARNorg.jboss.netty.channel.DefaultChannelPipeline- [DUBBO] An exception was thrown by a user handler while handling anexception event ([id: 0xad26fbf0, /消费者IP+端口=> /提供者IP+端口] EXCEPTION: com.alibaba.dubbo.remoting.ExecutionException:class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandlererror when process received event .), dubbo version: 2.5.3, current host:127.0.0.1 - New I/O worker #95
com.alibaba.dubbo.remoting.ExecutionException:class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandlererror when process caught event .
atcom.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:67)
atcom.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate.caught(AbstractChannelHandlerDelegate.java:44)
atcom.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate.caught(AbstractChannelHandlerDelegate.java:44)
atcom.alibaba.dubbo.remoting.transport.AbstractPeer.caught(AbstractPeer.java:127)
atcom.alibaba.dubbo.remoting.transport.netty.NettyHandler.exceptionCaught(NettyHandler.java:112)
atcom.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder.exceptionCaught(NettyCodecAdapter.java:165)
atorg.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)
atorg.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:48)
atorg.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
atcom.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder.messageReceived(NettyCodecAdapter.java:148)
atorg.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
atorg.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
atorg.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
atorg.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
atorg.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
atorg.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
atorg.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
atjava.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.RejectedExecutionException: Threadpool is EXHAUSTED! ThreadName: DubboServerHandler-提供者IP+端口, Pool Size: 300 (active: 274, core:300, max: 300, largest: 300), Task: 145727238 (completed: 145726964), Executorstatus:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://提供者IP+端口!
atcom.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:53)
atjava.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
atjava.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
atcom.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:65)
... 19 more
当然,如果你没有指定,服务调用线程池默认的size是200,并且使用的是SynchronousQueue队列,请看FixedThreadPool#getExecutor实现:
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads,0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
可以看出,上面的错误日志正式在AbortPolicyWithReport中被输出的,我们看下AbortPolicyWithReport#rejectedExecution的实现:
@Override
public void rejectedExecution(Runnable r,ThreadPoolExecutor e) {
String msg = String.format("Thread pool isEXHAUSTED!"+
"Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d),Task: %d (completed: %d),"+
"Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in%s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(),e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(),e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
throw new RejectedExecutionException(msg);
}
可以看出,这里输出的是WRAN级别的日志,并且抛出了RejectedExecutionException异常,那么问题来了,我们业务系统能否检测到这种情况?很遗憾,在这一步的时候,不仅没有走到Invoke的Filter环节,也没有真正开始进行服务调用,所以对服务接口配置的Filter或者AOP中都无法对这种情况进行处理。那我们该如何感知这种情况,一种方式是检测Dubbo输出的日志,第二种方式是消费者可以收到对应的RpcException,因为NettyCodecAdapter$InternalDecoder#exceptionCaught已经对该异常进行了处理,直接输出到了消费者端:
@Override
public void exceptionCaught(ChannelHandlerContext ctx,ExceptionEvent e)throws Exception {
ctx.sendUpstream(e);
}
消费者方可以用过Filter捕获到该异常,并输出日志:
com.alibaba.dubbo.rpc.RpcException:Failed to invoke remote method: doYouLoveMe, provider: dubbo://提供者IP+端口/...略..., cause: com.alibaba.dubbo.remoting.ExecutionException: classcom.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler errorwhen process received event .
com.alibaba.dubbo.remoting.ExecutionException:class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandlererror when process received event .
atcom.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.received(AllChannelHandler.java:58)
atcom.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler.received(HeartbeatHandler.java:90)
atcom.alibaba.dubbo.remoting.transport.MultiMessageHandler.received(MultiMessageHandler.java:25)
atcom.alibaba.dubbo.remoting.transport.AbstractPeer.received(AbstractPeer.java:123)
atcom.alibaba.dubbo.remoting.transport.netty.NettyHandler.messageReceived(NettyHandler.java:91)
atorg.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
atcom.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder.messageReceived(NettyCodecAdapter.java:148)
atorg.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
atorg.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
atorg.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
atorg.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
atorg.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
atorg.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
atorg.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
atjava.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.RejectedExecutionException:Thread pool is EXHAUSTED!Thread Name: DubboServerHandler-提供者IP+端口, Pool Size: 300 (active: 274, core:300, max: 300, largest: 300), Task: 145731584 (completed: 145731286), Executorstatus:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://提供者IP+端口!
atcom.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:53)
atjava.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
atjava.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
atcom.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.received(AllChannelHandler.java:56)
... 16 more
如果经常出现该问题,说明提供者的处理能力跟不上消费者,最简单的解决办法就是将提供者的服务调用线程池数目调大点,比如:
<dubbo:protocol name="dubbo"dispatcher="all"threadpool="fixed"threads="500"/>
这里我们为了保证模块内的主要服务有线程可用(防止次要服务抢占过多服务调用线程),可以对次要服务进行并发限制,例如:
<dubbo:protocol threads="500"/>
<dubbo:service interface="xxxxx"version="1.0.0"ref="xxx1"
executes="100">
<dubbo:method name="findAllPerson" executes="50"/>
</dubbo:service>
如果服务调用线程池够用,则将直接创建ChannelEventRunnable对象并扔到服务调用线程池中执行。直接将事件交给DecodeHandler来处理,从这里开始调用的Handler如下:
DecodeHandler#received:对message的data进行解码,并交给下一个Handler.
HeaderExchangeHandler#received:这里会记录下读取时间,便于心跳任务检测(参见HeartBeatTask),通过下一个Handler执行完服务调用后往Channel写入应答数据。
DubboProtocol#reply:因为使用的是dubbo协议,所以这里是DubboProtocol,这里将直接通过Invoker来调用服务处理过程。
这里先给出DubboProtocol中的相关代码片段:
public Object reply(ExchangeChannel channel,Object message)throws RemotingException{
if (message instanceofInvocation) {
Invocation inv = (Invocation) message;
// 根据请求参数来选择Invoker
Invoker<?> invoker = getInvoker(channel, inv);
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 直接执行服务调用过程
returninvoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupportedrequest: " + message == null ?null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider:" +channel.getLocalAddress());
}
Invoker<?> getInvoker(Channelchannel, Invocation inv) throws RemotingException{
int port =channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);
// 服务调用key,即接口名+版本+提供者端口
String serviceKey = serviceKey(port,path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
// 通过服务调用key找到对应的DubboExporter对象,exporterMap中是每个接口对应一个DubboExporter
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
thrownew RemotingException(channel,"Notfound exported service: "+serviceKey + "in " + exporterMap.keySet() + ", may be version or group mismatch"+ ", channel:consumer: " +channel.getRemoteAddress() +" --> provider: " + channel.getLocalAddress() + ", message:" + inv);
// 返回invoker
return exporter.getInvoker();
}
代码都容易懂,为了能让用户接入服务调用的之前和之后,Dubbo使用了Filter机制,类似于Servlet中的Filter概念,Filter在消费者和提供者都有。当然,Dubbo里面也有一系列的内定Filter用来执行特定的功能,比如状态检查、异常处理等,这里我们就不讨论Dubbo中Filter执行顺序的问题了。为了大家能清晰的了解一笔请求会经历哪些内定的Filter,这里列出提供者在接收到请求后会执行的Filter链路(调用顺序从上到下):
invoker ={ProtocolFilterWrapper$1@3163}"com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker ={RegistryProtocol$InvokerDelegete@3172}
filter = {EchoFilter@3173} //第一个调用的Filter,如果方法名是$echo,则仿照echo协议传什么返回什么
next = {ProtocolFilterWrapper$1@3174}"com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker ={RegistryProtocol$InvokerDelegete@3172}
filter = {ClassLoaderFilter@3177} //将当前线程的ClassLoader切换成服务调用接口的ClassLoader,服务调用完毕再切换回来
next = {ProtocolFilterWrapper$1@3178}"com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker ={RegistryProtocol$InvokerDelegete@3172}
filter = {GenericFilter@3180} //通过GenericService来调用的Dubbo服务才会执行里面的逻辑
next = {ProtocolFilterWrapper$1@3181}"com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker ={RegistryProtocol$InvokerDelegete@3172}
filter = {ContextFilter@3183} // 填充上下文RpcContext中的数据,服务调用完后清除
next = {ProtocolFilterWrapper$1@3184}"com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker ={RegistryProtocol$InvokerDelegete@3172}
filter = {TraceFilter@3188} // telnet的时候才会执行内部逻辑
next = {ProtocolFilterWrapper$1@3189}"com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker ={RegistryProtocol$InvokerDelegete@3172}
filter = {MonitorFilter@3192} // 如果需要使用监控功能,这里会统计并发数和异常数
next = {ProtocolFilterWrapper$1@3193}"com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker ={RegistryProtocol$InvokerDelegete@3172}
filter = {TimeoutFilter@3196} // 统计方法调用耗时,如果超过provider设置的时间,则输出告警日志
next = {ProtocolFilterWrapper$1@3197}"com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker ={RegistryProtocol$InvokerDelegete@3172}
filter = {ExceptionFilter@3199} // 遇到异常Dubbo会如何处理,大家一定得好好看看这个Filter的实现
next = {ProtocolFilterWrapper$1@3200}"com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker ={RegistryProtocol$InvokerDelegete@3172}
filter = {TokenFilter@3202} // 校验token,如果在provider中token设置成true,表明不允许consumer绕过注册中心直接调用provider的服务,注意,token是provider负责生成并注册到注册中心的
next = {ProtocolFilterWrapper$1@3203}"com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker ={RegistryProtocol$InvokerDelegete@3172}
filter ={ProviderAFilter@3209} // 这里是业务方自定义的Filter
next ={RegistryProtocol$InvokerDelegete@3172}
invoker ={JavassistProxyFactory$1@3210} "registry://注册中心地址/com.alibaba.dubbo.registry.RegistryService?...略..."
InvokerWrapper.invoker = {JavassistProxyFactory$1@3210}"registry://服务提供者地址/com.alibaba.dubbo.registry.RegistryService?...略..."
wrapper = {Wrapper0@3215}
this$0 ={JavassistProxyFactory@3216}
proxy ={ServiceProviderImpl$$EnhancerByCGLIB$$14338e8a@3217}"com.manzhizhen.study.dubbo.ServiceProviderImpl@747af825" // 对Spring的服务类做动态代理处理
type = {Class@418} "interfacecom.manzhizhen.study.dubbo.ServiceProvider"
url = {URL@3218}"registry://注册中心地址/com.alibaba.dubbo.registry.RegistryService?...略..."
url = {URL@3211}"dubbo://服务提供者地址/com.manzhizhen.study.dubbo.ServiceProvider?...略..."
最后执行的就是Wrapper对proxy的调用了,proxy指向的是Spring容器中的服务实例的代理类,这个代理类采用的是和Spring容器配置相关的代理形式(比如这里是CGLib而不是JDK的动态代理),而Wrapper是Dubbo用Javassist实现的代理类(关于Dubbo代理部分可以参看《Dubbo源代码实现二》),用来调用proxy。大家有兴趣可以看看Wrapper的源码。
最后
以上就是舒适云朵为你收集整理的Dubbo源代码实现六:线程池模型与提供者的全部内容,希望文章能够帮你解决Dubbo源代码实现六:线程池模型与提供者所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复