概述
文章目录
- Java-Chassis核心源码
- Consumer到Provider整体流程
- @EnableServiceComb-初始化ServiceComb
- TransportManager-管理传输层
- TransportManager-管理传输层
- RegistrationManager-注册服务
- DiscoveryManager-服务发现
- Registration/Discovery-继承多接口将功能抽象分类可学
- RemoteServiceRegistry-注意这个怎么和Eureka交互,底层交互有意思
- @RpcSchema-Pojo类型调用的生产者
- @RestSchema-Rest形式调用的生产者
- @RpcReference-消费者
- SPIServiceUtils-自定义SPI加载器
- 职责链管理器FilterChainsManager/ConsumerHandlerManager
- 举一反三
- EventBus
- PaaSResourceUtils+PathMatchingResourcePatternResolver
- PropertySourcesPlaceholderConfigurer
- ThreadPoolExecutorEx/LinkedBlockingQueueEx
- ConcurrentHashMapEx
- AopProxyUtils/BeanUtils
- ReflectionUtils
- StringValueResolver
- SpringCloud
Java-Chassis核心源码
Dubbo3场景及其原理
ServiceComb中的Java-Chassis模块是实现类似Dubbo的RPC的框架,又称Java底座,Dubbo解析请参考上一篇博文。
Consumer到Provider整体流程
- Consumer和Provider的Handler都支持通过SPI动态扩展,如流控、熔断、隔离、负载均衡等扩展。
- Consumer在传输层支持Filter拦截过滤请求,支持通过扩展点处理Request和Response。
- 监听者模式的完美实现,通过SPI读取监听者,监听者监听控制中心的状态,修改控制中心的状态信息。
- 自己业务代码也可以使用这种方式扩展功能并动态组合策略,解耦各种功能业务代码。
类型 | artifact id | 是否可选 | 功能说明 |
---|---|---|---|
编程模型 | provider-pojo | 是 | 提供RPC开发模式 |
编程模型 | provider-jaxrs | 是 | 提供JAX RS开发模式 |
编程模型 | provider-springmvc | 是 | 提供Spring MVC开发模式 |
通信模型 | transport-rest-vertx | 是 | 运行于HTTP之上的开发框架,不依赖WEB容器,应用打包为可执行jar |
通信模型 | transport-rest-servlet | 是 | 运行于WEB容器的开发框架,应用打包为war包 |
通信模型 | transport-highway | 是 | 提供高性能的私有通信协议,仅用于java之间互通。 |
运行模型 | handler-loadbalance | 是 | 负载均衡模块。提供各种路由策略和配置方法。一般客户端使用。 |
运行模型 | handler-bizkeeper | 是 | 和服务治理相关的功能,比如隔离、熔断、容错。 |
运行模型 | handler-tracing | 是 | 调用链跟踪模块,对接监控系统,吐出打点数据。 |
@EnableServiceComb-初始化ServiceComb
通过@ImportResource
的方式导入ServiceComb的核心Bean定义文件"classpath*:META-INF/spring/scb-core-bean.xml";和"classpath*:META-INF/spring/*.bean.xml";
,注入如下Bean定义。
-
ConfigurationSpringInitializer
-
CseApplicationListener,ContextRefreshedEvent事件监听器
-
在
CseApplicationListener#setApplicationContext
初始化相关的连接if (this.applicationContext == applicationContext) { // same object. avoid initialize many times. return; } this.applicationContext = applicationContext; BeanUtils.setContext(applicationContext); HttpClients.load(); // Http相关的 RegistrationManager.INSTANCE.init(); // 服务注册 管理初始化,通过SPI获取。 DiscoveryManager.INSTANCE.init(); // 服务发现 管理初始化,通过SPI获取。
-
ContextRefreshedEvent Bean实例化完事件触达,初始化ServiceComb引擎SCBEngine
@Override public void onApplicationEvent(ApplicationEvent event) { if (initEventClass.isInstance(event)) { if (applicationContext instanceof AbstractApplicationContext) { ((AbstractApplicationContext) applicationContext).registerShutdownHook(); } SCBEngine scbEngine = SCBEngine.getInstance(); // 1、获取单例 scbEngine.setApplicationContext(applicationContext); // 2、设置Contex scbEngine.setPriorityPropertyManager(applicationContext.getBean(PriorityPropertyManager.class)); scbEngine.setFilterChainsManager(applicationContext.getBean(FilterChainsManager.class)); // 3、指责链管理类 scbEngine.getConsumerProviderManager().getConsumerProviderList() .addAll(applicationContext.getBeansOfType(ConsumerProvider.class).values()); // 4、获取支持的Consumer形式,如Rest/Pojo scbEngine.getProducerProviderManager().getProducerProviderList() .addAll(applicationContext.getBeansOfType(ProducerProvider.class).values()); // // 5、获取Provider注册的元信息,支持从Rest/Pojo中获取 scbEngine.addBootListeners(applicationContext.getBeansOfType(BootListener.class).values()); // 6、获取全量SCB初始化监听器BootListener,用于监听SCB在生命周期内的各种事件。 scbEngine.run(); // 7、run方法 } else if (event instanceof ContextClosedEvent) { if (SCBEngine.getInstance() != null) { SCBEngine.getInstance().destroy(); } } }
-
获取单例SCBEngine<重载类,必须单例,Double Check,骚操作可实现隔离>,会首先初始化事件总线等逻辑。
public static SCBEngine getInstance() { if (null == INSTANCE) { // private static final Object initializationLock = new Object(); 全局锁 synchronized (initializationLock) { if (null == INSTANCE) { new SCBEngine(); } } } return INSTANCE; } protected SCBEngine() { eventBus = EventManager.getEventBus(); // 获取事件总线 eventBus.register(this); // 注册事件 @Subscribe @AllowConcurrentEvents INSTANCE = this; producerProviderManager = new ProducerProviderManager(this); serviceRegistryListener = new ServiceRegistryListener(this); }
-
初始化事件总线,使用Guava事件总线处理事件,解耦内部各个模块的逻辑。
-
ProducerProviderManager
生产提供者管理器,用于管理不同的Provider类型,包括PojoProducerProvider、RestProducerProvider,下文会通过注解的方式讲解这两种Provider。 -
SCBEngine监听InvocationStartEvent/InvocationFinishEvent事件,用于AtomicLong统计调用开始和调用结束的总数。
@AllowConcurrentEvents @Subscribe public void onInvocationStart(InvocationStartEvent event) { invocationStartedCounter.incrementAndGet(); } @AllowConcurrentEvents @Subscribe public void onInvocationFinish(InvocationFinishEvent event) { invocationFinishedCounter.incrementAndGet(); }
-
注册ServiceRegistryListener到事件总线,监听以下事件,将引擎需要关注的,和服务发现类的各项操作解耦开,这是常规的操作,我们一般还是可以效仿的。
@EnableExceptionPropagation @SubscriberOrder(-1000) @Subscribe public void onCreateMicroservice(CreateMicroserviceEvent event); @EnableExceptionPropagation @SubscriberOrder(-1000) @Subscribe public void onDestroyMicroservice(DestroyMicroserviceEvent event); @EnableExceptionPropagation @SubscriberOrder(-1000) @Subscribe public void onCreateMicroserviceVersion(CreateMicroserviceVersionEvent event);
-
-
初始化消费者管理,提供者管理,BootListener,BootListener支持SPI或者@Component注解扩展,这种事件总线的模式值得产参考参考各位。
-
监听各种事件,如切handler或filter前后、服务注册前后、传输前后、消费和生产提供者前后等,支持如下事件
enum EventType { BEFORE_HANDLER, AFTER_HANDLER, BEFORE_FILTER, AFTER_FILTER, BEFORE_PRODUCER_PROVIDER, AFTER_PRODUCER_PROVIDER, BEFORE_CONSUMER_PROVIDER, AFTER_CONSUMER_PROVIDER, BEFORE_TRANSPORT, AFTER_TRANSPORT, BEFORE_REGISTRY, AFTER_REGISTRY, BEFORE_CLOSE, AFTER_CLOSE }
-
-
scb.run -> doRun方法,可以打开org.apache.servicecomb.core.SCBEngine的日志
public synchronized SCBEngine run() { if (SCBStatus.DOWN.equals(status)) { try { doRun(); waitStatusUp(); // 等待SCB变成UP状态,会在收到MicroserviceInstanceRegisteredEvent事件的时候修改。 } catch (TimeoutException e) { } catch (Throwable e) { try { destroy(); } catch (Exception exception) { } status = SCBStatus.FAILED; } finally { printServiceInfo(); // 最终打印Service的全部信息 } } private void doRun() throws Exception { status = SCBStatus.STARTING; // 1、BootListener排序包括SPI+Compnent扩展的类 bootListeners.sort(Comparator.comparingInt(BootListener::getOrder)); // 2、触发BootEvent调用BootListener里面对应的监听方法,上下文Contex类,不论是监听者还是职责链,都会需要传递上下文的Contex类。 triggerEvent(EventType.BEFORE_HANDLER); // 3、通过SPI加载全部的Handler到Config.class中,并初始化到Manager中 HandlerConfigUtils.init(consumerHandlerManager, producerHandlerManager); // 4、触发Handler加载之后的事件,我们可以监听handler加载到Engine的事件,这种写法爽不,非常爽,事件总线。 triggerEvent(EventType.AFTER_HANDLER); // 5、同理操作Filter triggerEvent(EventType.BEFORE_FILTER); filterChainsManager.init(); triggerEvent(EventType.AFTER_FILTER); // 6、创建提供者元信息 createProducerMicroserviceMeta(); // 7、初始化生产管理者并触发事件 triggerEvent(EventType.BEFORE_PRODUCER_PROVIDER); producerProviderManager.init(); triggerEvent(EventType.AFTER_PRODUCER_PROVIDER); // 8、初始化消费管理者并触发事件 triggerEvent(EventType.BEFORE_CONSUMER_PROVIDER); consumerProviderManager.init(); // 含有生命周期的Class可以使用init,destory等模板方法 /** for (ConsumerProvider provider : consumerProviderList) { provider.init(); // 模板方法 } **/ triggerEvent(EventType.AFTER_CONSUMER_PROVIDER); // 9、初始化TransportManager管理器,通过SPI加载Transport的实现, // 包括HighwayTransport/ServletRestTransport/VertxRestTransport // Transport的实现都有名字,同名会选择其中的一个。 // 加载选择完会调用Transport的Init方法初始化。 triggerEvent(EventType.BEFORE_TRANSPORT); transportManager.init(this); triggerEvent(EventType.AFTER_TRANSPORT); triggerEvent(EventType.BEFORE_REGISTRY); // 10、注册AfterRegistryEventHanlder监MicroserviceInstanceRegisteredEvent事件,修改scb的状态为up triggerAfterRegistryEvent(); // 11、服务注册启动 RegistrationManager.INSTANCE.run(); // 12、服务发现启动 DiscoveryManager.INSTANCE.run(); shutdownHook = new Thread(this::destroyForShutdownHook); Runtime.getRuntime().addShutdownHook(shutdownHook); }
通过以上注释的数字索引详解部分有价值的地方。
-
9、TransportManager
public void init(SCBEngine scbEngine) throws Exception { buildTransportMap(); // SPI分组并选择 for (Transport transport : transportMap.values()) { if (transport.init()) { // 初始化,初始化失败会出问题的 Endpoint endpoint = transport.getPublishEndpoint(); if (endpoint != null && endpoint.getEndpoint() != null) { LOGGER.info("endpoint to publish: {}", endpoint.getEndpoint()); RegistrationManager.INSTANCE.addEndpoint(endpoint.getEndpoint()); // 注册到注册中心中 } continue; } } }
-
11、RegistrationManager,非常重要,见下详解。
-
12、DiscoveryManager
-
-
TransportManager-管理传输层
通过SPI解耦了各个Module,用户也可以基于实现自定义Transport层的实现,这种策略方式还是可以学习的。
- java-chassis-core-2.6.0.jar核心包中提供了TransportManager、AbstractTransport、Transport基础类。TransportManager通过SPI加载Transport.class的实现类,提供了AbstractTransport.class用于将传输层的功能抽象到父类中。
- transport-rest-vertx/transport-rest-servlet/transport-highway三个都是独立的jar包,用于扩展Transport
TransportManager-管理传输层
通过SPI解耦了各个Module,用户也可以基于实现自定义Transport层的实现,这种策略方式还是可以学习的。
-
java-chassis-core-2.6.0.jar核心包中提供了TransportManager、AbstractTransport、Transport基础类。TransportManager通过SPI加载Transport.class的实现类,提供了AbstractTransport.class用于将传输层的功能抽象到父类中。
-
transport-rest-vertx/transport-rest-servlet/transport-highway三个都是独立的jar包,用于扩展Transport。
-
Rest模式的transport会经过各种HttpClientFilter来动态拦截前后的请求,在拦截流程中通过
Invocation
这个Contex上下文来传递全量信息,这种思路非常的通用极易学习,代码这样写起来非常的舒服好看,这里的异步编程也是值得我们学习的,后面可以细看。org.apache.servicecomb.transport.rest.client.http.RestClientInvocation#invoke public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Exception { this.invocation = invocation; this.asyncResp = asyncResp; OperationMeta operationMeta = invocation.getOperationMeta(); restOperationMeta = operationMeta.getExtData(RestConst.SWAGGER_REST_OPERATION); String path = this.createRequestPath(restOperationMeta); IpPort ipPort = (IpPort) invocation.getEndpoint().getAddress(); Future<HttpClientRequest> requestFuture = createRequest(ipPort, path); invocation.getInvocationStageTrace().startGetConnection(); requestFuture.compose(clientRequest -> { invocation.getInvocationStageTrace().finishGetConnection(); this.clientRequest = clientRequest; clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName()); RestClientRequestImpl restClientRequest = new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp, throwableHandler); invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest); Buffer requestBodyBuffer; try { requestBodyBuffer = restClientRequest.getBodyBuffer(); } catch (Exception e) { return Future.failedFuture(e); } HttpServletRequestEx requestEx = new VertxClientRequestToHttpServletRequest(clientRequest, requestBodyBuffer); invocation.getInvocationStageTrace().startClientFiltersRequest(); // 1、执行Filter.beforeSendRequest,可用于动态修改request撒 for (HttpClientFilter filter : httpClientFilters) { if (filter.enabled()) { filter.beforeSendRequest(invocation, requestEx); } } // 2、通过事件总线发事件撒 invocation.onStartSendRequest(); httpClientWithContext.runOnContext(httpClient -> { clientRequest.setTimeout(operationMeta.getConfig().getMsRequestTimeout()); clientRequest.response().onComplete(asyncResult -> { if (asyncResult.failed()) { fail(asyncResult.cause()); return; } // 3、执行Filter.afterReceiveResponse,可用于动态修改Response撒 handleResponse(asyncResult.result()); }); processServiceCombHeaders(invocation, operationMeta); restClientRequest.end() .onComplete((t) -> invocation.getInvocationStageTrace().finishWriteToBuffer(System.nanoTime())); }); return Future.succeededFuture(); }).onFailure(failure -> { }); }
RegistrationManager-注册服务
Faced门面类,用于管理全部的Registration,功能全部路由到下游实体类,Registration是实际管理Provider注册到注册中心的逻辑(支持动态扩展不同的注册中心,可插拔)。会通过SPI的方式加载全部的Registration.class
实现,解耦和实际注册中心处理类的实现,Faced设计模式可以学习,门面简化内部的实现细节及其使用。
-
RegistrationManager.INSTANCE.init()
INSTANCE单例初始化,模板方法addBeans有些牛逼,做了两件事,非常值得学习啊。
其一:自动给SPI加载的类Autowire(兼容Spring)。其二:将Spring框架里面的Bean也加载到RegistrationManager之中。
// 1、SPI加载全部的实现,并调用过滤方法 SPIServiceUtils.getOrLoadSortedService(Registration.class) .stream() .filter((SPIEnabled::enabled)) .forEach(registrationList::add); // 2、会在CseApplicationListener#setApplicationContext被调用的init public void init() { // BeanUtils.addBeans(Registration.class, registrationList); initPrimary(); // 初始化全部的registration registrationList.forEach(registration -> registration.init()); } public static <T extends SPIOrder & SPIEnabled> void addBeans(Class<T> cls, List<T> exists) { if (context == null) { return; } // 尝试通过Spring中的Bean autowire相应的instance for (T instance : exists) { context.getAutowireCapableBeanFactory().autowireBean(instance); } // 添加到exists中 for (T bean : context.getBeansOfType(cls).values()) { if (bean.enabled()) { exists.add(bean); } } // 排个序呗 exists.sort(Comparator.comparingInt(SPIOrder::getOrder)); } }
-
RegistrationManager.INSTANCE.run()
public void run() { EventManager.getEventBus().register(new AfterServiceInstanceRegistryHandler(registrationList.size())); registrationList.forEach(registration -> registration.run()); }
-
Registration的具体实现-ServiceCenterRegistration,代理到RegistryUtils通过ServiceRegistry(RemoteServiceRegistry是其实现)和注册中心通信。
- RemoteServiceRegistry,Rest请求,事件总线监听者模式,定时任务执行,这里可以后续细看细看,EventBus发挥着巨大的作用。
- 定时执行ServiceCenterTask,包括各种
Runnable
的实现,定时执行参数从ServiceRegistryConfig中获取。 - 定时心跳健康监测执行。
- 通过ServiceRegistryClientImpl和注册中心Rest通信,注册服务到注册中心。在执行的逻辑中,会通过post各种事件,让外部监听者可以收到相应的请求。
- 定时执行ServiceCenterTask,包括各种
- RemoteServiceRegistry,Rest请求,事件总线监听者模式,定时任务执行,这里可以后续细看细看,EventBus发挥着巨大的作用。
-
Registration的具体实现-LocalRegistration,本地注册中心
DiscoveryManager-服务发现
Registration/Discovery-继承多接口将功能抽象分类可学
继承多个接口,每个接口都是高度抽象的功能,区分不同的接口,将方法分类,经典的面向对象的思路,解耦不同的方法,便于后续动态扩展。模板方法将面向接口的编程,使得当前的类功能非常容易扩展。
public interface Registration extends SPIEnabled, SPIOrder, LifeCycle
public interface Discovery extends SPIEnabled, SPIOrder, LifeCycle
-
SPIEnabled,SPI加载进去是否使用。
-
SPIOrder,SPI加载到内存中的排序。
-
LifeCycle,当前类具有生命周期,模板方法相继调用不同的方法,如init、run,destory。
-
ServiceCenterRegistration实现了Registration用于和注册中心交互,代理到RegistryUtils -> RemoteServiceRegistry和注册中心交互。
-
ServiceCenterDiscovery实现了Discovery,同样代理到了RegistryUtils -> RemoteServiceRegistry和注册中心交互。
基于上述二位,我们有必要解读下RemoteServiceRegistry
哈。
RemoteServiceRegistry-注意这个怎么和Eureka交互,底层交互有意思
这种类的设计方式和Apollo的类构造很像,分为本地仓库,远端仓库等模式,本地配置,远端配置等。
最终肯定会调用ServiceRegistryClient - > HttpClients 发起Rest请求。
重点关注如下配置?
- Provider新增,多久会更新到Consumer服务本地?
- Provider挂了,多久会更新到Consumer服务本地?
- 这些参数是否支持可以配置?
结合下图理解如下方法流程:
-
Provider心跳保活/注册,定时任务和注册中心保活provider,这是定时间隔serviceRegistryConfig.getHeartbeatInterval() /s,会定时执行ServiceCenterTask -> MicroserviceServiceCenterTask,包括如下Task,注册微服务监控,心跳,状态同步等。
addTask(new MicroserviceRegisterTask(eventBus, srClient, microservice)); addTask(new MicroserviceInstanceRegisterTask(eventBus, serviceRegistryConfig, srClient, microservice)); addTask(new MicroserviceWatchTask(eventBus, serviceRegistryConfig, srClient, microservice)); addTask(new MicroserviceInstanceHeartbeatTask(eventBus, srClient, microservice)); // 心跳心跳,活着么? addTask(new MicroserviceInstanceStatusSyncTask(eventBus, srClient, microservice));
-
Provider下线
-
Consumer定时更新拉取Provider并更新本地服务缓存:定时间隔,serviceRegistryConfig.getInstancePullInterval() /s,如果服务失效需要移除服务。
org.apache.servicecomb.serviceregistry.registry.cache.RefreshableServiceRegistryCache#refreshCache
寻找某个jar包由哪个pom文件引入
1、mvn dependency:tree -Dverbose -Dincludes=org.apache.servicecomb:java-chassis-core:2.1.5
2、mvn dependency:tree -Dverbose 找出全部依赖树,如果失败则执行向下install方法
@RpcSchema-Pojo类型调用的生产者
@RpcSchema(schemaId = "hello")
public class HelloImpl implements Hello {
@Override
public String sayHi(String name) {
return "Hello " + name;
}
@Override
public String sayHello(Person person) {
return "Hello person " + person.getName();
}
}
-
注解会被PojoProducers这个BeanPostProcessor处理,用于往注册中心注册服务。
protected void processProvider(String beanName, Object bean) { // 1、aop后,新的实例的父类可能是原class,也可能只是个proxy,父类不是原class,所以,需要先取出原class,再取标注 // 调用 AopProxyUtils.ultimateTargetClass Class<?> beanCls = BeanUtils.getImplClassFromBean(bean); if (beanCls == null) { return; } RpcSchema rpcSchema = beanCls.getAnnotation(RpcSchema.class); if (rpcSchema == null) { return; } // 2、获取schemaId,如果没有传递则获取接口名 String schemaId = rpcSchema.schemaId(); if (StringUtils.isEmpty(schemaId)) { Class<?>[] intfs = beanCls.getInterfaces(); if (intfs.length == 1) { schemaId = intfs[0].getName(); } else { throw new Error("Must be schemaId or implements only one interface"); } } // 3、注册producer元信息 PojoProducerMeta pojoProducerMeta = new PojoProducerMeta(); pojoProducerMeta.setSchemaId(schemaId); // pojoProducerMeta.setSchemaInterface(rpcSchema.schemaInterface()); // 接口名 pojoProducerMeta.setInstance(bean); // 实例类 registerPojoProducer(pojoProducerMeta); }
-
在SCB启动的时候,
scbEngine.getProducerProviderManager().getProducerProviderList()
,获取全部ProducerProvider的实现类,用于获取Bean工厂中的生产提供者。包括从SPI中创建的,及使用Bean的形式注入的,默认主要有以下两种-
PojoProducerProvider
:scb run的时候,会调用PojoProducerProvider的Init方法从PojoProducers中获取全部的Provider元信息,注意在scbRun的时候,Bean工厂实例化全部的Bean实例。public List<ProducerMeta> init() { // for some test cases, there is no spring context if (BeanUtils.getContext() == null) { return Collections.emptyList(); } PojoProducers pojoProducers = BeanUtils.getContext().getBean(PojoProducers.class); for (ProducerMeta producerMeta : pojoProducers.getProducerMetas()) { PojoProducerMeta pojoProducerMeta = (PojoProducerMeta) producerMeta; initPojoProducerMeta(pojoProducerMeta); } return pojoProducers.getProducerMetas(); }
-
RestProducerProvider
:同理,从RestProducers中获取全部提供Rest接口的实例信息。
-
-
`会获取PojoProducerProvider -> 。
-
-
SCB.Run的时候,会通过获取到的元信息,将其注册到注册中心上对外提供服务。
@RestSchema-Rest形式调用的生产者
和RpcSchema处理方式类似,并且可以和RpcSchema的形式并存。
- 注解会被RestProducers处理,并将元信息保存。
- 在scb.run的时候,会被RestProducerProvider获取全部的元信息,并传递到SCB最终的控制中心中去。
@RpcReference-消费者
@RpcReference(microserviceName = "hello", schemaId = "hello")
private static Hello hello;
Pojo形式的Rpc消费方的注解,将自动从注册中心拉取对应的服务名,动态代理Compute并注入代理类。
-
注解会被RpcReferenceProcessor这个BeanPostProcessor处理,动态代理增强Hello。十分注意StringValueResolver支持动态解析占位符。
public class RpcReferenceProcessor implements BeanPostProcessor, EmbeddedValueResolverAware { private StringValueResolver resolver; @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { // 扫描所有field,处理扩展的field标注 ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() { public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException { processConsumerField(bean, field); } }); return bean; } protected void processConsumerField(Object bean, Field field) { RpcReference reference = field.getAnnotation(RpcReference.class); if (reference == null) { return; } handleReferenceField(bean, field, reference); } private void handleReferenceField(Object obj, Field field, RpcReference reference) { String microserviceName = reference.microserviceName(); microserviceName = resolver.resolveStringValue(microserviceName); // 微服务的名字支持占位符 PojoReferenceMeta pojoReference = new PojoReferenceMeta(); pojoReference.setMicroserviceName(microserviceName); pojoReference.setSchemaId(reference.schemaId()); pojoReference.setConsumerIntf(field.getType()); // proxy = Invoker.createProxy(microserviceName, schemaId, consumerIntf); 动态代理 pojoReference.afterPropertiesSet(); ReflectionUtils.makeAccessible(field); ReflectionUtils.setField(field, obj, pojoReference.getProxy()); } }
- 设置Consumer的元信息,包括服务的名字,SchemaId,契约接口名,最后根据接口名,生成动态代理类。
- 反射的方式注入field。必须makeAccessible使其支持private注入。
SPIServiceUtils-自定义SPI加载器
ServiceComb大量使用SPI动态扩展对应的实现,SPI定义的使能及优先级,从全部jar包中
META-INF/services的路径文件中加载扩展类,文件名为接口类的全路径里面内容为接口实现。
-
SPIOrder,支持加载SPI顺序,排序从小到大,order数值越小,优先级越高,调用越靠前。
-
SPIEnabled,支持SPI实现类是否使能。
-
SPIServiceUtils,代理到JDK自带的ServiceLoader动态加载SPI类,并使用ReflectionUtils用于获取SPIOrder排序对应的实现类。
// 加载SPI全部类 public static <T> List<T> loadSortedService(Class<T> serviceType) { List<Entry<Integer, T>> serviceEntries = new ArrayList<>(); ServiceLoader<T> serviceLoader = ServiceLoader.load(serviceType); // 代理到JDK serviceLoader.forEach(service -> { int serviceOrder = 0; // 实现getOrder方法就获取,否则就使用默认值,不强迫用户实现Order方法 Method getOrder = ReflectionUtils.findMethod(service.getClass(), "getOrder"); if (getOrder != null) { serviceOrder = (int) ReflectionUtils.invokeMethod(getOrder, service); // 触发方法 } Entry<Integer, T> entry = new SimpleEntry<>(serviceOrder, service); serviceEntries.add(entry); }); List<T> services = serviceEntries.stream() .sorted(Comparator.comparingInt(Entry::getKey)) .map(Entry::getValue) .collect(Collectors.toList()); // 根据 getOrder 排序,并返回全部的serviceType。 LOGGER.info("Found SPI service {}, count={}.", serviceType.getName(), services.size()); for (int idx = 0; idx < services.size(); idx++) { T service = services.get(idx); LOGGER.info(" {}. {}.", idx, service.getClass().getName()); } // 输出加载的日志及其顺序,方便定位调试功能 return services; } // Map<Class<?>, List<Object>> cache = new ConcurrentHashMap<>(); // loadSortedService 并非线程安全,因此添加类锁 public static <T> List<T> getOrLoadSortedService(Class<T> serviceType) { List<Object> services = cache.get(serviceType); if (services == null) { synchronized (LOCK) { services = cache.get(serviceType); if (services == null) { services = (List<Object>) loadSortedService(serviceType); cache.put(serviceType, services); } } }
SPI加载类的全量日志:
职责链管理器FilterChainsManager/ConsumerHandlerManager
不论在服务的提供方或者在服务的调用方,在触发请求或者响应请求的时候,都会通过职责链的方式顺序调用各种Handler,框架动态从handler.xml文件中读取Handler算子,支持在yaml文件中通过配置文件动态装配,非常灵活。
-
实现了Handler仓库HandlerConfigUtils。PaaSResourceUtils继承Spring ResourceUtils,读取资源文件
// 1、handler-loadbalance-2.6.0.jar ->cse.handler.xml <config> <handler id="loadbalance" class="org.apache.servicecomb.loadbalance.LoadbalanceHandler"/> </config> // 2、从配置文件中加载全部的Config类 private static Config loadConfig() throws Exception { Config config = new Config(); // 1、读取配置文件 classpath* 代表在全量jar包中寻找 // 2、classpath*:config/cse.handler.xml classpath*:config/cse.*.handler.xml // 3、PathMatchingResourcePatternResolver 加载全部的Resource资源路径 // 4、排序资源的路径(xxxx.handler.cse,根据名字xxx进行排序) List<Resource> resList = PaaSResourceUtils.getSortedResources("classpath*:config/cse.handler.xml", ".handler.xml"); for (Resource res : resList) { // 5. 通过XmlMapper读取handler.xml文件映射到Config类中 Config tmpConfig = XmlLoaderUtils.load(res, Config.class); config.mergeFrom(tmpConfig); }
-
从xml文件中加载到Config.class
,这里可以学习如何读取XML文件
通过ObjectMapper xmlMapper = new XmlMapper()
读取readValue(res.getURL(), cls)
<config> <handler id="loadbalance" class="org.apache.servicecomb.loadbalance.LoadbalanceHandler"/> </config> // 1、通过注解,将Config Bean类 和 xml文件一一映射 @JacksonXmlRootElement // XML根元素<config> public class Config { // key为handler id private Map<String, Class<Handler>> handlerClassMap = new HashMap<>(); public void mergeFrom(Config otherConfig) { handlerClassMap.putAll(otherConfig.handlerClassMap); } public Map<String, Class<Handler>> getHandlerClassMap() { return this.handlerClassMap; } @JacksonXmlProperty(localName = "handler") // 属性 @JacksonXmlElementWrapper(useWrapping = false) public void setHandlerConfigList(List<HandlerConfig> handlerConfigList) { for (HandlerConfig handlerConfig : handlerConfigList) { handlerClassMap.put(handlerConfig.getHandlerId(), handlerConfig.getClazz()); } } } // 2、定义子属性映射 public class HandlerConfig { private String handlerId; private Class<Handler> clazz; @JacksonXmlProperty(localName = "id", isAttribute = true) public String getHandlerId() { return handlerId; } public void setHandlerId(String handlerId) { this.handlerId = handlerId; } @JacksonXmlProperty(localName = "class", isAttribute = true) public Class<Handler> getClazz() { return clazz; } public void setClazz(Class<Handler> clazz) { this.clazz = clazz; } @SuppressWarnings("unchecked") public void setClazz(String clazz) throws ClassNotFoundException { this.clazz = (Class<Handler>) Class.forName(clazz); } }
-
Handler算子的编排,使用ConsumerHandlerManager和ProducerHandlerManager读取yaml中的配置文件,通过名字对Handler进行编排,Handler并未实现Order接口,调用顺序交给编排方。
-
Consumer/Provider支持自定义ConsumerHandlerManager/ProducerHandlerManager类生成职责链列表,最后的Consumer Handler-TransportClientHandler.INSTANCE, 最后的Provider Handler–ProducerOperationHandler.INSTANCE
以下动态创建职责链配置:
// 职责链Key,配置在yaml文件中 protected List<Handler> create(String microserviceName) { // 是否定义服务对应的handler编排servicecomb.handler.chain.Provider.service.calculator // 没有则使用默认的 servicecomb.handler.chain.Provider.default String handlerChainKey = "servicecomb.handler.chain." + getName() + ".service." + microserviceName; String chainDef = DynamicPropertyFactory.getInstance() .getStringProperty(handlerChainKey, defaultChainDef) .get(); LOGGER.info("get handler chain for [{}]: [{}]", handlerChainKey, chainDef); return createHandlerChain(chainDef); }
-
BootListener
PojoInvocation保存List<Handler>
职责链,并实现
举一反三
EventBus
事件总线:通过发布事件类来驱动监听当前事件的类中方法的调用,简化监听者模式的实现
以下几个使用的注解可以了解下:
@EnableExceptionPropagation
@SubscriberOrder(-1000)
@Subscribe
PaaSResourceUtils+PathMatchingResourcePatternResolver
- 继承自Spring自带的ResourceUtils用于resolving resource locations to files in the file system,获取文件及判断文件。
PathMatchingResourcePatternResolver
通过正则方式去读取路径上文件并返回Resource[]
。- 正因为Resolver的存在可以解析
"classpath*:config/cse.handler.xml", ".handler.xml"
这种资源路径。
PropertySourcesPlaceholderConfigurer
BeanFactoryPostProcessor,用于解析Bean对应的Property。
- @Value注解,会被
AutowiredAnnotationBeanPostProcessor
解析,并替换占位符注入合适的值。 - XML配置如果含有属性注入配置,则
PropertySourcesPlaceholderConfigurer#postProcessBeanFactory
会被替换占位符注入合适的值。
ThreadPoolExecutorEx/LinkedBlockingQueueEx
ThreadPoolExecutorEx
背景:扩展的ThreadPool,在coreSize到达的时候,并发继续增加,会优先创建Thread直到达到maxSize,才会放入BlockQueue,Dubbo和Tomcat都扩展了类似的功能,当BlockQueue的size为无限大的时候,避免Thread永远无法到达maxSize。
实现:默认ThreadPool按照如下逻辑执行Task。
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 1、workTask小于coreSize,则直接创建新的Thread执行command。
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 2、已达到coreSize,则放入BlockQueue调用offer方法。
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 3、BlockQueue放满,则继续添加worker,直到达到最大。
reject(command);
}
如上要点在第二步,只需要修改workQueue的offer逻辑,就可以优先创建Thead,可使用扩展的LinkedBlockingQueueEx。
在offer的时候,获取TheadPool的size,当小于最大的时候,直接创建新的Thead,执行Task。
public class LinkedBlockingQueueEx extends LinkedBlockingQueue<Runnable> {
private transient volatile ThreadPoolExecutorEx owner = null;
public void setOwner(ThreadPoolExecutorEx owner) {
this.owner = owner;
}
@Override
public boolean offer(Runnable runnable) {
// task can come before owner available
if (owner == null) {
return super.offer(runnable);
}
// can not create more thread, just queue the task
if (owner.getPoolSize() == owner.getMaximumPoolSize()) {
return super.offer(runnable);
}
// no need to create more thread, just queue the task
if (owner.getNotFinished() <= owner.getPoolSize()) {
return super.offer(runnable);
}
// all threads are busy, and can create new thread, not queue the task
if (owner.getPoolSize() < owner.getMaximumPoolSize()) {
return false;
}
return super.offer(runnable);
}
/*
* when task is rejected (thread pool if full), force the item onto queue.
*/
public boolean force(Runnable runnable) {
if (owner == null || owner.isShutdown()) {
throw new RejectedExecutionException("queue is not running.");
}
return super.offer(runnable);
}
}
相应的扩展的TheadPool也override部分方法,用于当前提交的任务总数,完成的人数总数,拒绝的任务总数。
public class ThreadPoolExecutorEx extends ThreadPoolExecutor {
private AtomicInteger submittedCount = new AtomicInteger();
private AtomicInteger finishedCount = new AtomicInteger();
private AtomicInteger rejectedCount = new AtomicInteger();
public ThreadPoolExecutorEx(int coreThreads, int maxThreads, int maxIdleInSecond, TimeUnit timeUnit,
BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
super(coreThreads, maxThreads, maxIdleInSecond, timeUnit, queue, threadFactory);
if (queue instanceof LinkedBlockingQueueEx) {
((LinkedBlockingQueueEx) queue).setOwner(this);
}
setRejectedExecutionHandler(this::rejectedExecution);
}
@Override
public void execute(Runnable command) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException e) {
if (getQueue() instanceof LinkedBlockingQueueEx) {
final LinkedBlockingQueueEx queue = (LinkedBlockingQueueEx) getQueue();
if (!queue.force(command)) {
throw new RejectedExecutionException("thread pool queue is full");
}
} else {
throw e;
}
}
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
rejectedCount.incrementAndGet();
finishedCount.incrementAndGet();
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
finishedCount.incrementAndGet();
}
public int getNotFinished() {
return submittedCount.get() - finishedCount.get();
}
public int getRejectedCount() {
return rejectedCount.get();
}
}
ConcurrentHashMapEx
默认的ConcurrentHashMap当Key都在同一个桶的时候,调用computeIfAbsent的时候,都会对当前的桶加锁(分段锁),当在高并发读多余写的场景,这里的加锁会影响单个桶的性能,因此可以优先通过CAS获取下元素,然后再调用默认方法,扩展性能。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FzUwVpEM-1671287799657)(images/image-20221204161851865.png)]
// ConcurrentHashMap.computeIfAbsent always do "synchronized" operation
// so we wrap it to improve performance
@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
V value = get(key); // CAS GET
if (value != null) {
return value;
}
return super.computeIfAbsent(key, mappingFunction);
}
AopProxyUtils/BeanUtils
ReflectionUtils
-
ReflectionUtils#doWithFields(Class<?> clazz, FieldCallback, FieldFilter):使用Lamda表达式简化实现模板方法<可以参考>,支持自定义处理方法以及过滤逻辑,也类似实现了访问者模式的效果,可以支持访问Filed,访问Methed等逻辑。
getDeclaredFields会优先从ConcurrentReferenceHashMap缓存中获取,否则再从
clazz.getDeclaredFields()
中获取// 递归获取全部的DeclaredFields,然后执行过滤以及处理逻辑 Class<?> targetClass = clazz; do { // public, protected, default (package) access, and private fields, but excludes inherited fields. // 因此需要递归 Field[] fields = getDeclaredFields(targetClass); for (Field field : fields) { if (ff != null && !ff.matches(field)) { continue; } try { fc.doWith(field); } catch (IllegalAccessException ex) { } } targetClass = targetClass.getSuperclass(); } while (targetClass != null && targetClass != Object.class); }
-
makeAccessible:合理将Method、Feild、Ctor设置为Accessible,支持set值进去。
-
setField(Field field, @Nullable Object target, @Nullable Object value),将实例的Field设置Value。
-
findField(Class<?> clazz, @Nullable String name, @Nullable Class<?> type)
:递归通过name或type寻找Field,找到第一个就直接返回。两个判断条件且支持传null,所以可以加个&&和短路法逻辑,type为null则后面逻辑不起作用,如果不为null则后面逻辑需要使用,简化逻辑的书写,这个可以学习下。(type == null || type.equals(field.getType()))
Class<?> searchType = clazz; while (Object.class != searchType && searchType != null) { Field[] fields = getDeclaredFields(searchType); for (Field field : fields) { // 短路法,极大简化了if else逻辑的书写,非常方便。 if ((name == null || name.equals(field.getName())) && (type == null || type.equals(field.getType()))) { return field; // 找到就返回了哈,不继续找了。 } } searchType = searchType.getSuperclass(); } return null;
-
其余方法代办。
StringValueResolver
Bean类实现EmbeddedValueResolverAware接口,则会自动注入StringValueResolver,用于解析带有占位符的String值,如${com.huawei.com.name}
-> Perjoker。这样我们也支持解析从属性配置中 自动填充String占位符啦。
SpringCloud
负载均衡
最后
以上就是迷人宝马为你收集整理的ServiceComb场景及其原理Java-Chassis核心源码举一反三的全部内容,希望文章能够帮你解决ServiceComb场景及其原理Java-Chassis核心源码举一反三所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复