我是靠谱客的博主 迷人宝马,最近开发中收集的这篇文章主要介绍ServiceComb场景及其原理Java-Chassis核心源码举一反三,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • Java-Chassis核心源码
    • Consumer到Provider整体流程
    • @EnableServiceComb-初始化ServiceComb
      • TransportManager-管理传输层
      • TransportManager-管理传输层
      • RegistrationManager-注册服务
      • DiscoveryManager-服务发现
      • Registration/Discovery-继承多接口将功能抽象分类可学
      • RemoteServiceRegistry-注意这个怎么和Eureka交互,底层交互有意思
    • @RpcSchema-Pojo类型调用的生产者
    • @RestSchema-Rest形式调用的生产者
    • @RpcReference-消费者
    • SPIServiceUtils-自定义SPI加载器
    • 职责链管理器FilterChainsManager/ConsumerHandlerManager
  • 举一反三
    • EventBus
    • PaaSResourceUtils+PathMatchingResourcePatternResolver
    • PropertySourcesPlaceholderConfigurer
    • ThreadPoolExecutorEx/LinkedBlockingQueueEx
    • ConcurrentHashMapEx
    • AopProxyUtils/BeanUtils
    • ReflectionUtils
    • StringValueResolver
    • SpringCloud

Java-Chassis核心源码

Dubbo3场景及其原理
ServiceComb中的Java-Chassis模块是实现类似Dubbo的RPC的框架,又称Java底座,Dubbo解析请参考上一篇博文。

Consumer到Provider整体流程

在这里插入图片描述

  • Consumer和Provider的Handler都支持通过SPI动态扩展,如流控、熔断、隔离、负载均衡等扩展。
  • Consumer在传输层支持Filter拦截过滤请求,支持通过扩展点处理Request和Response。
  • 监听者模式的完美实现,通过SPI读取监听者,监听者监听控制中心的状态,修改控制中心的状态信息。
  • 自己业务代码也可以使用这种方式扩展功能并动态组合策略,解耦各种功能业务代码。
类型artifact id是否可选功能说明
编程模型provider-pojo提供RPC开发模式
编程模型provider-jaxrs提供JAX RS开发模式
编程模型provider-springmvc提供Spring MVC开发模式
通信模型transport-rest-vertx运行于HTTP之上的开发框架,不依赖WEB容器,应用打包为可执行jar
通信模型transport-rest-servlet运行于WEB容器的开发框架,应用打包为war包
通信模型transport-highway提供高性能的私有通信协议,仅用于java之间互通。
运行模型handler-loadbalance负载均衡模块。提供各种路由策略和配置方法。一般客户端使用。
运行模型handler-bizkeeper和服务治理相关的功能,比如隔离、熔断、容错。
运行模型handler-tracing调用链跟踪模块,对接监控系统,吐出打点数据。

@EnableServiceComb-初始化ServiceComb

