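Overview
1 Obtaining the proxy bean for the target class
At this point the applicationContext holds two beans: one with id='demo-consumer', whose instance is an ApplicationConfig, and one with id='demoService', whose instance is a ReferenceBean.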
DemoService demoService = (DemoService)context.getBean("demoService");
// Inheritance structure of ReferenceBean:
ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean
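ReferenceBean implements FactoryBean. When getBean("demoService") looks the bean up in the BeanFactory, Spring reaches getObjectForBeanInstance (defined in AbstractBeanFactory, which DefaultListableBeanFactory inherits) and checks whether the instance held by the container is a FactoryBean. If it is, Spring calls the bean's overridden getObject() method. The original post illustrates this check with a screenshot of the Spring source.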
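The FactoryBean check can be sketched in plain Java without Spring. Everything below is a hypothetical, heavily simplified model: MiniFactoryBean, MiniBeanFactory and MiniReferenceBean are illustration names, not Spring or Dubbo classes. The only real convention borrowed from Spring is the "&" prefix, which asks for the factory itself instead of its product.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Spring's FactoryBean; not the real interface.
interface MiniFactoryBean<T> {
    T getObject();
}

// Hypothetical, heavily simplified container.
class MiniBeanFactory {
    private final Map<String, Object> singletons = new HashMap<>();

    void register(String name, Object instance) {
        singletons.put(name, instance);
    }

    // Mirrors the idea behind getObjectForBeanInstance: a factory bean is asked
    // for its product unless the caller uses the "&" prefix.
    Object getBean(String name) {
        boolean wantsFactory = name.startsWith("&");
        Object instance = singletons.get(wantsFactory ? name.substring(1) : name);
        if (instance instanceof MiniFactoryBean && !wantsFactory) {
            return ((MiniFactoryBean<?>) instance).getObject();
        }
        return instance;
    }
}

// Plays the role of ReferenceBean: the product is created lazily on first access.
class MiniReferenceBean implements MiniFactoryBean<String> {
    private String ref;

    @Override
    public synchronized String getObject() {
        if (ref == null) {
            ref = "proxy-for-DemoService"; // placeholder for the real proxy object
        }
        return ref;
    }
}

class FactoryBeanDemo {
    public static void main(String[] args) {
        MiniBeanFactory factory = new MiniBeanFactory();
        factory.register("demoService", new MiniReferenceBean());
        System.out.println(factory.getBean("demoService"));
        System.out.println(factory.getBean("&demoService") instanceof MiniReferenceBean);
    }
}
```

Looking up "demoService" yields the product of getObject(), while "&demoService" yields the factory instance itself.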
Because ReferenceBean implements FactoryBean, getBean("demoService") ultimately executes ReferenceBean.getObject(). The code is as follows:
// Structure of ReferenceBean:
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean
// ReferenceBean.getObject()
@Override
public Object getObject() {
return get();
}
//ReferenceConfig.get()
public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
The key step is init(). Its main code is as follows:
//ReferenceConfig.init()
...
ref = createProxy(map);
// Value of map: {side=consumer, application=demo-consumer, register.ip=192.168.0.103, release=2.7.3, methods=sayHello, lazy=false, sticky=false, dubbo=2.0.2, pid=8564, check=false, interface=org.apache.dubbo.demo.DemoService, timestamp=1575614824918}
String serviceKey = URL.buildKey(interfaceName, group, version);
ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
initialized = true;
// ReferenceConfig.createProxy()
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
// Decide whether this is a local (in-JVM) call. One criterion: if the reference configures a url, the call is treated as remote.
if (shouldJvmRefer(map)) {
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
// Start building the provider URLs (the original post shows the URL contents in a figure)
urls.clear(); // reference retry init will add url to urls, lead to OOM
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
checkRegistry();
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
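/*
// private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
The code above relies on Dubbo's SPI extension mechanism; see the official documentation:
http://dubbo.apache.org/zh-cn/docs/source_code_guide/adaptive-extension.html
According to that documentation, REF_PROTOCOL is an instance of a dynamically generated class named Protocol$Adaptive, whose code is listed after this method.
By default (when the URL protocol is dubbo), REF_PROTOCOL.refer() ends up calling DubboProtocol.refer().
*/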
if (urls.size() == 1) {
// With a single URL, build the Invoker directly
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
// With multiple URLs, aggregate them into one Invoker
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's CLUSTER is available
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
if (shouldCheck() && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker);
}
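The URL-handling branch of createProxy can be condensed into a small stand-alone sketch. It is an illustration under assumptions, not Dubbo code: ReferUrlSketch, classify and strategy are hypothetical names, java.net.URI stands in for Dubbo's own URL class, and the split pattern is only assumed to match the intent of SEMICOLON_SPLIT_PATTERN.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class ReferUrlSketch {
    // Assumption: addresses are separated by ';' with optional surrounding whitespace.
    private static final Pattern SEMICOLON = Pattern.compile("\\s*;+\\s*");

    // Labels each user-specified address as "registry" or "direct" by its protocol.
    static List<String> classify(String userUrl) {
        List<String> kinds = new ArrayList<>();
        for (String u : SEMICOLON.split(userUrl.trim())) {
            String scheme = URI.create(u).getScheme();
            kinds.add("registry".equals(scheme) ? "registry" : "direct");
        }
        return kinds;
    }

    // Mirrors the final branch: one URL gives one invoker, several URLs are joined by a cluster.
    static String strategy(List<String> kinds) {
        if (kinds.size() == 1) {
            return "single invoker";
        }
        return kinds.contains("registry")
                ? "cluster with registry-aware routing"
                : "cluster over static directory";
    }

    public static void main(String[] args) {
        List<String> kinds = classify("registry://127.0.0.1:2181; dubbo://127.0.0.1:20880");
        System.out.println(kinds + " -> " + strategy(kinds));
    }
}
```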
Code of the dynamically generated class behind REF_PROTOCOL:
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
// Methods without the @Adaptive annotation simply throw an exception
public void destroy() {
throw new UnsupportedOperationException(
"The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException(
"The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
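// Methods with the @Adaptive annotation get generated code that looks up the concrete extension from the URL. As the code shows, calling refer() actually calls extension.refer(); by default the extension is the one registered under the name dubbo.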
@SuppressWarnings("unchecked")
public Invoker refer(Class arg0, URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("
+ url.toString() + ") use keys([protocol])");
Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
@SuppressWarnings("unchecked")
public Exporter<?> export(Invoker arg0) throws RpcException {
if (arg0 == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("
+ url.toString() + ") use keys([protocol])");
Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
Here the URL string is:
dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService?application=demo-consumer&check=false&interface=org.apache.dubbo.demo.DemoService&lazy=false&pid=8564&register.ip=192.168.0.103&remote.application=&side=consumer&sticky=false
DubboProtocol inherits refer from AbstractProtocol. The code is as follows:
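The dispatch performed by Protocol$Adaptive can be modelled with a map of named extensions. This is a minimal sketch under assumptions: MiniProtocol and MiniAdaptiveProtocol are hypothetical names, a plain map replaces ExtensionLoader, and java.net.URI replaces Dubbo's URL class. Only the selection rule (use the URL protocol, fall back to "dubbo") is taken from the generated code above.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, reduced version of the Protocol extension point.
interface MiniProtocol {
    String refer(String url);
}

// Plays the role of Protocol$Adaptive: it holds no logic of its own and
// forwards to the extension named by the URL's protocol.
class MiniAdaptiveProtocol implements MiniProtocol {
    private final Map<String, MiniProtocol> extensions = new HashMap<>();

    void register(String name, MiniProtocol impl) {
        extensions.put(name, impl);
    }

    @Override
    public String refer(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String scheme = URI.create(url).getScheme();
        String extName = (scheme == null) ? "dubbo" : scheme; // default extension name
        MiniProtocol extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.refer(url);
    }
}

class AdaptiveDemo {
    public static void main(String[] args) {
        MiniAdaptiveProtocol adaptive = new MiniAdaptiveProtocol();
        adaptive.register("dubbo", url -> "DubboInvoker for " + url);
        adaptive.register("injvm", url -> "InjvmInvoker for " + url);
        System.out.println(adaptive.refer("dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService"));
    }
}
```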
//AbstractProtocol.refer
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
AsyncToSyncInvoker is a wrapper class that holds a target Invoker; the target comes from protocolBindingRefer(type, url), whose code is as follows:
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
The target Invoker inside the wrapper is therefore a DubboInvoker. context.getBean("demoService") returns a dynamic proxy built around the AsyncToSyncInvoker wrapper, which in turn holds the DubboInvoker; the proxy's InvocationHandler is InvokerInvocationHandler.
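How a proxy forwards interface calls to an invoker can be shown with the JDK's java.lang.reflect.Proxy. This is a sketch only: GreetingService, MiniInvoker, MiniInvocationHandler and MiniProxyFactory are hypothetical names, and the JDK proxy is used here as a convenient stand-in. The source does not show which proxy mechanism PROXY_FACTORY actually uses.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface standing in for DemoService.
interface GreetingService {
    String sayHello(String name);
}

// Hypothetical, reduced invoker: receives the method name and arguments.
interface MiniInvoker {
    Object invoke(String methodName, Object[] args);
}

// Plays the role of InvokerInvocationHandler: every interface call becomes an invoker call.
class MiniInvocationHandler implements InvocationHandler {
    private final MiniInvoker invoker;

    MiniInvocationHandler(MiniInvoker invoker) {
        this.invoker = invoker;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Methods declared on Object (toString, hashCode, equals) are answered locally.
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        return invoker.invoke(method.getName(), args);
    }
}

class MiniProxyFactory {
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> type, MiniInvoker invoker) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[]{type}, new MiniInvocationHandler(invoker));
    }
}

class ProxyDemo {
    public static void main(String[] args) {
        // The lambda stands in for the remote call made by the real invoker chain.
        MiniInvoker invoker = (methodName, arguments) -> methodName + ":" + arguments[0];
        GreetingService service = MiniProxyFactory.getProxy(GreetingService.class, invoker);
        System.out.println(service.sayHello("dubbo"));
    }
}
```

The caller only ever sees GreetingService; the invoker behind the handler can be swapped without the caller noticing.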
2 Obtaining the ExchangeClient
The inheritance structure of ExchangeClient is as follows:
public interface ExchangeClient extends Client, ExchangeChannel{}
The code that creates the DubboInvoker is:
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
getClients(url) returns an ExchangeClient[] array. The code is as follows:
//DubboProtocol.getClients
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean useShareConnect = false;
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
useShareConnect = true;
/**
* The xml configuration should have a higher priority than properties.
*/
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
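The code above mainly decides whether connections are shared. If the connections parameter is set on the URL (a non-zero value), the service gets its own dedicated connections; if it is not set, services that connect to the same provider address share connections. The rest of the code does two things: getSharedClient obtains shared connections, and initClient initializes a dedicated connection.
2.1 Obtaining shared connections: getSharedClient(url, connections)
//DubboProtocol.getSharedClient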
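The shared-versus-dedicated decision can be reduced to a few lines. The sketch below is illustrative only: ConnectionPlanSketch and plan are hypothetical names, the returned strings merely label the clients that would be created, and the default of 1 for the shared connection count is an assumption about DEFAULT_SHARE_CONNECTIONS.

```java
import java.util.ArrayList;
import java.util.List;

class ConnectionPlanSketch {
    // Assumption: the default number of shared connections is 1.
    static final int DEFAULT_SHARE_CONNECTIONS = 1;

    // connections: value of the connections parameter, 0 when not configured.
    // shareConnections: value of the share-connections parameter, null when not configured.
    static List<String> plan(int connections, Integer shareConnections) {
        boolean shared = (connections == 0);
        int count = connections;
        if (shared) {
            int configured = (shareConnections == null) ? DEFAULT_SHARE_CONNECTIONS : shareConnections;
            count = Math.max(configured, 1); // at least one shared connection
        }
        List<String> clients = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            clients.add((shared ? "shared-" : "dedicated-") + i);
        }
        return clients;
    }

    public static void main(String[] args) {
        System.out.println(plan(0, null)); // connections not configured
        System.out.println(plan(3, null)); // connections=3 configured
    }
}
```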
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
// The cache key is the address of the configured url
String key = url.getAddress();
List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
// If cached clients exist, check whether they are still usable
if (checkClientCanUse(clients)) {
batchClientRefIncr(clients);
return clients;
}
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
clients = referenceClientMap.get(key);
// double check
if (checkClientCanUse(clients)) {
batchClientRefIncr(clients);
return clients;
}
// connectNum must be greater than or equal to 1
connectNum = Math.max(connectNum, 1);
// If the clients is empty, then the first initialization is
if (CollectionUtils.isEmpty(clients)) {
clients = buildReferenceCountExchangeClientList(url, connectNum);
referenceClientMap.put(key, clients);
} else {
for (int i = 0; i < clients.size(); i++) {
ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
// If there is a client in the list that is no longer available, create a new one to replace him.
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
clients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
referenceCountExchangeClient.incrementAndGetCount();
}
}
/**
* I understand that the purpose of the remove operation here is to avoid the expired url key
* always occupying this memory space.
*/
locks.remove(key);
return clients;
}
}
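The caching pattern in getSharedClient (check, lock per key, check again, then create) combined with reference counting can be sketched as follows. All names are hypothetical (CountedClient, SharedClientCache), the sketch keeps one client per address instead of a list, and it keeps the per-key lock object instead of removing it afterwards. That close() only really closes once the last reference is released is an assumption about how ReferenceCountExchangeClient behaves.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted client; starts with one reference.
class CountedClient {
    private final AtomicInteger refCount = new AtomicInteger(1);
    private volatile boolean closed;

    void retain() {
        refCount.incrementAndGet();
    }

    int references() {
        return refCount.get();
    }

    boolean isClosed() {
        return closed;
    }

    // Assumed behaviour: only releasing the last reference closes the connection.
    void close() {
        if (refCount.decrementAndGet() <= 0) {
            closed = true;
        }
    }
}

class SharedClientCache {
    private final ConcurrentMap<String, CountedClient> clients = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

    CountedClient acquire(String address) {
        CountedClient client = clients.get(address);
        if (usable(client)) {           // fast path without locking
            client.retain();
            return client;
        }
        Object lock = locks.computeIfAbsent(address, key -> new Object());
        synchronized (lock) {
            client = clients.get(address);
            if (usable(client)) {       // double check under the lock
                client.retain();
                return client;
            }
            client = new CountedClient();
            clients.put(address, client);
            return client;
        }
    }

    private static boolean usable(CountedClient client) {
        return client != null && !client.isClosed();
    }
}

class SharedClientDemo {
    public static void main(String[] args) {
        SharedClientCache cache = new SharedClientCache();
        CountedClient first = cache.acquire("127.0.0.1:20880");
        CountedClient second = cache.acquire("127.0.0.1:20880");
        System.out.println(first == second);
        System.out.println(first.references());
    }
}
```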
//DubboProtocol.buildReferenceCountExchangeClientList
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
List<ReferenceCountExchangeClient> clients = new ArrayList<>();
for (int i = 0; i < connectNum; i++) {
clients.add(buildReferenceCountExchangeClient(url));
}
return clients;
}
buildReferenceCountExchangeClientList simply calls buildReferenceCountExchangeClient in a loop, so a set of shared connections is a List of connectNum ReferenceCountExchangeClient instances, each built from the same url.
//DubboProtocol.buildReferenceCountExchangeClient
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
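The code above shows that ReferenceCountExchangeClient is a wrapper whose target is the client returned by initClient. The initClient code is shown in the next section.
2.2 Initializing a dedicated connection: initClient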
//DubboProtocol.initClient
/**
* Create new connection
*
* @param url
*/
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
//Exchangers.connect
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
// getExchanger(url) uses SPI to return an Exchanger extension; the default is HeaderExchanger
}
//HeaderExchanger.connect
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
//Transporters.connect
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
// getTransporter() uses SPI to return a Transporter extension; the default is NettyTransporter
}
//NettyTransporter.connect
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
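NettyClient belongs to the Netty-based transport layer; see the relevant documentation for details.
Summary: in the shared-connection case, ExchangeClient[] holds ReferenceCountExchangeClient instances. ReferenceCountExchangeClient is a wrapper whose target is either a LazyConnectExchangeClient (when lazy=true is configured in the xml) or a HeaderExchangeClient, and the client field of HeaderExchangeClient holds the NettyClient instance.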
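The layering in the summary can be made visible with a chain of small wrappers. The sketch is illustrative: every class is a hypothetical stand-in named after the role it plays, describe() stands in for the first real request, and the lazy variant is assumed to create its underlying client only on first use.

```java
import java.util.function.Supplier;

// Hypothetical minimal client abstraction.
interface MiniClient {
    String describe();
}

class MiniNettyClient implements MiniClient {
    @Override
    public String describe() {
        return "NettyClient";
    }
}

class MiniHeaderExchangeClient implements MiniClient {
    private final MiniClient client;

    MiniHeaderExchangeClient(MiniClient client) {
        this.client = client;
    }

    @Override
    public String describe() {
        return "HeaderExchangeClient(" + client.describe() + ")";
    }
}

// Assumed behaviour: the underlying client is only created on first use.
class MiniLazyClient implements MiniClient {
    private final Supplier<MiniClient> connector;
    private MiniClient delegate;

    MiniLazyClient(Supplier<MiniClient> connector) {
        this.connector = connector;
    }

    synchronized boolean isConnected() {
        return delegate != null;
    }

    @Override
    public synchronized String describe() {
        if (delegate == null) {
            delegate = connector.get();
        }
        return "LazyConnectExchangeClient(" + delegate.describe() + ")";
    }
}

class MiniReferenceCountClient implements MiniClient {
    private final MiniClient target;

    MiniReferenceCountClient(MiniClient target) {
        this.target = target;
    }

    @Override
    public String describe() {
        return "ReferenceCountExchangeClient(" + target.describe() + ")";
    }
}

class ClientLayers {
    // Builds the wrapper chain for the shared-connection case.
    static MiniClient build(boolean lazy) {
        Supplier<MiniClient> connect = () -> new MiniHeaderExchangeClient(new MiniNettyClient());
        MiniClient target = lazy ? new MiniLazyClient(connect) : connect.get();
        return new MiniReferenceCountClient(target);
    }

    public static void main(String[] args) {
        System.out.println(build(false).describe());
        System.out.println(build(true).describe());
    }
}
```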