我是靠谱客的博主 暴躁发卡,最近开发中收集的这篇文章主要介绍YARN : FairScheduler深入解析(队列维护,demand、fair share计算),觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
一、概要
首先,YARN FairScheduler主要做的事情:
① 处理NM心跳NodeUpdate,分配container。
② 树状维护队列和任务,定时计算fair share等信息,并进行排序。
本文重点分析②
二、代码
1、初始化FairScheduler
在RM启动时会初始化FairScheduler,
private void initScheduler(Configuration conf) throws IOException {
synchronized (this) {
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation();
initMaximumResourceCapability(this.conf.getMaximumAllocation());
incrAllocation = this.conf.getIncrementAllocation();
// 持续调度,默认false,一般用于时效性高的实时任务
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
balanceSchedulingEnabled = this.conf.isBalanceSchedulingEnabled();
...
preemptionUtilizationThreshold =
this.conf.getPreemptionUtilizationThreshold();
// 一次性分配多个container,加大吞吐
assignMultiple = this.conf.getAssignMultiple();
maxAssignDynamic = this.conf.isMaxAssignDynamic();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
preemptionInterval = this.conf.getPreemptionInterval();
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
usePortForNodeName = this.conf.getUsePortForNodeName();
reservableNodesRatio = this.conf.getReservableNodes();
if (this.conf.isCpuSchedulingEnabled()) {
RESOURCE_CALCULATOR = new CpuResourceCalculator();
} else {
RESOURCE_CALCULATOR = new DefaultResourceCalculator();
}
// 重新计算fair share的频率
updateInterval = this.conf.getUpdateInterval();
if (updateInterval < 0) {
updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
+ " is invalid, so using default value " +
+FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+ " ms instead");
}
// 一些性能指标打点采集
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
fsOpDurations = FSOpDurations.getInstance(true);
// This stores per-application scheduling information
this.applications = new ConcurrentHashMap<
ApplicationId, SchedulerApplication<FSAppAttempt>>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
allocConf = new AllocationConfiguration(conf);
rmNodeLabelsManager = rmContext.getNodeLabelManager();
try {
// QueueManager管理队列及挂在队列下的application
queueMgr.initialize(conf);
} catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
// 启动定时计算demand和fair share线程
updateThread = new UpdateThread();
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
// 启动持续调度线程
if (continuousSchedulingEnabled) {
// start continuous scheduling thread
schedulingThread = new ContinuousSchedulingThread();
schedulingThread.setName("FairSchedulerContinuousScheduling");
schedulingThread.setDaemon(true);
}
}
// 初始化AllocationFileLoaderService
allocsLoader.init(conf);
// If we fail to load allocations file on initialize, we want to fail
// immediately.
After a successful load, exceptions on future reloads
// will just result in leaving things as they are.
try {
allocsLoader.reloadAllocations();
// 获取 NM label
rmNodeLabelsManager.reinitializeQueueLabels(getQueueToLabels());
} catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e);
}
}
2、updateDemand
更新每个队列、application资源需求
public void updateDemand() {
// Compute demand by iterating through apps in the queue
// Limit demand to maxResources
Resource maxRes = scheduler.getAllocationConfiguration()
.getMaxResources(getName());
demand = Resources.createResource(0);
readLock.lock();
try {
for (FSAppAttempt sched : runnableApps) {
// demand达上限,break
if (Resources.equals(demand, maxRes)) {
break;
}
// 内部逻辑是把当前已经占用的资源加上额外请求的资源总和
// 遍历每个额外请求,对所请求的资源求和,如果加起来大于
// 最大资源限制,则将demand设为mapRes
updateDemandForApp(sched, maxRes);
}
for (FSAppAttempt sched : nonRunnableApps) {
if (Resources.equals(demand, maxRes)) {
break;
}
updateDemandForApp(sched, maxRes);
}
} finally {
readLock.unlock();
}
// sort it in advance.
Comparator<Schedulable> comparator = policy.getComparator();
writeLock.lock();
try {
// 对队列 application进行排序
Collections.sort(runnableApps, comparator);
} finally {
writeLock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand
+ "; the max is " + maxRes);
}
}
队列排序规则,FairShareComparator:
public int compare(Schedulable s1, Schedulable s2) {
Priority priority1 = s1.getPriority();
Priority priority2 = s2.getPriority();
// 先比较优先级,优先级低排后
if (!priority1.equals(priority2)) {
return priority1.compareTo(priority2);
}
Resource demand1 = s1.getDemand();
Resource demand2 = s2.getDemand();
// 不需要资源的,排后
if (demand1.equals(Resources.none()) &&
!demand2.equals(Resources.none())) {
return 1;
} else if (demand2.equals(Resources.none()) &&
!demand1.equals(Resources.none())) {
return -1;
}
double minShareRatio1, minShareRatio2;
double useToWeightRatio1, useToWeightRatio2;
double weight1, weight2;
//Do not repeat the getResourceUsage calculation
Resource resourceUsage1 = s1.getResourceUsageFaster();
Resource resourceUsage2 = s2.getResourceUsageFaster();
// 取min share和资源需求中的最小值
Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
s1.getMinShare(), demand1);
Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
s2.getMinShare(), demand2);
// 根据当前使用资源和minShare比较,如果小于则需要资源
boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
resourceUsage1, minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
resourceUsage2, minShare2);
// 内存使用占比
minShareRatio1 = (double) resourceUsage1.getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
minShareRatio2 = (double) resourceUsage2.getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
// 比较队列权重
weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
if (weight1 > 0.0 && weight2 > 0.0) {
// 根据队列权重计算比例
useToWeightRatio1 = resourceUsage1.getMemory() / weight1;
useToWeightRatio2 = resourceUsage2.getMemory() / weight2;
} else { // Either weight1 or weight2 equals to 0
if (weight1 == weight2) {
// 权重相等则直接比较使用的内存
useToWeightRatio1 = resourceUsage1.getMemory();
useToWeightRatio2 = resourceUsage2.getMemory();
} else {
// 权重绝对值越接近0,排后
useToWeightRatio1 = -weight1;
useToWeightRatio2 = -weight2;
}
}
int res = 0;
if (s1Needy && !s2Needy)
res = -1;
else if (s2Needy && !s1Needy)
res = 1;
else if (s1Needy && s2Needy)
// 比谁share大,用的多 排后
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
else
// 比谁使用权重占比大,大的排后
res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
if (res == 0) {
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
// 仍然相同则比开始时间,后开始的 排后
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
if (res == 0)
// 还相同,比name字符串,字典序靠前的先跑
res = s1.getName().compareTo(s2.getName());
}
return res;
}
3、recomputeShares
计算Instantaneous Fair Share和Steady Fair Share逻辑
ComputeFairShares.computeSharesInternal:
private static void computeSharesInternal(
Collection<? extends Schedulable> allSchedulables,
Resource totalResources, ResourceType type, boolean isSteadyShare) {
// 前提:type均为memory,vcore在此不考虑,DominantResourceFairnessPolicy会同时考虑memory和vcore做排序。
// fair-scheduler.xml中配置的所有队列minShare之和必小于集群NM资源总和
// 过滤出需要参与计算fair share的队列
// isSteadyShare=true,过滤掉weight和maxShare不符合规定的队列
// isSteadyShare=false,过滤掉weight和maxShare不符合规定的队列、没有running application的队列
Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
int takenResources = handleFixedFairShares(
allSchedulables, schedulables, isSteadyShare, type);
if (schedulables.isEmpty()) {
return;
}
// Find an upper bound on R that we can use in our binary search. We start
// at R = 1 and double it until we have either used all the resources or we
// have met all Schedulables' max shares.
// 获取所有队列的MaxShare总和
int totalMaxShare = 0;
for (Schedulable sched : schedulables) {
int maxShare = getResourceValue(sched.getMaxShare(), type);
totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,
Integer.MAX_VALUE);
if (totalMaxShare == Integer.MAX_VALUE) {
break;
}
}
// 剩余参与fair share分配的总资源
int totalResource = Math.max((getResourceValue(totalResources, type) -
takenResources), 0);
totalResource = Math.min(totalMaxShare, totalResource);
// rMax为第一个能够满足所有队列资源均在min和max之间,且大于集群总资源,从1开始每次扩大一倍
double rMax = 1.0;
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
< totalResource) {
rMax *= 2.0;
}
// 二分rMax,迭代25次或算出来的资源刚好等于totalResource
// 目的是得出来的总资源标准尽量接近真实总资源
double left = 0;
double right = rMax;
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
mid, schedulables, type);
if (plannedResourceUsed == totalResource) {
right = mid;
break;
} else if (plannedResourceUsed < totalResource) {
left = mid;
} else {
right = mid;
}
}
// 给每个队列设置fair share 或 steady fair share
for (Schedulable sched : schedulables) {
if (isSteadyShare) {
setResourceValue(computeShare(sched, right, type),
((FSQueue) sched).getSteadyFairShare(), type);
} else {
setResourceValue(
computeShare(sched, right, type), sched.getFairShare(), type);
}
}
}
handleFixedFairShares:
private static int handleFixedFairShares(
Collection<? extends Schedulable> schedulables,
Collection<Schedulable> nonFixedSchedulables,
boolean isSteadyShare, ResourceType type) {
// 所有队列资源总和
int totalResource = 0;
for (Schedulable sched : schedulables) {
// 若maxShare或者weight配置为0,这个队列在任何时候都不会运行任何app,即固定队列,并且分配给他的fair share或instaneous fair share都为0
// 若计算instaneous fair share,且队列没有app运行,那么,这个队列的instaneous fair share是0,并且这个队列被判定为fix sheduler,所以这个队列不再参与instaneous fair share的计算
// 若计算的steady fair share,steady fair share值只和该队列的min max配置有关,和是否有app正在运行无关
int fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
if (fixedShare < 0) {
nonFixedSchedulables.add(sched);
} else {
// 若isSteadyShare=true,即steady fairshares,则将其steady fair share设置为fixedShare
// 若isSteadyShare=false,即instaneous fair share,则将instaneous fair share设置为fixedShare
setResourceValue(fixedShare,
isSteadyShare
? ((FSQueue)sched).getSteadyFairShare()
: sched.getFairShare(),
type);
totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
Integer.MAX_VALUE);
}
}
return totalResource;
}
resourceUsedWithWeightToResourceRatio:
private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
Collection<? extends Schedulable> schedulables, ResourceType type) {
long resourcesTaken = 0;
// 对每个队列计算share,再求和
for (Schedulable sched : schedulables) {
int share = computeShare(sched, w2rRatio, type);
resourcesTaken += share;
}
return (int)Math.min(resourcesTaken, Integer.MAX_VALUE);
}
computeShare:
private static int computeShare(Schedulable sched, double w2rRatio,
ResourceType type) {
// 根据总share*权重计算出该队列share值,并控制在minShare和maxShare之间
double share = sched.getWeights().getWeight(type) * w2rRatio;
share = Math.max(share, getResourceValue(sched.getMinShare(), type));
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
return (int) share;
}
三、SLS性能测试
根据jmx采集的指标,加上SLS可以针对RM进行性能评估。
具体暂时先不介绍了,以后空了再写。
官方文档 YARN Scheduler Load Simulator
四、Metrics采集
FSOpDuration
{
"beans" : [ {
"name" : "Hadoop:service=ResourceManager,name=FSOpDurations",
"modelerType" : "FSOpDurations",
"tag.FSOpDurations" : "FSOpDurations",
"tag.Context" : "fairscheduler-op-durations",
"tag.Hostname" : "host",
"ContinuousSchedulingRunNumOps" : 0,
"ContinuousSchedulingRunAvgTime" : 0.0,
"ContinuousSchedulingRunStdevTime" : 0.0,
"ContinuousSchedulingRunIMinTime" : 3.4028234663852886E38,
"ContinuousSchedulingRunIMaxTime" : 1.401298464324817E-45,
"ContinuousSchedulingRunMinTime" : 3.4028234663852886E38,
"ContinuousSchedulingRunMaxTime" : 1.401298464324817E-45,
// 处理NodeUpdate总次数
"NodeUpdateCallNumOps" : 19230,
// 一次采样周期内NodeUpdate平均耗时ms
"NodeUpdateCallAvgTime" : 14.101851851851853,
"NodeUpdateCallStdevTime" : 7.589392615725218,
"NodeUpdateCallIMinTime" : 10.0,
"NodeUpdateCallIMaxTime" : 113.0,
"NodeUpdateCallMinTime" : 3.0,
"NodeUpdateCallMaxTime" : 1403.0,
// fair share计算总次数
"UpdateThreadRunNumOps" : 115,
// 一次采样周期内计算fair share平均耗时ms
"UpdateThreadRunAvgTime" : 2350.0,
"UpdateThreadRunStdevTime" : 110.30865786510141,
"UpdateThreadRunIMinTime" : 2272.0,
"UpdateThreadRunIMaxTime" : 2428.0,
"UpdateThreadRunMinTime" : 4.0,
"UpdateThreadRunMaxTime" : 6177.0,
// 和上面差不多,这些指标统计更内层的方法,不包括ContinuousScheduling
"UpdateCallNumOps" : 115,
"UpdateCallAvgTime" : 2350.0,
"UpdateCallStdevTime" : 110.30865786510141,
"UpdateCallIMinTime" : 2272.0,
"UpdateCallIMaxTime" : 2428.0,
"UpdateCallMinTime" : 4.0,
"UpdateCallMaxTime" : 6169.0,
"PreemptCallNumOps" : 0,
"PreemptCallAvgTime" : 0.0,
"PreemptCallStdevTime" : 0.0,
"PreemptCallIMinTime" : 3.4028234663852886E38,
"PreemptCallIMaxTime" : 1.401298464324817E-45,
"PreemptCallMinTime" : 3.4028234663852886E38,
"PreemptCallMaxTime" : 1.401298464324817E-45,
// 处理AssignContainer请求
"AssignContainerCallNumOps" : 2860902,
"AssignContainerCallAvgTime" : 31.927672432911855,
"AssignContainerCallStdevTime" : 575.0777254854487,
"AssignContainerCallIMinTime" : 16.0,
"AssignContainerCallIMaxTime" : 100921.0,
"AssignContainerCallMinTime" : 15.0,
"AssignContainerCallMaxTime" : 1393653.0,
// 处理CompletedContainer请求
"CompletedContainerCallNumOps" : 1293897,
"CompletedContainerCallAvgTime" : 26.40577603228669,
"CompletedContainerCallStdevTime" : 22.917249699220484,
"CompletedContainerCallIMinTime" : 14.0,
"CompletedContainerCallIMaxTime" : 2026.0,
"CompletedContainerCallMinTime" : 13.0,
"CompletedContainerCallMaxTime" : 991449.0
} ]
}
最后
以上就是暴躁发卡为你收集整理的YARN : FairScheduler深入解析(队列维护,demand、fair share计算)的全部内容,希望文章能够帮你解决YARN : FairScheduler深入解析(队列维护,demand、fair share计算)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复