通过@ImportResource的方式导入ServiceComb的核心Bean定义文件"classpath*:META-INF/spring/scb-core-bean.xml";和"classpath*:META-INF/spring/*.bean.xml";,注入如下Bean定义。

  • ConfigurationSpringInitializer

  • CseApplicationListener,ContextRefreshedEvent事件监听器

    • CseApplicationListener#setApplicationContext初始化相关的连接

      if (this.applicationContext == applicationContext) {
            // same object. avoid initialize many times.
            return;
          }
          this.applicationContext = applicationContext;
          BeanUtils.setContext(applicationContext);
          HttpClients.load(); // Http相关的
          RegistrationManager.INSTANCE.init(); // 服务注册 管理初始化,通过SPI获取。
          DiscoveryManager.INSTANCE.init(); // 服务发现 管理初始化,通过SPI获取。
      
    • ContextRefreshedEvent Bean实例化完事件触达,初始化ServiceComb引擎SCBEngine

        @Override
        public void onApplicationEvent(ApplicationEvent event) {
          if (initEventClass.isInstance(event)) {
            if (applicationContext instanceof AbstractApplicationContext) {
              ((AbstractApplicationContext) applicationContext).registerShutdownHook();
            }
      
            SCBEngine scbEngine = SCBEngine.getInstance(); // 1、获取单例
            
            scbEngine.setApplicationContext(applicationContext); // 2、设置Contex
            scbEngine.setPriorityPropertyManager(applicationContext.getBean(PriorityPropertyManager.class));
            scbEngine.setFilterChainsManager(applicationContext.getBean(FilterChainsManager.class)); // 3、指责链管理类
            scbEngine.getConsumerProviderManager().getConsumerProviderList()
                .addAll(applicationContext.getBeansOfType(ConsumerProvider.class).values()); // 4、获取支持的Consumer形式,如Rest/Pojo
            scbEngine.getProducerProviderManager().getProducerProviderList()
                .addAll(applicationContext.getBeansOfType(ProducerProvider.class).values()); // // 5、获取Provider注册的元信息,支持从Rest/Pojo中获取
            scbEngine.addBootListeners(applicationContext.getBeansOfType(BootListener.class).values()); // 6、获取全量SCB初始化监听器BootListener,用于监听SCB在生命周期内的各种事件。
      
            scbEngine.run(); // 7、run方法
          } else if (event instanceof ContextClosedEvent) {
            if (SCBEngine.getInstance() != null) {
              SCBEngine.getInstance().destroy();
            }
          }
        }
      
    1. 获取单例SCBEngine<重载类,必须单例,Double Check,骚操作可实现隔离>,会首先初始化事件总线等逻辑。

      public static SCBEngine getInstance() {
          if (null == INSTANCE) {
            // private static final Object initializationLock = new Object(); 全局锁
            synchronized (initializationLock) {
              if (null == INSTANCE) {
                new SCBEngine();
              }
            }
          }
          return INSTANCE;
        }
      
      protected SCBEngine() {
          eventBus = EventManager.getEventBus(); // 获取事件总线
      
          eventBus.register(this); // 注册事件 @Subscribe  @AllowConcurrentEvents
      
          INSTANCE = this;
      
          producerProviderManager = new ProducerProviderManager(this);
          serviceRegistryListener = new ServiceRegistryListener(this);
        }
      
      • 初始化事件总线,使用Guava事件总线处理事件,解耦内部各个模块的逻辑。

      • ProducerProviderManager生产提供者管理器,用于管理不同的Provider类型,包括PojoProducerProvider、RestProducerProvider,下文会通过注解的方式讲解这两种Provider。

      • SCBEngine监听InvocationStartEvent/InvocationFinishEvent事件,用于AtomicLong统计调用开始和调用结束的总数。

          @AllowConcurrentEvents
          @Subscribe
          public void onInvocationStart(InvocationStartEvent event) {
            invocationStartedCounter.incrementAndGet();
          }
        
          @AllowConcurrentEvents
          @Subscribe
          public void onInvocationFinish(InvocationFinishEvent event) {
            invocationFinishedCounter.incrementAndGet();
          }
        
      • 注册ServiceRegistryListener到事件总线,监听以下事件,将引擎需要关注的,和服务发现类的各项操作解耦开,这是常规的操作,我们一般还是可以效仿的。

          @EnableExceptionPropagation
          @SubscriberOrder(-1000)
          @Subscribe
          public void onCreateMicroservice(CreateMicroserviceEvent event);
          
          @EnableExceptionPropagation
          @SubscriberOrder(-1000)
          @Subscribe
          public void onDestroyMicroservice(DestroyMicroserviceEvent event);
          
          @EnableExceptionPropagation
          @SubscriberOrder(-1000)
          @Subscribe
          public void onCreateMicroserviceVersion(CreateMicroserviceVersionEvent event);
        
    2. 初始化消费者管理,提供者管理,BootListener,BootListener支持SPI或者@Component注解扩展,这种事件总线的模式值得产参考参考各位。

      • 监听各种事件,如切handler或filter前后、服务注册前后、传输前后、消费和生产提供者前后等,支持如下事件

        enum EventType {
            BEFORE_HANDLER,
            AFTER_HANDLER,
            BEFORE_FILTER,
            AFTER_FILTER,
            BEFORE_PRODUCER_PROVIDER,
            AFTER_PRODUCER_PROVIDER,
            BEFORE_CONSUMER_PROVIDER,
            AFTER_CONSUMER_PROVIDER,
            BEFORE_TRANSPORT,
            AFTER_TRANSPORT,
            BEFORE_REGISTRY,
            AFTER_REGISTRY,
            BEFORE_CLOSE,
            AFTER_CLOSE
          }
        
    3. scb.run -> doRun方法,可以打开org.apache.servicecomb.core.SCBEngine的日志

      public synchronized SCBEngine run() {
          if (SCBStatus.DOWN.equals(status)) {
            try {
              doRun(); 
              waitStatusUp(); // 等待SCB变成UP状态,会在收到MicroserviceInstanceRegisteredEvent事件的时候修改。     
            } catch (TimeoutException e) {
            } catch (Throwable e) {
              try {
                destroy();
              } catch (Exception exception) {
              }
              status = SCBStatus.FAILED;
            } finally {
              printServiceInfo(); // 最终打印Service的全部信息
            }
          }
      private void doRun() throws Exception {
          status = SCBStatus.STARTING;
      
          // 1、BootListener排序包括SPI+Compnent扩展的类
          bootListeners.sort(Comparator.comparingInt(BootListener::getOrder));
      
          // 2、触发BootEvent调用BootListener里面对应的监听方法,上下文Contex类,不论是监听者还是职责链,都会需要传递上下文的Contex类。
          triggerEvent(EventType.BEFORE_HANDLER);
          
          // 3、通过SPI加载全部的Handler到Config.class中,并初始化到Manager中
          HandlerConfigUtils.init(consumerHandlerManager, producerHandlerManager);
          
          // 4、触发Handler加载之后的事件,我们可以监听handler加载到Engine的事件,这种写法爽不,非常爽,事件总线。
          triggerEvent(EventType.AFTER_HANDLER);
          
          // 5、同理操作Filter
          triggerEvent(EventType.BEFORE_FILTER);
          filterChainsManager.init();
          triggerEvent(EventType.AFTER_FILTER);
          
      	// 6、创建提供者元信息
          createProducerMicroserviceMeta();
      
          // 7、初始化生产管理者并触发事件
          triggerEvent(EventType.BEFORE_PRODUCER_PROVIDER);
          producerProviderManager.init();
          triggerEvent(EventType.AFTER_PRODUCER_PROVIDER);
      
          // 8、初始化消费管理者并触发事件
          triggerEvent(EventType.BEFORE_CONSUMER_PROVIDER);
          consumerProviderManager.init(); // 含有生命周期的Class可以使用init,destory等模板方法
            /**
            for (ConsumerProvider provider : consumerProviderList) {
            		provider.init(); // 模板方法
          	}
            **/
          triggerEvent(EventType.AFTER_CONSUMER_PROVIDER);
      
          // 9、初始化TransportManager管理器,通过SPI加载Transport的实现,
          // 包括HighwayTransport/ServletRestTransport/VertxRestTransport
          // Transport的实现都有名字,同名会选择其中的一个。
          // 加载选择完会调用Transport的Init方法初始化。
          triggerEvent(EventType.BEFORE_TRANSPORT);
          transportManager.init(this);
          triggerEvent(EventType.AFTER_TRANSPORT);
      
          triggerEvent(EventType.BEFORE_REGISTRY);
      
          // 10、注册AfterRegistryEventHanlder监MicroserviceInstanceRegisteredEvent事件,修改scb的状态为up
          triggerAfterRegistryEvent(); 
      
          // 11、服务注册启动
          RegistrationManager.INSTANCE.run(); 
          // 12、服务发现启动
          DiscoveryManager.INSTANCE.run(); 
      
          shutdownHook = new Thread(this::destroyForShutdownHook);
          Runtime.getRuntime().addShutdownHook(shutdownHook);
        }    
          
      

      通过以上注释的数字索引详解部分有价值的地方。

      • 9、TransportManager

        public void init(SCBEngine scbEngine) throws Exception {
            buildTransportMap(); // SPI分组并选择
        
            for (Transport transport : transportMap.values()) {
              if (transport.init()) { // 初始化,初始化失败会出问题的
                Endpoint endpoint = transport.getPublishEndpoint();
                if (endpoint != null && endpoint.getEndpoint() != null) {
                  LOGGER.info("endpoint to publish: {}", endpoint.getEndpoint());
                  RegistrationManager.INSTANCE.addEndpoint(endpoint.getEndpoint()); // 注册到注册中心中
                }
                continue;
              }
            }
          }
        
      • 11、RegistrationManager,非常重要,见下详解。

      • 12、DiscoveryManager

TransportManager-管理传输层

通过SPI解耦了各个Module,用户也可以基于实现自定义Transport层的实现,这种策略方式还是可以学习的。

  1. java-chassis-core-2.6.0.jar核心包中提供了TransportManager、AbstractTransport、Transport基础类。TransportManager通过SPI加载Transport.class的实现类,提供了AbstractTransport.class用于将传输层的功能抽象到父类中。
  2. transport-rest-vertx/transport-rest-servlet/transport-highway三个都是独立的jar包,用于扩展Transport

TransportManager-管理传输层

通过SPI解耦了各个Module,用户也可以基于实现自定义Transport层的实现,这种策略方式还是可以学习的。

  1. java-chassis-core-2.6.0.jar核心包中提供了TransportManager、AbstractTransport、Transport基础类。TransportManager通过SPI加载Transport.class的实现类,提供了AbstractTransport.class用于将传输层的功能抽象到父类中。

  2. transport-rest-vertx/transport-rest-servlet/transport-highway三个都是独立的jar包,用于扩展Transport。

  3. Rest模式的transport会经过各种HttpClientFilter来动态拦截前后的请求,在拦截流程中通过Invocation这个Contex上下文来传递全量信息,这种思路非常的通用极易学习,代码这样写起来非常的舒服好看,这里的异步编程也是值得我们学习的,后面可以细看。

    org.apache.servicecomb.transport.rest.client.http.RestClientInvocation#invoke
    public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Exception {
        this.invocation = invocation;
        this.asyncResp = asyncResp;
    
        OperationMeta operationMeta = invocation.getOperationMeta();
        restOperationMeta = operationMeta.getExtData(RestConst.SWAGGER_REST_OPERATION);
    
        String path = this.createRequestPath(restOperationMeta);
        IpPort ipPort = (IpPort) invocation.getEndpoint().getAddress();
    
        Future<HttpClientRequest> requestFuture = createRequest(ipPort, path);
    
        invocation.getInvocationStageTrace().startGetConnection();
        requestFuture.compose(clientRequest -> {
          invocation.getInvocationStageTrace().finishGetConnection();
    
          this.clientRequest = clientRequest;
    
          clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName());
          RestClientRequestImpl restClientRequest =
              new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp, throwableHandler);
          invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest);
    
          Buffer requestBodyBuffer;
          try {
            requestBodyBuffer = restClientRequest.getBodyBuffer();
          } catch (Exception e) {
            return Future.failedFuture(e);
          }
          HttpServletRequestEx requestEx = new VertxClientRequestToHttpServletRequest(clientRequest, requestBodyBuffer);
          invocation.getInvocationStageTrace().startClientFiltersRequest();
          // 1、执行Filter.beforeSendRequest,可用于动态修改request撒  
          for (HttpClientFilter filter : httpClientFilters) {
            if (filter.enabled()) {
              filter.beforeSendRequest(invocation, requestEx);
            }
          }
    
          // 2、通过事件总线发事件撒
          invocation.onStartSendRequest();
          httpClientWithContext.runOnContext(httpClient -> {
            clientRequest.setTimeout(operationMeta.getConfig().getMsRequestTimeout());
            clientRequest.response().onComplete(asyncResult -> {
              if (asyncResult.failed()) {
                fail(asyncResult.cause());
                return;
              }
              // 3、执行Filter.afterReceiveResponse,可用于动态修改Response撒  
              handleResponse(asyncResult.result());
            });
            processServiceCombHeaders(invocation, operationMeta);
            restClientRequest.end()
                .onComplete((t) -> invocation.getInvocationStageTrace().finishWriteToBuffer(System.nanoTime()));
          });
          return Future.succeededFuture();
        }).onFailure(failure -> {
        });
      }
    

