概述
Dubbo SPI机制
- 一、Java SPI简介
- 二、Dubbo SPI简介
- 三、Java SPI 与 Dubbo SPI异同
- 四、Dubbo SPI详解
- 1. ExtensionLoader类
- 1.1 ExtensionLoader构造方法
- 1.2 ExtensionLoader.getExtensionLoader(XXX.class) 获取一个ExtensionLoader 对象
- 1.3 extensionLoader.getDefaultExtension() 获取当前接口指定的默认实现类
- 1.4 extensionLoader.getAdaptiveExtension() 获取当前接口所有实现类中 @Adaptive 标识的实现类
- 1.5 extensionLoader.getActivateExtension(URL url, String key, String group) 方法
- Activate 注解
- 场景、示例
一、Java SPI简介
关于Java SPI的接收本编文件不过多介绍,详细可以参照【Java SPI机制】。
二、Dubbo SPI简介
Dubbo 的扩展点加载从 JDK 标准的 SPI(Service Provider Interface) 扩展点发现机制加强而来。思想一致,实现不同。
当接口上打了 @SPI
注解时,Dubbo 会在 META-INF/dubbo/internal/
、META-INF/dubbo/
、META-INF/services/
(可自行扩展,通过JAVA SPI) 接口全名文件下读取扩展点。配置文件内容为 name -> implementation class,可以声明多个提供者。
Dubbo SPI主要实现是体现在ExtensionLoader类中。
SPI示例如下图:
Dubbo SPI中常用的几个注解:
- @SPI:默认拓展实现类。
- @Adaptive:自适应拓展实现类标志。
- @Activate:自动激活条件的标记。
三、Java SPI 与 Dubbo SPI异同
- JDK 标准的 SPI 会一次性实例化扩展点所有实现,会很浪费资源。Dubbo SPI 可以延迟加载,一次只加载自己想要加载的扩展实现。
- Dubbo SPI 增加了对扩展点 IoC 和 AOP 的支持(
injectExtension()
方法),一个扩展点可以直接 setter 注入其它扩展点。 - Dubbo 的扩展机制能很好的支持第三方 IoC 容器,默认支持 Spring Bean。
- Java SPI 如果扩展点加载失败,会导致调用方报错,导致追踪问题很困难。
四、Dubbo SPI详解
本编文章源码取自 dubbo-2.7.14。
1. ExtensionLoader类
Dubbo SPI主要实现是体现在 ExtensionLoader 类中。
ExtensionLoader 中 status
属性,如下:
// 名称分隔符正则表达式
private static final Pattern NAME_SEPARATOR = Pattern.compile("\s*[,]+\s*");
// 扩展加载器map
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap(64);
// 扩展实例map
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap(64);
// 装箱策略数组:通过Java SPI机制扩展,需要加载的根路径
private static volatile LoadingStrategy[] strategies = loadLoadingStrategies();
/**
* 通过Java SPI机制扩展,指定dubbo SPI需要加载的路径
*/
private static LoadingStrategy[] loadLoadingStrategies() {
return (LoadingStrategy[])StreamSupport.stream(ServiceLoader.load(LoadingStrategy.class).spliterator(), false).sorted().toArray((x$0) -> {
return new LoadingStrategy[x$0];
});
}
通过 LoadingStrategy 的 String directory()
返回需要加载的根目录。
1.1 ExtensionLoader构造方法
private ExtensionLoader(Class<?> type) {
// 初始化当前对象的type
this.type = type;
// 获取当前类的ExtensionFactory
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
从代码中可以看出,ExrtensionLoader与SPI是一一对应的关系,且每个封装了 SPI 的 ExrtensionLoader 都保存了一个 objectFactory 对象,而 objectFactory 指向的都是 ExrtensionLoader 的 adaptive 实现,即AdaptiveExtensionFactory。
1.2 ExtensionLoader.getExtensionLoader(XXX.class) 获取一个ExtensionLoader 对象
通过ExtensionLoader类的static方法获取一个当前 XXX.class 对应的 ExtensionLoader 对象。
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {// type不能为空
throw new IllegalArgumentException("Extension type == null");
} else if (!type.isInterface()) {// type必须为接口
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
} else if (!withExtensionAnnotation(type)) {// 接口必须有@SPI注解进行
throw new IllegalArgumentException("Extension type (" + type + ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
} else {
// 通过type获取当前对应的ExtensionLoader对象
ExtensionLoader<T> loader = (ExtensionLoader)EXTENSION_LOADERS.get(type);
if (loader == null) {//第一次创建
// 创建当前type对应的ExtensionLoader对象
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader(type));
// 缓存到private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap(64); 对象中
loader = (ExtensionLoader)EXTENSION_LOADERS.get(type);
}
return loader;
}
}
1.3 extensionLoader.getDefaultExtension() 获取当前接口指定的默认实现类
extensionLoader.getDefaultExtension() 获取当前接口指定的默认实现类,如@SPI("dubbo") public interface Protocol
默认指定 dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
协议,如果@SPI中value值为空或 value=“true”,直接返回 null。
public T getDefaultExtension() {
// 获取当前接口下所有的实现类Class对象,并缓存起来
getExtensionClasses();
if (StringUtils.isBlank(cachedDefaultName) || "true".equals(cachedDefaultName)) {// 返回 null
return null;
}
// 通过cachedDefaultName获取指定的实例对象
return getExtension(cachedDefaultName);
}
@SuppressWarnings("unchecked")
public T getExtension(String name) {
return getExtension(name, true);
}
/**
* 通过cachedDefaultName获取指定的实例对象
*/
public T getExtension(String name, boolean wrap) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {// 返回 null
return getDefaultExtension();
}
// 构建并获取一个Holder
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 创建默认实例
instance = createExtension(name, wrap);
// 缓存,保证只创建一次
holder.set(instance);
}
}
}
return (T) instance;
}
/**
* 构建并获取一个Holder
*/
private Holder<Object> getOrCreateHolder(String name) {
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
// 缓存,保证只构建一次
cachedInstances.putIfAbsent(name, new Holder<>());
holder = cachedInstances.get(name);
}
return holder;
}
/**
* 创建默认实例
*/
@SuppressWarnings("unchecked")
private T createExtension(String name, boolean wrap) {
// 获取当前name的Class对象
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null || unacceptableExceptions.contains(name)) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {// 如果当前对象为null,进行创建
// 创建
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.getDeclaredConstructor().newInstance());
// 缓存
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 利用Spring中依赖注入思想进行对象初始化
injectExtension(instance);
if (wrap) {
List<Class<?>> wrapperClassesList = new ArrayList<>();
if (cachedWrapperClasses != null) {// 判断当前实例是否存在包装器类
wrapperClassesList.addAll(cachedWrapperClasses);
wrapperClassesList.sort(WrapperComparator.COMPARATOR);// 排序
Collections.reverse(wrapperClassesList);
}
if (CollectionUtils.isNotEmpty(wrapperClassesList)) {
for (Class<?> wrapperClass : wrapperClassesList) {// 循环包装,包装类套包装类
Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
if (wrapper == null
|| (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) {
// 利用Spring中依赖注入思想进行对象初始化
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
}
}
// 初始化
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
1.4 extensionLoader.getAdaptiveExtension() 获取当前接口所有实现类中 @Adaptive 标识的实现类
extensionLoader.getAdaptiveExtension() 获取当前接口所有实现类中 @Adaptive 标识的实现类。如果存在多个 @Adaptive 标识的实现类,则选择最后一个装载的类。
@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
synchronized (cachedAdaptiveInstance) {// 双重检查
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建当前接口所以实现类中最后一个@Adaptive标注的对象
instance = createAdaptiveExtension();
// 缓存
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}
/**
* 创建当前接口所以实现类中最后一个@Adaptive标注的对象
*/
@SuppressWarnings("unchecked")
private T createAdaptiveExtension() {
try {
// 利用Spring中依赖注入思想进行对象初始化
// getAdaptiveExtensionClass() 获取@Adaptive标注的Class对象
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
/**
* 获取@Adaptive标注的Class对象
*/
private Class<?> getAdaptiveExtensionClass() {
// 获取当前接口下所有的实现类Class对象,并缓存起来
getExtensionClasses();
if (cachedAdaptiveClass != null) {// 判断 private volatile Class<?> cachedAdaptiveClass = null; 是否已经初始化,如果不为空,返回
return cachedAdaptiveClass;
}
// 加载并创建 @Adaptive 注解标记的类的Class对象
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
/**
* 获取当前接口下所有的实现类Class对象,并缓存起来
*/
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {// 双重校验
classes = cachedClasses.get();
if (classes == null) {
// 加载当前接口下所有的实现类Class对象
classes = loadExtensionClasses();
// 环境
cachedClasses.set(classes);
}
}
}
return classes;
}
/**
* 加载当前接口下所有的实现类Class对象
*/
private Map<String, Class<?>> loadExtensionClasses() {
// 缓存@SPI标记的类的默认实现类的Key
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
// 利用Java SPI机制获取需要加载的目录,并通过loadDirectory进行加载,缓存到extensionClasses中。
for (LoadingStrategy strategy : strategies) {
// 加载目录
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(),
strategy.overridden(), strategy.excludedPackages());
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"),
strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}
return extensionClasses;
}
/**
* 加载目录
*/
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
boolean extensionLoaderClassLoaderFirst, boolean overridden, String... excludedPackages) {
String fileName = dir + type;
try {
Enumeration<java.net.URL> urls = null;
// 获取 ClassLoader
ClassLoader classLoader = findClassLoader();
// try to load from ExtensionLoader's ClassLoader first
if (extensionLoaderClassLoaderFirst) {
ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
urls = extensionLoaderClassLoader.getResources(fileName);
}
}
// 初始化urls
if (urls == null || !urls.hasMoreElements()) {
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
}
if (urls != null) {
while (urls.hasMoreElements()) {// 遍历urls进行加载Resource
java.net.URL resourceURL = urls.nextElement();
// 加载
loadResource(extensionClasses, classLoader, resourceURL, overridden, excludedPackages);
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", description file: " + fileName + ").", t);
}
}
/**
* 加载
*/
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL, boolean overridden, String... excludedPackages) {
try {
// 获取文件
BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8));
Throwable var7 = null;
try {
String clazz = null;
String line;
while((line = reader.readLine()) != null) {
// 获取 # 的index(注释)
int ci = line.indexOf(35);
if (ci >= 0) {// 如果存在 # 号,则截取之前的字符串
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
// 获取 = 的index
int i = line.indexOf(61);
if (i > 0) {
name = line.substring(0, i).trim();// 获取当前SPI的name
clazz = line.substring(i + 1).trim();// 获取当前SPI的class字符串名字
} else {
clazz = line;
}
if (StringUtils.isNotEmpty(clazz) && !this.isExcluded(clazz, excludedPackages)) {
// 加载名字为clazz的类
this.loadClass(extensionClasses, resourceURL, Class.forName(clazz, true, classLoader), name, overridden);
}
} catch (Throwable var22) {
IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + this.type + ", class line: " + line + ") in " + resourceURL + ", cause: " + var22.getMessage(), var22);
this.exceptions.put(line, e);
}
}
}
} catch (Throwable var23) {
var7 = var23;
throw var23;
} finally {
if (reader != null) {
if (var7 != null) {
try {
reader.close();
} catch (Throwable var21) {
var7.addSuppressed(var21);
}
} else {
reader.close();
}
}
}
} catch (Throwable var25) {
logger.error("Exception occurred when loading extension class (interface: " + this.type + ", class file: " + resourceURL + ") in " + resourceURL, var25);
}
}
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name, boolean overridden) throws NoSuchMethodException {
if (!this.type.isAssignableFrom(clazz)) {// 判断当前clazz是否为type的实现
throw new IllegalStateException("Error occurred when loading extension class (interface: " + this.type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + " is not subtype of interface.");
} else {
if (clazz.isAnnotationPresent(Adaptive.class)) {// @Adaptive注解标记的类
// 缓存@Adaptive标记的类:取SPI配置列表中最后一个
this.cacheAdaptiveClass(clazz, overridden);
} else if (this.isWrapperClass(clazz)) {// 包装类
// 缓存当前类的所有包装类
this.cacheWrapperClass(clazz);
} else {// 其他
clazz.getConstructor();
if (StringUtils.isEmpty(name)) {// 如果name为空,系统默认会生成一个name
name = this.findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
// 通过正则表达式进行拆分
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
// 缓存@Activate 标记的类
this.cacheActivateClass(clazz, names[0]);
String[] var7 = names;
int var8 = names.length;
for(int var9 = 0; var9 < var8; ++var9) {
String n = var7[var9];
// 缓存当前class所有name
this.cacheName(clazz, n);
// 保存clazz 到 extensionClasses map中
this.saveInExtensionClass(extensionClasses, clazz, n, overridden);
}
}
}
}
}
/**
* 加载并创建 @Adaptive 注解标记的类的Class对象
*/
private Class<?> createAdaptiveExtensionClass() {
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
/**
* SPI机制:@SPI("javassist")默认使用javassist=JavassistCompiler, 还有另外一种实现 JdkCompiler
* 通过Compiler 对象编辑加载@Adaptive实例的Class对象
*/
org.apache.dubbo.common.compiler.Compiler compiler =
ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
1.5 extensionLoader.getActivateExtension(URL url, String key, String group) 方法
/**
* 在所有的激活中,使用 key 指定的扩展
*/
public List<T> getActivateExtension(URL url, String key)
/**
* 在所有的激活中,指定的group 外加 使用key 指定的扩展
*/
public List<T> getActivateExtension(URL url, String key, String group)
/**
* 在所有的激活中 values指定的扩展
*/
public List<T> getActivateExtension(URL url, String[] values)
/**
* 在所有的激活中,指定的group 外加 使用values 指定的扩展
*/
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> activateExtensions = new ArrayList<>();
// solve the bug of using @SPI's wrapper method to report a null pointer exception.
TreeMap<Class, T> activateExtensionsMap = new TreeMap<>(ActivateComparator.COMPARATOR);
Set<String> loadedNames = new HashSet<>();
Set<String> names = CollectionUtils.ofSet(values);
if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
// 加载当前type的所有实现类
getExtensionClasses();
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {// 遍历当前 SPI 的所有 Activate注解对象
String name = entry.getKey();// spi 扩展名
Object activate = entry.getValue();
String[] activateGroup, activateValue;
if (activate instanceof Activate) {// 自定义
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {// 默认
activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
} else {
continue;
}
if (isMatchGroup(group, activateGroup)// 判断当前group是否包含在所有 activateGroup中
&& !names.contains(name) // name不包含在values中
&& !names.contains(REMOVE_VALUE_PREFIX + name) // REMOVE_VALUE_PREFIX + name)不包含在values中
// (k.equals(key) || k.endsWith("." + key)) &&
// ((keyValue != null && keyValue.equals(v)) || (keyValue == null && ConfigUtils.isNotEmpty(v)))
&& isActive(activateValue, url)
&& !loadedNames.contains(name)// 不在loadedNames中
) {
// 通过name获取Class对象与实例对象,缓存
activateExtensionsMap.put(getExtensionClass(name), getExtension(name));
loadedNames.add(name);
}
}
if (!activateExtensionsMap.isEmpty()) {
activateExtensions.addAll(activateExtensionsMap.values());
}
}
List<T> loadedExtensions = new ArrayList<>();
for (String name : names) {// 过滤所有没有排除的扩展名
if (!name.startsWith(REMOVE_VALUE_PREFIX)
&& !names.contains(REMOVE_VALUE_PREFIX + name)) {
if (!loadedNames.contains(name)) {
if (DEFAULT_KEY.equals(name)) {
if (!loadedExtensions.isEmpty()) {
activateExtensions.addAll(0, loadedExtensions);
loadedExtensions.clear();
}
} else {
loadedExtensions.add(getExtension(name));
}
loadedNames.add(name);
} else {
// If getExtension(name) exists, getExtensionClass(name) must exist, so there is no null pointer processing here.
String simpleName = getExtensionClass(name).getSimpleName();
logger.warn("Catch duplicated filter, ExtensionLoader will ignore one of them. Please check. Filter Name: " + name +
". Ignored Class Name: " + simpleName);
}
}
}
if (!loadedExtensions.isEmpty()) {
activateExtensions.addAll(loadedExtensions);
}
return activateExtensions;
}
Activate 注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/**
* Group过滤条件。
* <br />
* 包含{@link ExtensionLoader#getActivateExtension}的group参数给的值,则返回扩展。
* <br />
* 如没有Group设置,则不过滤。
*/
String[] group() default {};
/**
* Key过滤条件。包含{@link ExtensionLoader#getActivateExtension}的URL的参数Key中有,则返回扩展。
* <p/>
* 示例:<br/>
* 注解的值 <code>@Activate("cache,validatioin")</code>,
* 则{@link ExtensionLoader#getActivateExtension}的URL的参数有<code>cache</code>Key,或是<code>validatioin</code>则返回扩展。
* <br/>
* 如没有设置,则不过滤。
*/
String[] value() default {};
/**
* 排序信息,可以不提供。
*/
String[] before() default {};
/**
* 排序信息,可以不提供。
*/
String[] after() default {};
/**
* 排序信息,可以不提供。
*/
int order() default 0;
}
场景、示例
某些情况下,同一个接口的多个实现需要同时发挥作用,比如 filter 链。此时需要按条件选择一批实现类来工作。
/**
* filter1:@Activate(group = {"group1", "group2"}, order = 1, value = "value1")
* filter2:@Activate(group = {"group2"}, order = 1, value = "value2")
* filter3:@Activate(group = {"group2"}, order = 1, value = "value3")
*/
@Test
public void testActivate() {
ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(Filter.class);
URL url = URL.valueOf("test://localhost/test");
url = url.addParameter("value1", "value2");
url = url.addParameter("value2", "1");
// 得到所有的可激活拓展类
List<Filter> list = extensionLoader.getActivateExtension(url, new String[]{}, "group2");
for (Filter filter : list) {
log.info("getActivateExtension>>>>>>>>filter={}", filter.filter());
}
}
结果:
getActivateExtension>>>>>>>>filter=filter1.filter()
getActivateExtension>>>>>>>>filter=filter2.filter()
最后
以上就是俊逸乌冬面为你收集整理的Dubbo SPI机制一、Java SPI简介二、Dubbo SPI简介三、Java SPI 与 Dubbo SPI异同四、Dubbo SPI详解的全部内容,希望文章能够帮你解决Dubbo SPI机制一、Java SPI简介二、Dubbo SPI简介三、Java SPI 与 Dubbo SPI异同四、Dubbo SPI详解所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复