概述
1. 在Hystrix中,Command就相当于接口服务,每个Command都可以设置自己的数据配置,继承HystrixCommand实现他的构造函数,最少需要设置HystrixCommandGroupKey,因为后续要根据这个来区别每个Command,服务总该有个执行方法也就是实现父类的run(),当满足熔断要求的时候需要实现getFallback(),当需要程序缓存的时候需要实现getCacheKey()。
2. HystrixCommandGroupKey.Factory.asKey("User"),创建服务独有的key并且保存到HystrixCommandGroupKey的InternMap中,其他类似的还有HystrixCommandKey和HystrixThreadPoolKey生成。
3. 初始化AbstractCommand构造函数
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
this.executionHook = initExecutionHook(executionHook);
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
/* fallback semaphore override if applicable */
this.fallbackSemaphoreOverride = fallbackSemaphore;
/* execution semaphore override if applicable */
this.executionSemaphoreOverride = executionSemaphore;
}
初始化HystrixCommandKey,默认是类的名字为对应的key,即class.getSimpleName()
4. 初始化通用的配置HystrixCommandProperties,有一套默认的配置,每个服务组都可以有自己独特的配置,HystrixPropertiesFactory.getCommandProperties(commandKey, commandPropertiesDefaults);
HystrixPropertiesStrategy hystrixPropertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
先初始化HystrixPlugins,类加载器是加载当前类的加载器。通过回调把日志属性传递到方法中。
private HystrixPlugins(ClassLoader classLoader, LoggerSupplier logSupplier) {
//This will load Archaius if its in the classpath.
this.classLoader = classLoader;
//N.B. Do not use a logger before this is loaded as it will most likely load the configuration system.
//The configuration system may need to do something prior to loading logging. @agentgt
dynamicProperties = resolveDynamicProperties(classLoader, logSupplier);
}
先从系统配置中查找对应的配置类System.getProperty(name, fallback)
HystrixDynamicProperties hp = getPluginImplementationViaProperties(HystrixDynamicProperties.class,
HystrixDynamicPropertiesSystemProperties.getInstance());
没有的话从类加载器加载的类中查找HystrixDynamicProperties类型的类,
没有的话从Archaius中加载对应的类
最后没有的话就会加载默认的配置类HystrixDynamicPropertiesSystemProperties,设置动态配置类HystrixDynamicProperties
private static HystrixDynamicProperties resolveDynamicProperties(ClassLoader classLoader, LoggerSupplier logSupplier) {
HystrixDynamicProperties hp = getPluginImplementationViaProperties(HystrixDynamicProperties.class,
HystrixDynamicPropertiesSystemProperties.getInstance());
if (hp != null) {
logSupplier.getLogger().debug(
"Created HystrixDynamicProperties instance from System property named "
+ ""hystrix.plugin.HystrixDynamicProperties.implementation". Using class: {}",
hp.getClass().getCanonicalName());
return hp;
}
hp = findService(HystrixDynamicProperties.class, classLoader);
if (hp != null) {
logSupplier.getLogger()
.debug("Created HystrixDynamicProperties instance by loading from ServiceLoader. Using class: {}",
hp.getClass().getCanonicalName());
return hp;
}
hp = HystrixArchaiusHelper.createArchaiusDynamicProperties();
if (hp != null) {
logSupplier.getLogger().debug("Created HystrixDynamicProperties. Using class : {}",
hp.getClass().getCanonicalName());
return hp;
}
hp = HystrixDynamicPropertiesSystemProperties.getInstance();
logSupplier.getLogger().info("Using System Properties for HystrixDynamicProperties! Using class: {}",
hp.getClass().getCanonicalName());
return hp;
}
private static <T> T findService(
Class<T> spi,
ClassLoader classLoader) throws ServiceConfigurationError {
ServiceLoader<T> sl = ServiceLoader.load(spi,
classLoader);
for (T s : sl) {
if (s != null)
return s;
}
return null;
}
加载属性策略类,依次从上面的动态配置中获取,从类加载器的类中获取,没有则设置默认HystrixPropertiesStrategyDefault
从属性策略中获取通用配置类,先取缓存key,这里的key就是我们最初设置的服务组名,没有则生成自己特有的通用配置。
// create new instance
properties = hystrixPropertiesStrategy.getCommandProperties(key, builder);
// cache and return
HystrixCommandProperties existing = commandProperties.putIfAbsent(cacheKey, properties);
通用配置默认为HystrixPropertiesCommandDefault,HystrixCommandProperties.Setter可以配置自己需要独特的属性,所有的通用配置如下,基本上都会有全局默认值。配置key前缀为"hystrix"
protected HystrixCommandProperties(HystrixCommandKey key, HystrixCommandProperties.Setter builder, String propertyPrefix) {
this.key = key;
this.circuitBreakerEnabled = getProperty(propertyPrefix, key, "circuitBreaker.enabled", builder.getCircuitBreakerEnabled(), default_circuitBreakerEnabled);
this.circuitBreakerRequestVolumeThreshold = getProperty(propertyPrefix, key, "circuitBreaker.requestVolumeThreshold", builder.getCircuitBreakerRequestVolumeThreshold(), default_circuitBreakerRequestVolumeThreshold);
this.circuitBreakerSleepWindowInMilliseconds = getProperty(propertyPrefix, key, "circuitBreaker.sleepWindowInMilliseconds", builder.getCircuitBreakerSleepWindowInMilliseconds(), default_circuitBreakerSleepWindowInMilliseconds);
this.circuitBreakerErrorThresholdPercentage = getProperty(propertyPrefix, key, "circuitBreaker.errorThresholdPercentage", builder.getCircuitBreakerErrorThresholdPercentage(), default_circuitBreakerErrorThresholdPercentage);
this.circuitBreakerForceOpen = getProperty(propertyPrefix, key, "circuitBreaker.forceOpen", builder.getCircuitBreakerForceOpen(), default_circuitBreakerForceOpen);
this.circuitBreakerForceClosed = getProperty(propertyPrefix, key, "circuitBreaker.forceClosed", builder.getCircuitBreakerForceClosed(), default_circuitBreakerForceClosed);
this.executionIsolationStrategy = getProperty(propertyPrefix, key, "execution.isolation.strategy", builder.getExecutionIsolationStrategy(), default_executionIsolationStrategy);
//this property name is now misleading.
//TODO figure out a good way to deprecate this property name
this.executionTimeoutInMilliseconds = getProperty(propertyPrefix, key, "execution.isolation.thread.timeoutInMilliseconds", builder.getExecutionIsolationThreadTimeoutInMilliseconds(), default_executionTimeoutInMilliseconds);
this.executionTimeoutEnabled = getProperty(propertyPrefix, key, "execution.timeout.enabled", builder.getExecutionTimeoutEnabled(), default_executionTimeoutEnabled);
this.executionIsolationThreadInterruptOnTimeout = getProperty(propertyPrefix, key, "execution.isolation.thread.interruptOnTimeout", builder.getExecutionIsolationThreadInterruptOnTimeout(), default_executionIsolationThreadInterruptOnTimeout);
this.executionIsolationThreadInterruptOnFutureCancel = getProperty(propertyPrefix, key, "execution.isolation.thread.interruptOnFutureCancel", builder.getExecutionIsolationThreadInterruptOnFutureCancel(), default_executionIsolationThreadInterruptOnFutureCancel);
this.executionIsolationSemaphoreMaxConcurrentRequests = getProperty(propertyPrefix, key, "execution.isolation.semaphore.maxConcurrentRequests", builder.getExecutionIsolationSemaphoreMaxConcurrentRequests(), default_executionIsolationSemaphoreMaxConcurrentRequests);
this.fallbackIsolationSemaphoreMaxConcurrentRequests = getProperty(propertyPrefix, key, "fallback.isolation.semaphore.maxConcurrentRequests", builder.getFallbackIsolationSemaphoreMaxConcurrentRequests(), default_fallbackIsolationSemaphoreMaxConcurrentRequests);
this.fallbackEnabled = getProperty(propertyPrefix, key, "fallback.enabled", builder.getFallbackEnabled(), default_fallbackEnabled);
this.metricsRollingStatisticalWindowInMilliseconds = getProperty(propertyPrefix, key, "metrics.rollingStats.timeInMilliseconds", builder.getMetricsRollingStatisticalWindowInMilliseconds(), default_metricsRollingStatisticalWindow);
this.metricsRollingStatisticalWindowBuckets = getProperty(propertyPrefix, key, "metrics.rollingStats.numBuckets", builder.getMetricsRollingStatisticalWindowBuckets(), default_metricsRollingStatisticalWindowBuckets);
this.metricsRollingPercentileEnabled = getProperty(propertyPrefix, key, "metrics.rollingPercentile.enabled", builder.getMetricsRollingPercentileEnabled(), default_metricsRollingPercentileEnabled);
this.metricsRollingPercentileWindowInMilliseconds = getProperty(propertyPrefix, key, "metrics.rollingPercentile.timeInMilliseconds", builder.getMetricsRollingPercentileWindowInMilliseconds(), default_metricsRollingPercentileWindow);
this.metricsRollingPercentileWindowBuckets = getProperty(propertyPrefix, key, "metrics.rollingPercentile.numBuckets", builder.getMetricsRollingPercentileWindowBuckets(), default_metricsRollingPercentileWindowBuckets);
this.metricsRollingPercentileBucketSize = getProperty(propertyPrefix, key, "metrics.rollingPercentile.bucketSize", builder.getMetricsRollingPercentileBucketSize(), default_metricsRollingPercentileBucketSize);
this.metricsHealthSnapshotIntervalInMilliseconds = getProperty(propertyPrefix, key, "metrics.healthSnapshot.intervalInMilliseconds", builder.getMetricsHealthSnapshotIntervalInMilliseconds(), default_metricsHealthSnapshotIntervalInMilliseconds);
this.requestCacheEnabled = getProperty(propertyPrefix, key, "requestCache.enabled", builder.getRequestCacheEnabled(), default_requestCacheEnabled);
this.requestLogEnabled = getProperty(propertyPrefix, key, "requestLog.enabled", builder.getRequestLogEnabled(), default_requestLogEnabled);
// threadpool doesn't have a global override, only instance level makes sense
this.executionIsolationThreadPoolKeyOverride = forString().add(propertyPrefix + ".command." + key.name() + ".threadPoolKeyOverride", null).build();
}
初始化HystrixThreadPoolKey,key值可以设置command命令通用的或者是服务组名。HystrixThreadPoolKey.Factory.asKey(groupKey.name())
5. 初始化HystrixCommandMetrics,初始化一些统计流类,限流等相关的类,
HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
super(null);
this.key = key;
this.group = commandGroup;
this.threadPoolKey = threadPoolKey;
this.properties = properties;
healthCountsStream = HealthCountsStream.getInstance(key, properties);
rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);
rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
}
初始化熔断器HystrixCircuitBreakerImpl,设置对应的规则,订阅结果处理。比如设置熔断开关等等
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur Subscription s = subscribeToStream();
6. 初始化线程池,获取默认配置策略HystrixPropertiesStrategy hystrixPropertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();获取线程池配置HystrixPropertiesThreadPoolDefault,获取并发策略HystrixConcurrencyStrategyDefault,生成对应的线程池ThreadPoolExecutor。
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
this.threadPool = this.metrics.getThreadPool();
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
初始化HystrixThreadPoolMetrics,初始化关于线程的一些统计流和限流数据,
private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
super(null);
this.threadPoolKey = threadPoolKey;
this.threadPool = threadPool;
this.properties = properties;
rollingCounterStream = RollingThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
cumulativeCounterStream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
rollingThreadPoolMaxConcurrencyStream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, properties);
}
6. 初始化HystrixEventNotifier时间通知,HystrixConcurrencyStrategy并发策略,HystrixMetricsPublisherCommand发布命令,HystrixCommandExecutionHook命令执行钩子,HystrixRequestCache请求缓存类,初始化HystrixRequestLog时采用了懒初始化,返回return new HystrixLifecycleForwardingRequestVariable<T>(rv);
public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
/*
* 1) Fetch RequestVariable implementation from cache.
* 2) If no implementation is found in cache then construct from factory.
* 3) Cache implementation from factory as each object instance needs to be statically cached to be relevant across threads.
*/
RVCacheKey key = new RVCacheKey(this, concurrencyStrategy);
HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
if (rvInstance == null) {
requestVariableInstance.putIfAbsent(key, concurrencyStrategy.getRequestVariable(lifeCycleMethods));
/*
* A safety check to help debug problems if someone starts injecting dynamically created HystrixConcurrencyStrategy instances - which should not be done and has no good reason to be done.
*
* The 100 value is arbitrary ... just a number far higher than we should see.
*/
if (requestVariableInstance.size() > 100) {
logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
}
}
return (T) requestVariableInstance.get(key).get();
}
当get()时,先判断请求上下文是否已经初始化,我们在最开始使用时已经调用了HystrixRequestContext context = HystrixRequestContext.initializeContext();获取上下文HystrixRequestContext.getContextForCurrentThread().state对应的LazyInitializer,然后执行HystrixRequestVariableDefault#initialValue()返回return new HystrixRequestLog();最后初始化信号量重写机制以控制请求。
最后
以上就是轻松皮皮虾为你收集整理的Hystrix的Command初始化的全部内容,希望文章能够帮你解决Hystrix的Command初始化所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复