RegistrationManager-注册服务

Faced门面类,用于管理全部的Registration,功能全部路由到下游实体类,Registration是实际管理Provider注册到注册中心的逻辑(支持动态扩展不同的注册中心,可插拔)。会通过SPI的方式加载全部的Registration.class实现,解耦和实际注册中心处理类的实现,Faced设计模式可以学习,门面简化内部的实现细节及其使用。

  1. RegistrationManager.INSTANCE.init()INSTANCE单例初始化,模板方法addBeans有些牛逼,做了两件事,非常值得学习啊。
    其一:自动给SPI加载的类Autowire(兼容Spring)。

    其二:将Spring框架里面的Bean也加载到RegistrationManager之中。

    // 1、SPI加载全部的实现,并调用过滤方法
    SPIServiceUtils.getOrLoadSortedService(Registration.class)
            .stream()
            .filter((SPIEnabled::enabled))
            .forEach(registrationList::add);
    // 2、会在CseApplicationListener#setApplicationContext被调用的init
    public void init() {
        //
        BeanUtils.addBeans(Registration.class, registrationList);
    
        initPrimary();
        // 初始化全部的registration
        registrationList.forEach(registration -> registration.init());
      }
    
    
    public static <T extends SPIOrder & SPIEnabled> void addBeans(Class<T> cls, List<T> exists) {
        if (context == null) {
          return;
        }
        
    	// 尝试通过Spring中的Bean autowire相应的instance
        for (T instance : exists) {
          context.getAutowireCapableBeanFactory().autowireBean(instance);
        }
        // 添加到exists中
        for (T bean : context.getBeansOfType(cls).values()) {
          if (bean.enabled()) {
            exists.add(bean);
          }
        }
        // 排个序呗
        exists.sort(Comparator.comparingInt(SPIOrder::getOrder));
      }
    }
    
  2. RegistrationManager.INSTANCE.run()

    public void run() {
        EventManager.getEventBus().register(new AfterServiceInstanceRegistryHandler(registrationList.size())); 
        registrationList.forEach(registration -> registration.run());
      }
    
  3. Registration的具体实现-ServiceCenterRegistration,代理到RegistryUtils通过ServiceRegistry(RemoteServiceRegistry是其实现)和注册中心通信。

    • RemoteServiceRegistry,Rest请求,事件总线监听者模式,定时任务执行,这里可以后续细看细看EventBus发挥着巨大的作用。
      • 定时执行ServiceCenterTask,包括各种Runnable的实现,定时执行参数从ServiceRegistryConfig中获取。
      • 定时心跳健康监测执行。
      • 通过ServiceRegistryClientImpl和注册中心Rest通信,注册服务到注册中心。在执行的逻辑中,会通过post各种事件,让外部监听者可以收到相应的请求。
  4. Registration的具体实现-LocalRegistration,本地注册中心

