基于kubernetes release-1.9


// HPA Controller的启动代码
├── metrics
│   ├── interfaces.go
│   ├── legacy_metrics_client.go
│   ├── legacy_metrics_client_test.go
│   ├── rest_metrics_client.go
│   ├── rest_metrics_client_test.go
│   └── utilization.go
├── doc.go
├── horizontal.go
// podautoscaler的核心代码,包括其创建和运行的代码
├── horizontal_test.go
├── legacy_horizontal_test.go
├── legacy_replica_calculator_test.go
├── rate_limitters.go
├── replica_calculator.go
// ReplicaCalculator的创建,以及根据cpu/metrics计算replicas的方法
└── replica_calculator_test.go

关于k8s hpa自定义指标的补充,具体详情可关注本博客的通过prometheus实现k8s hpa自定义指标系列文章。



// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc.
// This allows for structured downstream composition and subdivision.
func NewControllerInitializers() map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
return controllers


--horizontal-pod-autoscaler-downscale-delay duration
The period since last downscale, before another downscale can be performed in horizontal pod autoscaler. (default 5m0s)
--horizontal-pod-autoscaler-sync-period duration
The period for syncing the number of pods in horizontal pod autoscaler. (default 30s)
--horizontal-pod-autoscaler-tolerance float
The minimum change (from 1.0) in the desired-to-actual metrics ratio for the horizontal pod autoscaler to consider scaling. (default 0.1)
--horizontal-pod-autoscaler-upscale-delay duration
The period since last upscale, before another upscale can be performed in horizontal pod autoscaler. (default 3m0s)
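--horizontal-pod-autoscaler-use-rest-clients
If set to true, causes the horizontal pod autoscaler controller to use REST clients through the kube-aggregator, instead of using the legacy metrics client through the API server proxy.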


const (
DefaultHeapsterNamespace = "kube-system"
DefaultHeapsterScheme = "http"
DefaultHeapsterService = "heapster"
DefaultHeapsterPort = "" // use the first exposed port on the service
func startHPAController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
return false, nil
if ctx.Options.HorizontalPodAutoscalerUseRESTClients {
// use the new-style clients if support for custom metrics is enabled
return startHPAControllerWithRESTClient(ctx)
return startHPAControllerWithLegacyClient(ctx)
//This commit switches over the HPA controller to use the custom metrics API.
//It also converts the HPA controller to use the generated client
//in k8s.io/metrics for the resource metrics API.
//In order to enable support, you must enable
//`--horizontal-pod-autoscaler-use-rest-clients` on the
//controller-manager, which will switch the HPA controller's MetricsClient
//implementation over to use the standard rest clients for both custom
//metrics and resource metrics.
//This requires that at the least resource
//metrics API is registered with kube-aggregator, and that the controller
//manager is pointed at kube-aggregator.
//For this to work, Heapster
//must be serving the new-style API server (`--api-server=true`).
func startHPAControllerWithRESTClient(ctx ControllerContext) (bool, error) {
clientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
metricsClient := metrics.NewRESTMetricsClient(
return startHPAControllerWithMetricsClient(ctx, metricsClient)
func startHPAControllerWithLegacyClient(ctx ControllerContext) (bool, error) {
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
//metricsClient参数配置,固定了heapster的service在namespace的kube-system下,并且名称为heapster,端口为service ports中的第一个端口
metricsClient := metrics.NewHeapsterMetricsClient(
return startHPAControllerWithMetricsClient(ctx, metricsClient)


func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (bool, error) {
// 创建ReplicaCalculator,后面会用它来计算desired replicas
replicaCalc := podautoscaler.NewReplicaCalculator(
// horizontalPodAutoscalerTolerance is the tolerance for when
// resource usage suggests upscaling/downscaling
go podautoscaler.NewHorizontalController(
ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),//Informer用来监控HPA Resource的增删改事件
return true, nil


// Run begins watching and syncing.
func (a *HorizontalController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer a.queue.ShutDown()
glog.Infof("Starting HPA controller")
defer glog.Infof("Shutting down HPA controller")
if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced) {
// start a single worker (we may wish to start more in the future)
go wait.Until(a.worker, time.Second, stopCh)


func (a *HorizontalController) worker() {
for a.processNextWorkItem() {
glog.Infof("horizontal pod autoscaler controller worker shutting down")
func (a *HorizontalController) processNextWorkItem() bool {
key, quit := a.queue.Get()
if quit {
return false
defer a.queue.Done(key)
err := a.reconcileKey(key.(string))
if err == nil {
// don't "forget" here because we want to only process a given HPA once per resync interval
return true
return true


func (a *HorizontalController) reconcileKey(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
if errors.IsNotFound(err) {
glog.Infof("Horizontal Pod Autoscaler has been deleted %v", key)
return nil
return a.reconcileAutoscaler(hpa)
func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler) error {
// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
hpav1 := hpav1Shared.DeepCopy()
// then, convert to autoscaling/v2, which makes our lives easier when calculating metrics
hpaRaw, err := unsafeConvertToVersionVia(hpav1, autoscalingv2.SchemeGroupVersion)
hpa := hpaRaw.(*autoscalingv2.HorizontalPodAutoscaler)
scale, targetGR, err := a.scaleForResourceMappings(hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
// 得到当前副本数
currentReplicas := scale.Status.Replicas
var metricStatuses []autoscalingv2.MetricStatus
metricDesiredReplicas := int32(0)
metricName := ""
metricTimestamp := time.Time{}
desiredReplicas := int32(0)
rescaleReason := ""
timestamp := time.Now()
rescale := true
// 如果期望副本数为0,则不进行scale操作。
if scale.Spec.Replicas == 0 {
// Autoscaling is disabled for this resource
desiredReplicas = 0
rescale = false
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
} else if currentReplicas > hpa.Spec.MaxReplicas {// 期望副本数不能超过hpa中配置的最大副本数
rescaleReason = "Current number of replicas above Spec.MaxReplicas"
desiredReplicas = hpa.Spec.MaxReplicas
} else if hpa.Spec.MinReplicas != nil && currentReplicas < *hpa.Spec.MinReplicas {// 期望副本数不能低于配置的最小副本数
rescaleReason = "Current number of replicas below Spec.MinReplicas"
desiredReplicas = *hpa.Spec.MinReplicas
} else if currentReplicas == 0 {// 期望副本数最少为1
rescaleReason = "Current number of replicas must be greater than 0"
desiredReplicas = 1
} else {
// 如果当前副本数在Min和Max之间,则需要根据metric计算期望副本数
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
if err != nil {
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
glog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, timestamp, reference)
rescaleMetric := ""
if metricDesiredReplicas > desiredReplicas {
desiredReplicas = metricDesiredReplicas
timestamp = metricTimestamp
rescaleMetric = metricName
if desiredReplicas > currentReplicas {
rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
if desiredReplicas < currentReplicas {
rescaleReason = "All metrics below target"
desiredReplicas = a.normalizeDesiredReplicas(hpa, currentReplicas, desiredReplicas)
rescale = a.shouldScale(hpa, currentReplicas, desiredReplicas, timestamp)
backoffDown := false
backoffUp := false
if hpa.Status.LastScaleTime != nil {
if !hpa.Status.LastScaleTime.Add(a.downscaleForbiddenWindow).Before(timestamp) {
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "BackoffDownscale", "the time since the previous scale is still within the downscale forbidden window")
backoffDown = true
if !hpa.Status.LastScaleTime.Add(a.upscaleForbiddenWindow).Before(timestamp) {
backoffUp = true
if backoffDown {
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "BackoffBoth", "the time since the previous scale is still within both the downscale and upscale forbidden windows")
} else {
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "BackoffUpscale", "the time since the previous scale is still within the upscale forbidden window")
if !backoffDown && !backoffUp {
// mark that we're not backing off
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ReadyForNewScale", "the last scale time was sufficiently old as to warrant a new scale")
if rescale {
scale.Spec.Replicas = desiredReplicas
_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(targetGR, scale)
if err != nil {
a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
return fmt.Errorf("failed to rescale %s: %v", reference, err)
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
glog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
} else {
glog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
desiredReplicas = currentReplicas
a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
return a.updateStatusIfNeeded(hpaStatusOriginal, hpa)


 1. 转换hpav1到autoscalingv2.HorizontalPodAutoscaler版本,便于后面的逻辑处理;
2. 获得group和kind,并根据它们通过RESTMappings获取mapping资源映射;
3. 获取scale,包含当前的副本数和定义副本数等信息;
4. 对比scale中的定义副本数、当前副本数和hpa中的MaxReplicas、MinReplicas:
4.1 如果scale定义副本数等于0,则此次不做scale操作;
4.2 如果scale当前副本数大于hpa定义的MaxReplicas,将期望副本数设定为MaxReplicas;
4.3 如果scale当前副本数小于hpa定义的MinReplicas,将期望副本数设定为MinReplicas;
4.4 如果scale当前副本数等于0,将期望副本数设定为1;
5. 当不满足4时,通过computeReplicasForMetrics函数计算:该函数循环每个metric的配置,更新对应的metric name和该metric的建议副本数,建议副本数取所循环metric中的最大者,最后得到建议副本数metricDesiredReplicas;
6. 对比通过5得到的metricDesiredReplicas和4得到的desiredReplicas以及当前副本数,如果metricDesiredReplicas大于desiredReplicas,则建议副本数等于metricDesiredReplicas。最后再经过normalizeDesiredReplicas函数,将建议副本数与hpa定义的最大、最小副本数进行比较,得到的结果必须在MinReplicas和MaxReplicas之间;
7. 判断此次是否需要scale,通过shouldScale函数返回是否伸缩标志,bool类型
7.1 如果6得到的期望副本数等于当前副本数,则不需要scale;
7.2 如果hpa的最后一次伸缩时间为空,则需要scale;
7.3 如果6得到的期望副本数小于当前副本数,并且最后的缩容时间距离现在超过设定的downscaleForbiddenWindow时间数,则需要缩容;
7.4 如果6得到的期望副本数大于当前的副本数,并且最后的扩容时间距离现在超过设定的upscaleForbiddenWindow时间数,则需要扩容;
7.5 否则不需要scale;
8. 根据步骤7获得的scale标志,如果需要伸缩,则设定scale的定义副本数为desiredReplicas,调用scale的伸缩函数执行伸缩操作,同时更新hpa的status参数,完成整个伸缩的过程。


// Computes the desired number of replicas for the metric specifications listed in the HPA, returning the maximum
// of the computed replica counts, a description of the associated metric, and the statuses of all metrics
// computed.
func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
currentReplicas := scale.Status.Replicas
statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))
for i, metricSpec := range metricSpecs {
var replicaCountProposal int32
var utilizationProposal int64
var timestampProposal time.Time
var metricNameProposal string
switch metricSpec.Type {
case autoscalingv2.ObjectMetricSourceType:
replicaCountProposal, utilizationProposal, timestampProposal, err = a.replicaCalc.GetObjectMetricReplicas(currentReplicas, metricSpec.Object.TargetValue.MilliValue(), metricSpec.Object.MetricName, hpa.Namespace, &metricSpec.Object.Target)
case autoscalingv2.PodsMetricSourceType:
replicaCountProposal, utilizationProposal, timestampProposal, err = a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.TargetAverageValue.MilliValue(), metricSpec.Pods.MetricName, hpa.Namespace, selector)
case autoscalingv2.ResourceMetricSourceType:
if metricSpec.Resource.TargetAverageValue != nil {
var rawProposal int64
replicaCountProposal, rawProposal, timestampProposal, err = a.replicaCalc.GetRawResourceReplicas(currentReplicas, metricSpec.Resource.TargetAverageValue.MilliValue(), metricSpec.Resource.Name, hpa.Namespace, selector)
} else {
// set a default utilization percentage if none is set
if metricSpec.Resource.TargetAverageUtilization == nil {
errMsg := "invalid resource metric source: neither a utilization target nor a value target was set"
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetResourceMetric", errMsg)
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "FailedGetResourceMetric", "the HPA was unable to compute the replica count: %s", errMsg)
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
targetUtilization := *metricSpec.Resource.TargetAverageUtilization
var percentageProposal int32
var rawProposal int64
// 调用GetResourceMetric函数获取metrics数组,最后通过pod中container定义的requests,计算出最后的使用率
replicaCountProposal, percentageProposal, rawProposal, timestampProposal, err = a.replicaCalc.GetResourceReplicas(currentReplicas, targetUtilization, metricSpec.Resource.Name, hpa.Namespace, selector)
errMsg := fmt.Sprintf("unknown metric source type %q", string(metricSpec.Type))
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidMetricSourceType", errMsg)
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidMetricSourceType", "the HPA was unable to compute the replica count: %s", errMsg)
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
if replicas == 0 || replicaCountProposal > replicas {
timestamp = timestampProposal
replicas = replicaCountProposal
metric = metricNameProposal
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to succesfully calculate a replica count from %s", metric)
return replicas, metric, statuses, timestamp, nil


// MetricsClient knows how to query a remote interface to retrieve container-level
// resource metrics as well as pod-level arbitrary metrics
type MetricsClient interface {
// GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
// 通过metric中设定的resource name从heapster中获取监控数据,源码中支持的监控类型为"cpu","memory","storage","ephemeral-storage","alpha.kubernetes.io/nvidia-gpu",具体支持的类型还是得看cadvisor和heapster的采集数据,返回累加每个pod指定的resource name数据
GetResourceMetric(resource v1.ResourceName, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error)
// GetRawMetric gets the given metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
GetRawMetric(metricName string, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error)
// GetObjectMetric gets the given metric (and an associated timestamp) for the given
// object in the given namespace
GetObjectMetric(metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference) (int64, time.Time, error)

k8s后面的hpa的功能越来越丰富,支持的伸缩类型越来越多,同时代码逻辑也较之前版本要复杂一点,单纯地通过读源码对一些参数含义还不是很清晰,最好是要通过实践,上述代码逻辑可以通过以下hpa yaml文件清晰地体现:

apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
  name: php-apache
  namespace: default
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: php-apache
  minReplicas: 1
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      targetAverageUtilization: 50
  - type: Pods
    pods:
      metricName: packets-per-second
      targetAverageValue: 1k
  - type: Object
    object:
      metricName: requests-per-second
      target:
        apiVersion: extensions/v1beta1
        kind: Ingress
        name: main-route
      targetValue: 10k




  • horizontal-pod-autoscaler-downscale-delay:此选项的值是一个持续时间,指定控制器在两次缩容之间的时间间隔。默认值是5分钟(5m0s)。
  • horizontal-pod-autoscaler-upscale-delay:此选项的值是一个持续时间,指定控制器在两次扩容之间的时间间隔。默认值是3分钟(3m0s)。