DiscoveryManager-服务发现

Registration/Discovery-继承多接口将功能抽象分类可学

继承多个接口,每个接口都是高度抽象的功能,区分不同的接口,将方法分类,经典的面向对象的思路,解耦不同的方法,便于后续动态扩展。模板方法将面向接口的编程,使得当前的类功能非常容易扩展。

public interface Registration extends SPIEnabled, SPIOrder, LifeCycle
public interface Discovery extends SPIEnabled, SPIOrder, LifeCycle
  • SPIEnabled,SPI加载进去是否使用。

  • SPIOrder,SPI加载到内存中的排序。

  • LifeCycle,当前类具有生命周期,模板方法相继调用不同的方法,如init、run,destory。

  • ServiceCenterRegistration实现了Registration用于和注册中心交互,代理到RegistryUtils -> RemoteServiceRegistry和注册中心交互。

  • ServiceCenterDiscovery实现了Discovery,同样代理到了RegistryUtils -> RemoteServiceRegistry和注册中心交互。

基于上述二位,我们有必要解读下RemoteServiceRegistry哈。

RemoteServiceRegistry-注意这个怎么和Eureka交互,底层交互有意思

这种类的设计方式和Apollo的类构造很像,分为本地仓库,远端仓库等模式,本地配置,远端配置等。

最终肯定会调用ServiceRegistryClient - > HttpClients 发起Rest请求。

重点关注如下配置?

  1. Provider新增,多久会更新到Consumer服务本地?
  2. Provider挂了,多久会更新到Consumer服务本地?
  3. 这些参数是否支持可以配置?

结合下图理解如下方法流程:

  • Provider心跳保活/注册,定时任务和注册中心保活provider,这是定时间隔serviceRegistryConfig.getHeartbeatInterval() /s,会定时执行ServiceCenterTask -> MicroserviceServiceCenterTask,包括如下Task,注册微服务监控,心跳,状态同步等。

    addTask(new MicroserviceRegisterTask(eventBus, srClient, microservice));
    addTask(new MicroserviceInstanceRegisterTask(eventBus, serviceRegistryConfig, srClient, microservice));
    addTask(new MicroserviceWatchTask(eventBus, serviceRegistryConfig, srClient, microservice));
    addTask(new MicroserviceInstanceHeartbeatTask(eventBus, srClient, microservice)); // 心跳心跳,活着么?
    addTask(new MicroserviceInstanceStatusSyncTask(eventBus, srClient, microservice));
    
  • Provider下线

  • Consumer定时更新拉取Provider并更新本地服务缓存:定时间隔,serviceRegistryConfig.getInstancePullInterval() /s,如果服务失效需要移除服务。org.apache.servicecomb.serviceregistry.registry.cache.RefreshableServiceRegistryCache#refreshCache

请添加图片描述

寻找某个jar包由哪个pom文件引入

1、mvn dependency:tree -Dverbose -Dincludes=org.apache.servicecomb:java-chassis-core:2.1.5

2、mvn dependency:tree -Dverbose 找出全部依赖树,如果失败则执行向下install方法

@RpcSchema-Pojo类型调用的生产者

@RpcSchema(schemaId = "hello")
public class HelloImpl implements Hello {

  @Override
  public String sayHi(String name) {
    return "Hello " + name;
  }

  @Override
  public String sayHello(Person person) {
    return "Hello person " + person.getName();
  }
}
  • 注解会被PojoProducers这个BeanPostProcessor处理,用于往注册中心注册服务。

    protected void processProvider(String beanName, Object bean) {
        // 1、aop后,新的实例的父类可能是原class,也可能只是个proxy,父类不是原class,所以,需要先取出原class,再取标注
        // 调用 AopProxyUtils.ultimateTargetClass
        Class<?> beanCls = BeanUtils.getImplClassFromBean(bean);
        if (beanCls == null) {
          return;
        }
        RpcSchema rpcSchema = beanCls.getAnnotation(RpcSchema.class);
        if (rpcSchema == null) {
          return;
        }
    
        // 2、获取schemaId,如果没有传递则获取接口名
        String schemaId = rpcSchema.schemaId();
        if (StringUtils.isEmpty(schemaId)) {
          Class<?>[] intfs = beanCls.getInterfaces();
          if (intfs.length == 1) {
            schemaId = intfs[0].getName();
          } else {
            throw new Error("Must be schemaId or implements only one interface");
          }
        }
    
        // 3、注册producer元信息
        PojoProducerMeta pojoProducerMeta = new PojoProducerMeta();
        pojoProducerMeta.setSchemaId(schemaId); // 
        pojoProducerMeta.setSchemaInterface(rpcSchema.schemaInterface()); // 接口名
        pojoProducerMeta.setInstance(bean); // 实例类
    
        registerPojoProducer(pojoProducerMeta);
      }
    
    • 在SCB启动的时候,scbEngine.getProducerProviderManager().getProducerProviderList(),获取全部ProducerProvider的实现类,用于获取Bean工厂中的生产提供者。包括从SPI中创建的,及使用Bean的形式注入的,默认主要有以下两种

      • PojoProducerProvider:scb run的时候,会调用PojoProducerProvider的Init方法从PojoProducers中获取全部的Provider元信息,注意在scbRun的时候,Bean工厂实例化全部的Bean实例。

        public List<ProducerMeta> init() {
            // for some test cases, there is no spring context
            if (BeanUtils.getContext() == null) {
              return Collections.emptyList();
            }
        
            PojoProducers pojoProducers = BeanUtils.getContext().getBean(PojoProducers.class);
            for (ProducerMeta producerMeta : pojoProducers.getProducerMetas()) {
              PojoProducerMeta pojoProducerMeta = (PojoProducerMeta) producerMeta;
              initPojoProducerMeta(pojoProducerMeta);
            }
        
            return pojoProducers.getProducerMetas();
          }
        
      • RestProducerProvider:同理,从RestProducers中获取全部提供Rest接口的实例信息。

    • `会获取PojoProducerProvider -> 。

      
      
  • SCB.Run的时候,会通过获取到的元信息,将其注册到注册中心上对外提供服务。

@RestSchema-Rest形式调用的生产者

和RpcSchema处理方式类似,并且可以和RpcSchema的形式并存。

  • 注解会被RestProducers处理,并将元信息保存。
  • 在scb.run的时候,会被RestProducerProvider获取全部的元信息,并传递到SCB最终的控制中心中去。

@RpcReference-消费者

@RpcReference(microserviceName = "hello", schemaId = "hello")
private static Hello hello;

Pojo形式的Rpc消费方的注解,将自动从注册中心拉取对应的服务名,动态代理Compute并注入代理类。

  • 注解会被RpcReferenceProcessor这个BeanPostProcessor处理,动态代理增强Hello。十分注意StringValueResolver支持动态解析占位符。

    public class RpcReferenceProcessor implements BeanPostProcessor, EmbeddedValueResolverAware {
      private StringValueResolver resolver;
     @Override
      public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        // 扫描所有field,处理扩展的field标注
        ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
          public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
            processConsumerField(bean, field);
          }
        });
    
        return bean;
      }
      protected void processConsumerField(Object bean, Field field) {
        RpcReference reference = field.getAnnotation(RpcReference.class);
        if (reference == null) {
          return;
        }
    
        handleReferenceField(bean, field, reference);
      }
      private void handleReferenceField(Object obj, Field field,
          RpcReference reference) {
        String microserviceName = reference.microserviceName();
        microserviceName = resolver.resolveStringValue(microserviceName); // 微服务的名字支持占位符
        PojoReferenceMeta pojoReference = new PojoReferenceMeta();
        pojoReference.setMicroserviceName(microserviceName);
        pojoReference.setSchemaId(reference.schemaId());
        pojoReference.setConsumerIntf(field.getType());
    
    	// proxy = Invoker.createProxy(microserviceName, schemaId, consumerIntf); 动态代理
        pojoReference.afterPropertiesSet();
    
        ReflectionUtils.makeAccessible(field);
        ReflectionUtils.setField(field, obj, pojoReference.getProxy());
      } 
     }
    
    1. 设置Consumer的元信息,包括服务的名字,SchemaId,契约接口名,最后根据接口名,生成动态代理类。
    2. 反射的方式注入field。必须makeAccessible使其支持private注入。

SPIServiceUtils-自定义SPI加载器

ServiceComb大量使用SPI动态扩展对应的实现,SPI定义的使能及优先级,从全部jar包中

META-INF/services的路径文件中加载扩展类,文件名为接口类的全路径里面内容为接口实现。

  • SPIOrder,支持加载SPI顺序,排序从小到大,order数值越小,优先级越高,调用越靠前。

  • SPIEnabled,支持SPI实现类是否使能

  • SPIServiceUtils,代理到JDK自带的ServiceLoader动态加载SPI类,并使用ReflectionUtils用于获取SPIOrder排序对应的实现类

    // 加载SPI全部类
    public static <T> List<T> loadSortedService(Class<T> serviceType) {
        List<Entry<Integer, T>> serviceEntries = new ArrayList<>();
        ServiceLoader<T> serviceLoader = ServiceLoader.load(serviceType); // 代理到JDK
        serviceLoader.forEach(service -> {
          int serviceOrder = 0;
          // 实现getOrder方法就获取,否则就使用默认值,不强迫用户实现Order方法  
          Method getOrder = ReflectionUtils.findMethod(service.getClass(), "getOrder"); 
          if (getOrder != null) {
            serviceOrder = (int) ReflectionUtils.invokeMethod(getOrder, service); // 触发方法
          }
    
          Entry<Integer, T> entry = new SimpleEntry<>(serviceOrder, service);
          serviceEntries.add(entry);
        });
    
        List<T> services = serviceEntries.stream()
            .sorted(Comparator.comparingInt(Entry::getKey))
            .map(Entry::getValue)
            .collect(Collectors.toList()); // 根据 getOrder 排序,并返回全部的serviceType。
    
        LOGGER.info("Found SPI service {}, count={}.", serviceType.getName(), services.size());
        for (int idx = 0; idx < services.size(); idx++) {
          T service = services.get(idx);
          LOGGER.info("  {}. {}.", idx, service.getClass().getName());
        } // 输出加载的日志及其顺序,方便定位调试功能
    
        return services;
      }
    // Map<Class<?>, List<Object>> cache = new ConcurrentHashMap<>();
    // loadSortedService 并非线程安全,因此添加类锁
    public static <T> List<T> getOrLoadSortedService(Class<T> serviceType) {
        List<Object> services = cache.get(serviceType);
        if (services == null) {
          synchronized (LOCK) {
            services = cache.get(serviceType);
            if (services == null) {
              services = (List<Object>) loadSortedService(serviceType);
              cache.put(serviceType, services);
            }
          }
        }
    
    

SPI加载类的全量日志:

在这里插入图片描述

在这里插入图片描述

职责链管理器FilterChainsManager/ConsumerHandlerManager

不论在服务的提供方或者在服务的调用方,在触发请求或者响应请求的时候,都会通过职责链的方式顺序调用各种Handler,框架动态从handler.xml文件中读取Handler算子,支持在yaml文件中通过配置文件动态装配,非常灵活。

  • 实现了Handler仓库HandlerConfigUtils。PaaSResourceUtils继承Spring ResourceUtils,读取资源文件

    // 1、handler-loadbalance-2.6.0.jar ->cse.handler.xml
    <config>
      <handler id="loadbalance"
        class="org.apache.servicecomb.loadbalance.LoadbalanceHandler"/>
    </config>
    
    // 2、从配置文件中加载全部的Config类
    private static Config loadConfig() throws Exception {
        Config config = new Config();
    	// 1、读取配置文件 classpath* 代表在全量jar包中寻找
    	// 2、classpath*:config/cse.handler.xml   classpath*:config/cse.*.handler.xml
    	// 3、PathMatchingResourcePatternResolver 加载全部的Resource资源路径
        // 4、排序资源的路径(xxxx.handler.cse,根据名字xxx进行排序)
        List<Resource> resList =
            PaaSResourceUtils.getSortedResources("classpath*:config/cse.handler.xml", ".handler.xml");
        for (Resource res : resList) {
          // 5. 通过XmlMapper读取handler.xml文件映射到Config类中
          Config tmpConfig = XmlLoaderUtils.load(res, Config.class);
          config.mergeFrom(tmpConfig);
        }
    
  • 从xml文件中加载到Config.class,这里可以学习如何读取XML文件
    通过ObjectMapper xmlMapper = new XmlMapper()读取readValue(res.getURL(), cls)

    <config>
      <handler id="loadbalance"
        class="org.apache.servicecomb.loadbalance.LoadbalanceHandler"/>
    </config>
    
    // 1、通过注解,将Config Bean类 和 xml文件一一映射
    @JacksonXmlRootElement // XML根元素<config>
    public class Config {
      // key为handler id
      private Map<String, Class<Handler>> handlerClassMap = new HashMap<>();
    
      public void mergeFrom(Config otherConfig) {
        handlerClassMap.putAll(otherConfig.handlerClassMap);
      }
    
      public Map<String, Class<Handler>> getHandlerClassMap() {
        return this.handlerClassMap;
      }
    
      @JacksonXmlProperty(localName = "handler") // 属性
      @JacksonXmlElementWrapper(useWrapping = false)
      public void setHandlerConfigList(List<HandlerConfig> handlerConfigList) {
        for (HandlerConfig handlerConfig : handlerConfigList) {
          handlerClassMap.put(handlerConfig.getHandlerId(), handlerConfig.getClazz());
        }
      }
    }
    // 2、定义子属性映射
    public class HandlerConfig {
      private String handlerId;
    
      private Class<Handler> clazz;
    
      @JacksonXmlProperty(localName = "id", isAttribute = true)
      public String getHandlerId() {
        return handlerId;
      }
    
      public void setHandlerId(String handlerId) {
        this.handlerId = handlerId;
      }
    
      @JacksonXmlProperty(localName = "class", isAttribute = true)
      public Class<Handler> getClazz() {
        return clazz;
      }
    
      public void setClazz(Class<Handler> clazz) {
        this.clazz = clazz;
      }
    
      @SuppressWarnings("unchecked")
      public void setClazz(String clazz) throws ClassNotFoundException {
        this.clazz = (Class<Handler>) Class.forName(clazz);
      }
    }
    
    
  • Handler算子的编排,使用ConsumerHandlerManager和ProducerHandlerManager读取yaml中的配置文件,通过名字对Handler进行编排,Handler并未实现Order接口,调用顺序交给编排方。

    • Consumer/Provider支持自定义ConsumerHandlerManager/ProducerHandlerManager类生成职责链列表,最后的Consumer Handler-TransportClientHandler.INSTANCE, 最后的Provider Handler–ProducerOperationHandler.INSTANCE

      以下动态创建职责链配置:

      // 职责链Key,配置在yaml文件中
      protected List<Handler> create(String microserviceName) {
          // 是否定义服务对应的handler编排servicecomb.handler.chain.Provider.service.calculator
          // 没有则使用默认的 servicecomb.handler.chain.Provider.default
          String handlerChainKey = "servicecomb.handler.chain." + getName() + ".service." + microserviceName;
          String chainDef = DynamicPropertyFactory.getInstance()
              .getStringProperty(handlerChainKey,
                  defaultChainDef)
              .get();
          LOGGER.info("get handler chain for [{}]: [{}]", handlerChainKey, chainDef);
          return createHandlerChain(chainDef);
        }
      

BootListener

PojoInvocation保存List<Handler>职责链,并实现

举一反三

EventBus

事件总线:通过发布事件类来驱动监听当前事件的类中方法的调用,简化监听者模式的实现

以下几个使用的注解可以了解下:

@EnableExceptionPropagation
@SubscriberOrder(-1000)
@Subscribe

PaaSResourceUtils+PathMatchingResourcePatternResolver

  • 继承自Spring自带的ResourceUtils用于resolving resource locations to files in the file system,获取文件及判断文件。
  • PathMatchingResourcePatternResolver通过正则方式去读取路径上文件并返回Resource[]
  • 正因为Resolver的存在可以解析"classpath*:config/cse.handler.xml", ".handler.xml"这种资源路径。

PropertySourcesPlaceholderConfigurer

BeanFactoryPostProcessor,用于解析Bean对应的Property。

  • @Value注解,会被AutowiredAnnotationBeanPostProcessor解析,并替换占位符注入合适的值。
  • XML配置如果含有属性注入配置,则PropertySourcesPlaceholderConfigurer#postProcessBeanFactory会被替换占位符注入合适的值。

ThreadPoolExecutorEx/LinkedBlockingQueueEx

ThreadPoolExecutorEx

背景:扩展的ThreadPool,在coreSize到达的时候,并发继续增加,会优先创建Thread直到达到maxSize,才会放入BlockQueue,Dubbo和Tomcat都扩展了类似的功能,当BlockQueue的size为无限大的时候,避免Thread永远无法到达maxSize。

实现:默认ThreadPool按照如下逻辑执行Task。

public void execute(Runnable command) {		
		int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 1、workTask小于coreSize,则直接创建新的Thread执行command。
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 2、已达到coreSize,则放入BlockQueue调用offer方法。
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 3、BlockQueue放满,则继续添加worker,直到达到最大。
            reject(command);
}

如上要点在第二步,只需要修改workQueue的offer逻辑,就可以优先创建Thead,可使用扩展的LinkedBlockingQueueEx。

在offer的时候,获取TheadPool的size,当小于最大的时候,直接创建新的Thead,执行Task。

public class LinkedBlockingQueueEx extends LinkedBlockingQueue<Runnable> {
  private transient volatile ThreadPoolExecutorEx owner = null;
  public void setOwner(ThreadPoolExecutorEx owner) {
    this.owner = owner;
  }

  @Override
  public boolean offer(Runnable runnable) {
    // task can come before owner available
    if (owner == null) {
      return super.offer(runnable);
    }
    // can not create more thread, just queue the task
    if (owner.getPoolSize() == owner.getMaximumPoolSize()) {
      return super.offer(runnable);
    }
    // no need to create more thread, just queue the task
    if (owner.getNotFinished() <= owner.getPoolSize()) {
      return super.offer(runnable);
    }
    // all threads are busy, and can create new thread, not queue the task
    if (owner.getPoolSize() < owner.getMaximumPoolSize()) {
      return false;
    }
    return super.offer(runnable);
  }

  /*
   * when task is rejected (thread pool if full), force the item onto queue.
   */
  public boolean force(Runnable runnable) {
    if (owner == null || owner.isShutdown()) {
      throw new RejectedExecutionException("queue is not running.");
    }
    return super.offer(runnable);
  }
}

相应的扩展的TheadPool也override部分方法,用于当前提交的任务总数,完成的人数总数,拒绝的任务总数。

public class ThreadPoolExecutorEx extends ThreadPoolExecutor {
  private AtomicInteger submittedCount = new AtomicInteger();

  private AtomicInteger finishedCount = new AtomicInteger();

  private AtomicInteger rejectedCount = new AtomicInteger();

  public ThreadPoolExecutorEx(int coreThreads, int maxThreads, int maxIdleInSecond, TimeUnit timeUnit,
      BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
    super(coreThreads, maxThreads, maxIdleInSecond, timeUnit, queue, threadFactory);
    if (queue instanceof LinkedBlockingQueueEx) {
      ((LinkedBlockingQueueEx) queue).setOwner(this);
    }
    setRejectedExecutionHandler(this::rejectedExecution);
  }

  @Override
  public void execute(Runnable command) {
    submittedCount.incrementAndGet();
    try {
      super.execute(command);
    } catch (RejectedExecutionException e) {
      if (getQueue() instanceof LinkedBlockingQueueEx) {
        final LinkedBlockingQueueEx queue = (LinkedBlockingQueueEx) getQueue();
        if (!queue.force(command)) {
          throw new RejectedExecutionException("thread pool queue is full");
        }
      } else {
        throw e;
      }
    }
  }

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    rejectedCount.incrementAndGet();
    finishedCount.incrementAndGet();

    throw new RejectedExecutionException("Task " + r.toString() +
        " rejected from " +
        e.toString());
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    finishedCount.incrementAndGet();
  }

  public int getNotFinished() {
    return submittedCount.get() - finishedCount.get();
  }

  public int getRejectedCount() {
    return rejectedCount.get();
  }
}

ConcurrentHashMapEx

默认的ConcurrentHashMap当Key都在同一个桶的时候,调用computeIfAbsent的时候,都会对当前的桶加锁(分段锁),当在高并发读多余写的场景,这里的加锁会影响单个桶的性能,因此可以优先通过CAS获取下元素,然后再调用默认方法,扩展性能。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FzUwVpEM-1671287799657)(images/image-20221204161851865.png)]

  // ConcurrentHashMap.computeIfAbsent always do "synchronized" operation
  // so we wrap it to improve performance
  @Override
  public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
    V value = get(key); // CAS GET
    if (value != null) {
      return value;
    }

    return super.computeIfAbsent(key, mappingFunction);
  }

AopProxyUtils/BeanUtils

ReflectionUtils

  • ReflectionUtils#doWithFields(Class<?> clazz, FieldCallback, FieldFilter):使用Lamda表达式简化实现模板方法<可以参考>,支持自定义处理方法以及过滤逻辑,也类似实现了访问者模式的效果,可以支持访问Filed,访问Methed等逻辑。

    getDeclaredFields会优先从ConcurrentReferenceHashMap缓存中获取,否则再从clazz.getDeclaredFields()中获取

    		// 递归获取全部的DeclaredFields,然后执行过滤以及处理逻辑
    		Class<?> targetClass = clazz;
    		do {
    // public, protected, default (package) access, and private fields, but excludes inherited fields.
    // 因此需要递归            
    			Field[] fields = getDeclaredFields(targetClass);
    			for (Field field : fields) {
    				if (ff != null && !ff.matches(field)) {
    					continue;
    				}
    				try {
    					fc.doWith(field);
    				}
    				catch (IllegalAccessException ex) {
    					
    				}
    			}
    			targetClass = targetClass.getSuperclass();
    		}
    		while (targetClass != null && targetClass != Object.class);
    	}
    
  • makeAccessible:合理将Method、Feild、Ctor设置为Accessible,支持set值进去。

  • setField(Field field, @Nullable Object target, @Nullable Object value),将实例的Field设置Value。

  • findField(Class<?> clazz, @Nullable String name, @Nullable Class<?> type) :递归通过name或type寻找Field,找到第一个就直接返回。两个判断条件且支持传null,所以可以加个&&和短路法逻辑,type为null则后面逻辑不起作用,如果不为null则后面逻辑需要使用,简化逻辑的书写,这个可以学习下。(type == null || type.equals(field.getType()))

            Class<?> searchType = clazz;
    		while (Object.class != searchType && searchType != null) {
    			Field[] fields = getDeclaredFields(searchType);
    			for (Field field : fields) {
                    // 短路法,极大简化了if else逻辑的书写,非常方便。
    				if ((name == null || name.equals(field.getName())) &&
    						(type == null || type.equals(field.getType()))) {
    					return field; // 找到就返回了哈,不继续找了。
    				}
    			}
    			searchType = searchType.getSuperclass();
    		}
    		return null;
    
  • 其余方法代办。

StringValueResolver

Bean类实现EmbeddedValueResolverAware接口,则会自动注入StringValueResolver,用于解析带有占位符的String值,如${com.huawei.com.name} -> Perjoker。这样我们也支持解析从属性配置中 自动填充String占位符啦。

SpringCloud

负载均衡

最后

以上就是迷人宝马为你收集整理的ServiceComb场景及其原理Java-Chassis核心源码举一反三的全部内容,希望文章能够帮你解决ServiceComb场景及其原理Java-Chassis核心源码举一反三所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(57)

评论列表共有 0 条评论

立即
投稿
返回
顶部