[kubernetes/k8s source analysis] kube-controller-manager: attach/detach controller source analysis, by 彩色星月 (靠谱客). Contents: 1. NewAttachDetachController, 2. The Run function, 3. The reconcile function.

AttachDetach Controller is responsible for attaching and detaching volumes

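When the AttachDetachController needs to perform an attach operation (the "Attach phase") for a CSI volume, it ends up in the code under pkg/volume/csi and creates a VolumeAttachment object. That object triggers the External Attacher, which calls the ControllerPublishVolume method of the CSI Controller service.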

 

NewAttachDetachController

        --> operationexecutor.NewOperationExecutor

        --> reconciler.NewReconciler

        --> populator.NewDesiredStateOfWorldPopulator

 

AttachDetachController Run

           -->  adc.populateActualStateOfWorld()

                   --> adc.actualStateOfWorld.MarkVolumeAsAttached

                              --> asw.AddVolumeNode

                                     --> addVolumeToReportAsAttached

                   --> adc.processVolumesInUse

                   --> adc.addNodeToDswp

           --> adc.populateDesiredStateOfWorld()

                   --> podAdd

                              --> ProcessPodVolumes 

                                          --> desiredStateOfWorld.AddPod

                                          --> desiredStateOfWorld.DeletePod

           --> adc.reconciler.Run(stopCh)

                        --> reconcile()

                                --> rc.attachDesiredVolumes

           --> adc.desiredStateOfWorldPopulator.Run(stopCh)

                      --> findAndRemoveDeletedPods

                      --> findAndAddActivePods

           --> adc.pvcWorker

    

    

1. NewAttachDetachController

    Instantiates attachDetachController, which implements the AttachDetachController interface. The resources it watches are pods, nodes, PVCs, and PVs.

// NewAttachDetachController returns a new instance of AttachDetachController.
func NewAttachDetachController(
	kubeClient clientset.Interface,
	csiClient csiclient.Interface,
	podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	cloud cloudprovider.Interface,
	plugins []volume.VolumePlugin,
	prober volume.DynamicPluginProber,
	disableReconciliationSync bool,
	reconcilerSyncDuration time.Duration,
	timerConfig TimerConfig) (AttachDetachController, error) {

    1.1 Create the event broadcaster and initialize the desired and actual state of the world

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
blkutil := volumepathhandler.NewBlockVolumePathHandler()

adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)

    1.2 Instantiate the OperationExecutor, which implements methods such as AttachVolume and DetachVolume

adc.attacherDetacher =
	operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		&adc.volumePluginMgr,
		recorder,
		false, // flag for experimental binary check for volume mount
		blkutil))

    1.3 Instantiate the reconciler, which implements the Reconciler interface

// Default these to values in options
adc.reconciler = reconciler.NewReconciler(
	timerConfig.ReconcilerLoopPeriod,
	timerConfig.ReconcilerMaxWaitForUnmountDuration,
	reconcilerSyncDuration,
	disableReconciliationSync,
	adc.desiredStateOfWorld,
	adc.actualStateOfWorld,
	adc.attacherDetacher,
	adc.nodeStatusUpdater,
	recorder)

    1.4 Instantiate the desiredStateOfWorldPopulator, which implements the DesiredStateOfWorldPopulator interface

adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
	timerConfig.DesiredStateOfWorldPopulatorLoopSleepPeriod,
	timerConfig.DesiredStateOfWorldPopulatorListPodsRetryDuration,
	podInformer.Lister(),
	adc.desiredStateOfWorld,
	&adc.volumePluginMgr,
	pvcInformer.Lister(),
	pvInformer.Lister())

    1.5 Register event handlers (callbacks) on the pod informer

podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
	AddFunc:    adc.podAdd,
	UpdateFunc: adc.podUpdate,
	DeleteFunc: adc.podDelete,
})

    1.6 Register event handlers (callbacks) on the node informer

nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
	AddFunc:    adc.nodeAdd,
	UpdateFunc: adc.nodeUpdate,
	DeleteFunc: adc.nodeDelete,
})

    1.7 Register event handlers on the PVC informer; added and updated PVCs are enqueued

pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		adc.enqueuePVC(obj)
	},
	UpdateFunc: func(old, new interface{}) {
		adc.enqueuePVC(new)
	},
})

 

2. The Run function

    Path: pkg/controller/volume/attachdetach/attach_detach_controller.go

func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
	defer runtime.HandleCrash()
	defer adc.pvcQueue.ShutDown()

	klog.Infof("Starting attach detach controller")
	defer klog.Infof("Shutting down attach detach controller")

	if !controller.WaitForCacheSync("attach detach", stopCh, adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced) {
		return
	}

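    2.1 The populateActualStateOfWorld function (per node)

        Stores the current node-to-volume attachment state in actualStateOfWorld: it lists all nodes and, based on node.Status.VolumesAttached, marks each listed volume as attached. A node status looks like this: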

  volumesAttached:
  - devicePath: ""
    name: kubernetes.io/rbd/replicapool:kubernetes-dynamic-pvc-69da12fe-85da-11e9-958a-0800271c9f15
  - devicePath: ""
    name: kubernetes.io/rbd/replicapool:kubernetes-dynamic-pvc-7ca75ce6-85ca-11e9-958a-0800271c9f15
  volumesInUse:
  - kubernetes.io/rbd/replicapool:kubernetes-dynamic-pvc-69da12fe-85da-11e9-958a-0800271c9f15
  - kubernetes.io/rbd/replicapool:kubernetes-dynamic-pvc-7ca75ce6-85ca-11e9-958a-0800271c9f15

func (adc *attachDetachController) populateActualStateOfWorld() error {
	klog.V(5).Infof("Populating ActualStateOfworld")
	nodes, err := adc.nodeLister.List(labels.Everything())
	if err != nil {
		return err
	}

	for _, node := range nodes {
		nodeName := types.NodeName(node.Name)
		for _, attachedVolume := range node.Status.VolumesAttached {
			uniqueName := attachedVolume.Name
			// The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
			// In such a case it should be detached in the first reconciliation cycle and the
			// volume spec is not needed to detach a volume. If the volume is used by a pod, it
			// its spec can be: this would happen during in the populateDesiredStateOfWorld which
			// scans the pods and updates their volumes in the ActualStateOfWorld too.
			err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
			if err != nil {
				klog.Errorf("Failed to mark the volume as attached: %v", err)
				continue
			}
			adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
			adc.addNodeToDswp(node, types.NodeName(node.Name))
		}
	}
	return nil
}

    2.1.1 MarkVolumeAsAttached

       2.1.1.1 Call FindAttachablePluginBySpec, which mainly checks whether the volume's plugin implements the attach interface

func (asw *actualStateOfWorld) AddVolumeNode(
	uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) (v1.UniqueVolumeName, error) {
	asw.Lock()
	defer asw.Unlock()

	var volumeName v1.UniqueVolumeName
	if volumeSpec != nil {
		attachableVolumePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
		if err != nil || attachableVolumePlugin == nil {
			return "", fmt.Errorf(
				"failed to get AttachablePlugin from volumeSpec for volume %q err=%v",
				volumeSpec.Name(),
				err)
		}

      2.1.1.2 Store the volume in the attachedVolumes map

// The volume object represents a volume the attach/detach controller
// believes to be successfully attached to a node it is managing.
type attachedVolume struct {
	// volumeName contains the unique identifier for this volume.
	volumeName v1.UniqueVolumeName

	// spec is the volume spec containing the specification for this volume.
	// Used to generate the volume plugin object, and passed to attach/detach
	// methods.
	spec *volume.Spec

	// nodesAttachedTo is a map containing the set of nodes this volume has
	// successfully been attached to. The key in this map is the name of the
	// node and the value is a node object containing more information about
	// the node.
	nodesAttachedTo map[types.NodeName]nodeAttachedTo

	// devicePath contains the path on the node where the volume is attached
	devicePath string
}

volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
	volumeObj = attachedVolume{
		volumeName:      volumeName,
		spec:            volumeSpec,
		nodesAttachedTo: make(map[types.NodeName]nodeAttachedTo),
		devicePath:      devicePath,
	}
} else {
	// If volume object already exists, it indicates that the information would be out of date.
	// Update the fields for volume object except the nodes attached to the volumes.
	volumeObj.devicePath = devicePath
	volumeObj.spec = volumeSpec
	klog.V(2).Infof("Volume %q is already added to attachedVolume list to node %q, update device path %q",
		volumeName,
		nodeName,
		devicePath)
}
asw.attachedVolumes[volumeName] = volumeObj

    2.1.1.3 For a volume that is already attached, record the node it is attached to

_, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
	// Create object if it doesn't exist.
	volumeObj.nodesAttachedTo[nodeName] = nodeAttachedTo{
		nodeName:              nodeName,
		mountedByNode:         true, // Assume mounted, until proven otherwise
		mountedByNodeSetCount: 0,
		detachRequestedTime:   time.Time{},
	}
} else {
	klog.V(5).Infof("Volume %q is already added to attachedVolume list to the node %q",
		volumeName,
		nodeName)
}
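The two steps above (upsert the volume entry, then record the node) can be illustrated with a much smaller model. This is only a sketch under stated assumptions: the `actualState` type, its string keys, and the `addVolumeNode` helper are hypothetical stand-ins, and locking, volume specs, and plugin lookup are left out.

```go
package main

import "fmt"

// attachedVolume is a simplified, hypothetical stand-in for the cache entry.
type attachedVolume struct {
	devicePath      string
	nodesAttachedTo map[string]bool // node name -> mountedByNode
}

// actualState is a lock-free miniature of the actual state of the world.
type actualState struct {
	attachedVolumes map[string]attachedVolume
}

func newActualState() *actualState {
	return &actualState{attachedVolumes: make(map[string]attachedVolume)}
}

// addVolumeNode upserts the volume entry, then records the node if it is new.
func (s *actualState) addVolumeNode(volume, node, devicePath string) {
	v, ok := s.attachedVolumes[volume]
	if !ok {
		v = attachedVolume{nodesAttachedTo: make(map[string]bool)}
	}
	// An existing entry may be out of date, so the device path is refreshed.
	v.devicePath = devicePath
	if _, seen := v.nodesAttachedTo[node]; !seen {
		v.nodesAttachedTo[node] = true // assume mounted until proven otherwise
	}
	s.attachedVolumes[volume] = v
}

func main() {
	s := newActualState()
	s.addVolumeNode("vol-1", "node-a", "")
	s.addVolumeNode("vol-1", "node-b", "/dev/rbd0")
	v := s.attachedVolumes["vol-1"]
	fmt.Println(len(v.nodesAttachedTo), v.devicePath)
}
```

Calling `addVolumeNode` twice for the same volume and node leaves a single node entry, which mirrors the "already added" branch in the excerpt.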

    2.1.1.4 Record the volume in nodesToUpdateStatusFor map[types.NodeName]nodeToUpdateStatusFor

// Add the volumeName to the node's volumesToReportAsAttached list
// This is an internal function and caller should acquire and release the lock
func (asw *actualStateOfWorld) addVolumeToReportAsAttached(
	volumeName v1.UniqueVolumeName, nodeName types.NodeName) {
	// In case the volume/node entry is no longer in attachedVolume list, skip the rest
	if _, _, err := asw.getNodeAndVolume(volumeName, nodeName); err != nil {
		klog.V(4).Infof("Volume %q is no longer attached to node %q", volumeName, nodeName)
		return
	}
	nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
	if !nodeToUpdateExists {
		// Create object if it doesn't exist
		nodeToUpdate = nodeToUpdateStatusFor{
			nodeName:                  nodeName,
			statusUpdateNeeded:        true,
			volumesToReportAsAttached: make(map[v1.UniqueVolumeName]v1.UniqueVolumeName),
		}
		asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
		klog.V(4).Infof("Add new node %q to nodesToUpdateStatusFor", nodeName)
	}
	_, nodeToUpdateVolumeExists :=
		nodeToUpdate.volumesToReportAsAttached[volumeName]
	if !nodeToUpdateVolumeExists {
		nodeToUpdate.statusUpdateNeeded = true
		nodeToUpdate.volumesToReportAsAttached[volumeName] = volumeName
		asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
		klog.V(4).Infof("Report volume %q as attached to node %q", volumeName, nodeName)
	}
}

    2.1.2 processVolumesInUse: handle volumes in the in-use state

      GetAttachedVolumesForNode returns all volumes attached to the node.

      When a volume is attached to the node and also listed as in-use, SetVolumeMountedByNode sets mountedByNode to true and increments mountedByNodeSetCount; otherwise it is called with mounted set to false.

// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
// mounted.
func (adc *attachDetachController) processVolumesInUse(
	nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
	klog.V(4).Infof("processVolumesInUse for node %q", nodeName)
	for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
		mounted := false
		for _, volumeInUse := range volumesInUse {
			if attachedVolume.VolumeName == volumeInUse {
				mounted = true
				break
			}
		}
		err := adc.actualStateOfWorld.SetVolumeMountedByNode(attachedVolume.VolumeName, nodeName, mounted)
		// The error was left unused in the original excerpt; log it so the code compiles.
		if err != nil {
			klog.Warningf(
				"SetVolumeMountedByNode(%q, %q, %v) returned an error: %v",
				attachedVolume.VolumeName, nodeName, mounted, err)
		}
	}
}
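The matching logic in processVolumesInUse boils down to "is each attached volume also in the node's in-use list?". The sketch below shows that decision in isolation. It is not the controller's code: `mountedFlags` is a hypothetical helper, volumes are plain strings, and the nested loop is swapped for a set lookup.

```go
package main

import "fmt"

// mountedFlags reports, for every attached volume, whether it also appears
// in the node's in-use list. Hypothetical helper for illustration only.
func mountedFlags(attached, inUse []string) map[string]bool {
	inUseSet := make(map[string]struct{}, len(inUse))
	for _, v := range inUse {
		inUseSet[v] = struct{}{}
	}
	flags := make(map[string]bool, len(attached))
	for _, v := range attached {
		_, mounted := inUseSet[v]
		flags[v] = mounted
	}
	return flags
}

func main() {
	flags := mountedFlags([]string{"vol-1", "vol-2"}, []string{"vol-2"})
	fmt.Println(flags["vol-1"], flags["vol-2"]) // prints "false true"
}
```

Each resulting flag corresponds to the `mounted` argument that the controller passes to SetVolumeMountedByNode.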

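    2.1.3 addNodeToDswp

     Records the node in desiredStateOfWorld, but only if the node carries the controller-managed attach/detach annotation. If the node also carries the keep-terminated-pod-volumes annotation with the value "true", that flag is stored with the node so that volumes of terminated pods stay attached; otherwise the desired state of those volumes becomes detached and a later reconcile pass runs the detach operation.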

metadata:
  annotations:
    node.alpha.kubernetes.io/ttl: "0"
    volumes.kubernetes.io/controller-managed-attach-detach: "true"

func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) {
	if _, exists := node.Annotations[volumeutil.ControllerManagedAttachAnnotation]; exists {
		keepTerminatedPodVolumes := false

		if t, ok := node.Annotations[volumeutil.KeepTerminatedPodVolumesAnnotation]; ok {
			keepTerminatedPodVolumes = (t == "true")
		}

		// Node specifies annotation indicating it should be managed by attach
		// detach controller. Add it to desired state of world.
		adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes)
	}
}
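The annotation check in addNodeToDswp can be tried out without a cluster. The following is a minimal sketch, assuming the keep-terminated-pod-volumes key is `volumes.kubernetes.io/keep-terminated-pod-volumes` (the controller-managed key is the one shown in the node metadata above). `nodeDecision` is a hypothetical helper, not part of Kubernetes.

```go
package main

import "fmt"

const (
	// Taken from the node metadata shown in this article.
	controllerManagedAnnotation = "volumes.kubernetes.io/controller-managed-attach-detach"
	// Assumed value of volumeutil.KeepTerminatedPodVolumesAnnotation.
	keepTerminatedAnnotation = "volumes.kubernetes.io/keep-terminated-pod-volumes"
)

// nodeDecision mirrors the two annotation lookups: presence of the first key
// decides whether the node is managed, the second must equal "true".
func nodeDecision(annotations map[string]string) (managed, keepTerminatedPodVolumes bool) {
	if _, ok := annotations[controllerManagedAnnotation]; !ok {
		return false, false
	}
	return true, annotations[keepTerminatedAnnotation] == "true"
}

func main() {
	managed, keep := nodeDecision(map[string]string{
		"node.alpha.kubernetes.io/ttl": "0",
		controllerManagedAnnotation:    "true",
	})
	fmt.Println(managed, keep) // prints "true false"
}
```

Note that, as in the excerpt, only the presence of the controller-managed key matters; its value is not inspected.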

 

type desiredStateOfWorld struct {
	// nodesManaged is a map containing the set of nodes managed by the attach/
	// detach controller. The key in this map is the name of the node and the
	// value is a node object containing more information about the node.
	nodesManaged map[k8stypes.NodeName]nodeManaged
	// volumePluginMgr is the volume plugin manager used to create volume
	// plugin objects.
	volumePluginMgr *volume.VolumePluginMgr
	sync.RWMutex
}

    2.2 The populateDesiredStateOfWorld function (per pod)

       Stores the desired pod-to-volume state in desiredStateOfWorld.

      2.2.1 podAdd

        DetermineVolumeAction returns true if the pod's volumes should be added to the desired state, and false if they should be removed.

        ProcessPodVolumes then adds the volumes to the desired state or removes them from it, depending on volumeActionFlag (true/false).

func (adc *attachDetachController) podAdd(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if pod == nil || !ok {
		return
	}
	if pod.Spec.NodeName == "" {
		// Ignore pods without NodeName, indicating they are not scheduled.
		return
	}

	volumeActionFlag := util.DetermineVolumeAction(
		pod,
		adc.desiredStateOfWorld,
		true /* default volume action */)

	util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
		adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
}

    2.2.1.1 ProcessPodVolumes

       If the pod has not been scheduled (its nodeName is empty), or its node is not present in the desired state, the pod is skipped.

nodeName := types.NodeName(pod.Spec.NodeName)
if nodeName == "" {
	klog.V(10).Infof(
		"Skipping processing of pod %q/%q: it is not scheduled to a node.",
		pod.Namespace,
		pod.Name)
	return
} else if !desiredStateOfWorld.NodeExists(nodeName) {
	// If the node the pod is scheduled to does not exist in the desired
	// state of the world data structure, that indicates the node is not
	// yet managed by the controller. Therefore, ignore the pod.
	klog.V(4).Infof(
		"Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.",
		pod.Namespace,
		pod.Name,
		nodeName)
	return
}

    For each of the pod's volumes: if it should be added to the desired state, desiredStateOfWorld.AddPod is called; if it should be removed, desiredStateOfWorld.DeletePod is called.
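To make the AddPod/DeletePod bookkeeping concrete, here is a small sketch of a desired state keyed by node, then volume, then pod. It is an illustration under assumptions: the `desiredState` type and its methods are hypothetical, and unlike the controller it creates the node entry on demand instead of requiring the node to be managed first.

```go
package main

import "fmt"

// desiredState maps node -> volume -> set of pods that want the volume
// attached to that node. Hypothetical simplification of the real cache.
type desiredState map[string]map[string]map[string]struct{}

// addPod records that pod needs volume on node.
func (d desiredState) addPod(node, volume, pod string) {
	if d[node] == nil {
		d[node] = make(map[string]map[string]struct{})
	}
	if d[node][volume] == nil {
		d[node][volume] = make(map[string]struct{})
	}
	d[node][volume][pod] = struct{}{}
}

// deletePod removes the pod; the volume is dropped from the desired state
// once no pod on that node references it any more.
func (d desiredState) deletePod(node, volume, pod string) {
	pods, ok := d[node][volume]
	if !ok {
		return
	}
	delete(pods, pod)
	if len(pods) == 0 {
		delete(d[node], volume)
	}
}

// volumeExists reports whether the volume is still desired on the node.
func (d desiredState) volumeExists(node, volume string) bool {
	_, ok := d[node][volume]
	return ok
}

func main() {
	d := desiredState{}
	d.addPod("node-a", "vol-1", "pod-1")
	d.addPod("node-a", "vol-1", "pod-2")
	d.deletePod("node-a", "vol-1", "pod-1")
	fmt.Println(d.volumeExists("node-a", "vol-1")) // still wanted by pod-2
	d.deletePod("node-a", "vol-1", "pod-2")
	fmt.Println(d.volumeExists("node-a", "vol-1")) // no pod left
}
```

A volume that drops out of the desired state in this way is what the reconciler later compares against the actual state when deciding what to detach.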

 

    2.3 The reconciler Run function

     Periodically performs attach and detach operations to reconcile desiredStateOfWorld with actualStateOfWorld. It also periodically verifies that the volumes recorded in actualStateOfWorld are really still attached; those that are not are removed from actualStateOfWorld. The reconcile function itself is covered in section 3.

func (rc *reconciler) Run(stopCh <-chan struct{}) {
	wait.Until(rc.reconciliationLoopFunc(), rc.loopPeriod, stopCh)
}

// reconciliationLoopFunc this can be disabled via cli option disableReconciliation.
// It periodically checks whether the attached volumes from actual state
// are still attached to the node and update the status if they are not.
func (rc *reconciler) reconciliationLoopFunc() func() {
	return func() {

		rc.reconcile()

		if rc.disableReconciliationSync {
			klog.V(5).Info("Skipping reconciling attached volumes still attached since it is disabled via the command line.")
		} else if rc.syncDuration < time.Second {
			klog.V(5).Info("Skipping reconciling attached volumes still attached since it is set to less than one second via the command line.")
		} else if time.Since(rc.timeOfLastSync) > rc.syncDuration {
			klog.V(5).Info("Starting reconciling attached volumes still attached")
			rc.sync()
		}
	}
}
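The three-way condition that guards rc.sync() is easy to get wrong when reading it inline, so here it is as a pure function. This is a sketch only: `shouldSync` is a hypothetical helper that takes the elapsed time as a parameter instead of calling time.Since.

```go
package main

import (
	"fmt"
	"time"
)

// shouldSync mirrors the branch order of the reconciliation loop:
// disabled wins, then a sub-second sync period disables the check,
// and only then is the elapsed time compared with the period.
func shouldSync(disabled bool, syncDuration, sinceLastSync time.Duration) bool {
	if disabled {
		return false
	}
	if syncDuration < time.Second {
		return false
	}
	return sinceLastSync > syncDuration
}

func main() {
	fmt.Println(shouldSync(false, time.Minute, 90*time.Second))         // due
	fmt.Println(shouldSync(false, time.Minute, 10*time.Second))         // too early
	fmt.Println(shouldSync(true, time.Minute, 90*time.Second))          // disabled
	fmt.Println(shouldSync(false, 500*time.Millisecond, 5*time.Second)) // period too small
}
```

In the controller, reconcile() runs on every loop iteration regardless; only the "still attached" verification is gated by this condition.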

 

    2.4 The desiredStateOfWorldPopulator Run function

        Runs in a loop, picking up pod volume information and keeping desiredStateOfWorld in step with it.

func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
	wait.Until(dswp.populatorLoopFunc(), dswp.loopSleepDuration, stopCh)
}

func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
	return func() {
		dswp.findAndRemoveDeletedPods()

		// findAndAddActivePods is called periodically, independently of the main
		// populator loop.
		if time.Since(dswp.timeOfLastListPods) < dswp.listPodsRetryDuration {
			klog.V(5).Infof(
				"Skipping findAndAddActivePods(). Not permitted until %v (listPodsRetryDuration %v).",
				dswp.timeOfLastListPods.Add(dswp.listPodsRetryDuration),
				dswp.listPodsRetryDuration)

			return
		}
		dswp.findAndAddActivePods()
	}
}

    2.4.1 findAndRemoveDeletedPods: remove pods from nodesManaged that no longer exist

informerPod, err := dswp.podLister.Pods(namespace).Get(name)
switch {
case errors.IsNotFound(err):
	// if we can't find the pod, we need to delete it below
case err != nil:
	klog.Errorf("podLister Get failed for pod %q (UID %q) with %v", dswPodKey, dswPodUID, err)
	continue
default:
	volumeActionFlag := util.DetermineVolumeAction(
		informerPod,
		dswp.desiredStateOfWorld,
		true /* default volume action */)

	if volumeActionFlag {
		informerPodUID := volutil.GetUniquePodName(informerPod)
		// Check whether the unique identifier of the pod from dsw matches the one retrieved from pod informer
		if informerPodUID == dswPodUID {
			klog.V(10).Infof("Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
			continue
		}
	}
}

// the pod from dsw does not exist in pod informer, or it does not match the unique identifier retrieved
// from the informer, delete it from dsw
klog.V(1).Infof("Removing pod %q (UID %q) from dsw because it does not exist in pod informer.", dswPodKey, dswPodUID)
dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)

    2.4.2 findAndAddActivePods: add newly active pods and their volumes to nodesManaged

func (dswp *desiredStateOfWorldPopulator) findAndAddActivePods() {
	pods, err := dswp.podLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("podLister List failed: %v", err)
		return
	}
	dswp.timeOfLastListPods = time.Now()

	for _, pod := range pods {
		if volutil.IsPodTerminated(pod, pod.Status) {
			// Do not add volumes for terminated pods
			continue
		}
		util.ProcessPodVolumes(pod, true,
			dswp.desiredStateOfWorld, dswp.volumePluginMgr, dswp.pvcLister, dswp.pvLister)

	}

}

 

    2.5 pvcWorker

       Processes the PVCs in the queue by calling syncPVCByKey, as follows.

// pvcWorker processes items from pvcQueue
func (adc *attachDetachController) pvcWorker() {
	for adc.processNextItem() {
	}
}

func (adc *attachDetachController) processNextItem() bool {
	keyObj, shutdown := adc.pvcQueue.Get()
	if shutdown {
		return false
	}
	defer adc.pvcQueue.Done(keyObj)

	if err := adc.syncPVCByKey(keyObj.(string)); err != nil {
		// Rather than wait for a full resync, re-add the key to the
		// queue to be processed.
		adc.pvcQueue.AddRateLimited(keyObj)
		runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err))
		return true
	}

	// Finally, if no error occurs we Forget this item so it does not
	// get queued again until another change happens.
	adc.pvcQueue.Forget(keyObj)
	return true
}
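The worker follows the usual "get key, sync, requeue on error, forget on success" pattern. The sketch below imitates that flow with a plain slice so it can run standalone. It is not the client-go workqueue: `drain` and `errTransient` are hypothetical, there is no rate limiting, and a retry cap (`maxRetries`) is added purely so the example terminates.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical error used to simulate a failed sync.
var errTransient = errors.New("transient sync failure")

// drain processes keys until the queue is empty. A failed key is re-added
// (stand-in for AddRateLimited) up to maxRetries times; a successful key has
// its retry counter cleared (stand-in for Forget). It returns the keys that
// were given up on.
func drain(keys []string, maxRetries int, sync func(key string) error) []string {
	retries := make(map[string]int)
	var dropped []string
	queue := append([]string(nil), keys...)
	for len(queue) > 0 {
		key := queue[0]
		queue = queue[1:]
		if err := sync(key); err != nil {
			if retries[key] < maxRetries {
				retries[key]++
				queue = append(queue, key)
			} else {
				dropped = append(dropped, key)
			}
			continue
		}
		delete(retries, key)
	}
	return dropped
}

func main() {
	calls := 0
	dropped := drain([]string{"default/my-pvc"}, 3, func(key string) error {
		calls++
		if calls < 3 {
			return errTransient // fail twice, then succeed
		}
		return nil
	})
	fmt.Println(calls, len(dropped))
}
```

Requeuing the key instead of waiting for the next informer resync is the point of the pattern: a PVC that failed to sync is retried soon, while other keys keep flowing.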

    2.5.1 The syncPVCByKey function

      Unbound PVCs are skipped. For a bound PVC, ProcessPodVolumes is called for every pod that uses it, which updates desiredStateOfWorld.

func (adc *attachDetachController) syncPVCByKey(key string) error {
	// Error checks are condensed in this excerpt.
	namespace, name, err := kcache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
	if err != nil {
		return err
	}

	if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
		// Skip unbound PVCs.
		return nil
	}

	objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key)
	if err != nil {
		return err
	}
	for _, obj := range objs {
		// The pod variable was undefined in the original excerpt.
		pod, ok := obj.(*v1.Pod)
		if !ok {
			continue
		}
		volumeActionFlag := util.DetermineVolumeAction(
			pod,
			adc.desiredStateOfWorld,
			true /* default volume action */)

		util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
			adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
	}
	return nil
}

 

3. The reconcile function

     Iterates over the volumes that are already attached according to actualStateOfWorld. A volume that is no longer in desiredStateOfWorld is handled as described in 3.1 to 3.3.

func (rc *reconciler) reconcile() {
	// Detaches are triggered before attaches so that volumes referenced by
	// pods that are rescheduled to a different node are detached first.

	// Ensure volumes that should be detached are detached.
	for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
		if !rc.desiredStateOfWorld.VolumeExists(
			attachedVolume.VolumeName, attachedVolume.NodeName) {

    3.1 Check whether the time since the detach request exceeds maxWaitForUnmountDuration; a volume that is still mounted is only detached once this timeout is reached

// Set the detach request time
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
	klog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err)
	continue
}
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration
// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
if attachedVolume.MountedByNode && !timeout {
	klog.V(12).Infof(attachedVolume.GenerateMsgDetailed("Cannot detach volume because it is still mounted", ""))
	continue
}

    3.2 Remove the volume from the report-as-attached list in actualStateOfWorld

// Before triggering volume detach, mark volume as detached and update the node status
// If it fails to update node status, skip detach volume
err = rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
	klog.V(5).Infof("RemoveVolumeFromReportAsAttached failed while removing volume %q from node %q with: %v",
		attachedVolume.VolumeName,
		attachedVolume.NodeName,
		err)
}

    3.3 DetachVolume: call the volume plugin to detach

// Trigger detach volume which requires verifing safe to detach step
// If timeout is true, skip verifySafeToDetach check
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting attacherDetacher.DetachVolume", ""))
verifySafeToDetach := !timeout
err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
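Steps 3.1 and 3.3 together decide two things: whether to detach at all, and whether the detach must first verify that it is safe. A minimal sketch of that decision as a pure function follows; `detachDecision` is a hypothetical helper and the durations are supplied by the caller instead of being read from the controller state.

```go
package main

import (
	"fmt"
	"time"
)

// detachDecision mirrors the excerpt's logic: a volume that is still mounted
// is left alone until the wait times out; after a timeout the detach goes
// ahead and the safe-to-detach verification is skipped.
func detachDecision(mountedByNode bool, elapsed, maxWait time.Duration) (detach, verifySafeToDetach bool) {
	timeout := elapsed > maxWait
	if mountedByNode && !timeout {
		return false, false
	}
	return true, !timeout
}

func main() {
	maxWait := 6 * time.Minute // illustrative value, not taken from the source
	fmt.Println(detachDecision(true, 1*time.Minute, maxWait))  // still mounted: wait
	fmt.Println(detachDecision(false, 1*time.Minute, maxWait)) // unmounted: detach with verification
	fmt.Println(detachDecision(true, 7*time.Minute, maxWait))  // timed out: force detach
}
```

The timeout branch is what lets the controller make progress when a node stops reporting, at the cost of detaching a volume that may still be mounted.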

    3.4 attachDesiredVolumes

       Ensures that volumes that should be attached are attached. For every volume that desiredStateOfWorld wants attached: if the volume/node pair already exists in actualStateOfWorld, it is already attached, so ResetDetachRequestTime is called to reset the detach request time.

func (rc *reconciler) attachDesiredVolumes() {
	// Ensure volumes that should be attached are attached.
	for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
		if rc.actualStateOfWorld.VolumeNodeExists(volumeToAttach.VolumeName, volumeToAttach.NodeName) {
			// Volume/Node exists, touch it to reset detachRequestedTime
			if klog.V(5) {
				klog.Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
			}
			rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
			continue
		}

    3.4.1 Check whether the volume may be attached to multiple nodes, based on the access modes requested through the PVC. This can cause trouble: if a node goes down or its kubelet stops, the pod is rescheduled to another node, but the old attachment is still recorded, so the volume cannot be attached to the new node.

The following error is reported:

attachdetach-controller  Multi-Attach error for volume "pvc-d0fde86c-8661-11e9-b873-0800271c9f15" Volume is already used by pod

if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
	nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
	if len(nodes) > 0 {
		if !volumeToAttach.MultiAttachErrorReported {
			rc.reportMultiAttachError(volumeToAttach, nodes)
			rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
		}
		continue
	}
}

// Excerpt from isMultiAttachForbidden:
// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false
for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
	if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
		return false
	}
}
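The access-mode loop above can be exercised on its own. The sketch below covers only the PersistentVolume branch and makes two assumptions that are not shown in the excerpt: an empty access-mode list is treated as "not forbidden", and a list without any "many" mode is treated as forbidden. `multiAttachForbidden` and the `accessMode` type are hypothetical names.

```go
package main

import "fmt"

// accessMode is a local stand-in for the Kubernetes access mode type.
type accessMode string

const (
	readWriteOnce accessMode = "ReadWriteOnce"
	readOnlyMany  accessMode = "ReadOnlyMany"
	readWriteMany accessMode = "ReadWriteMany"
)

// multiAttachForbidden returns false as soon as one access mode allows the
// volume to be used from several nodes.
func multiAttachForbidden(modes []accessMode) bool {
	if len(modes) == 0 {
		// Assumption: with no access mode given, do not block the attach.
		return false
	}
	for _, m := range modes {
		if m == readWriteMany || m == readOnlyMany {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(multiAttachForbidden([]accessMode{readWriteOnce}))               // single-node only
	fmt.Println(multiAttachForbidden([]accessMode{readWriteOnce, readOnlyMany})) // a "many" mode is present
}
```

With a ReadWriteOnce-only volume, a stale attachment on a failed node is therefore enough to produce the Multi-Attach error quoted above until the old attachment is detached.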

    3.4.2 For volumes that need to be attached, call AttachVolume, which delegates to the plugin. The call chain is straightforward:

   rc.attacherDetacher.AttachVolume

              --> GenerateAttachVolumeFunc

                      --> attachableVolumePlugin.NewAttacher()

                      --> volumeAttacher.Attach

                      --> actualStateOfWorld.MarkVolumeAsAttached

err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
if err == nil {
	klog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
}
if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
	// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
	// Log all other errors.
	klog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
}

 

      With the CSI plugin, for example, the NewAttacher method is implemented in pkg/volume/csi/csi_plugin.go:

func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
	return p.newAttacherDetacher()
}

     The Attach method of the CSI plugin:

func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
	if spec == nil {
		klog.Error(log("attacher.Attach missing volume.Spec"))
		return "", errors.New("missing spec")
	}

	pvSrc, err := getPVSourceFromSpec(spec)
	if err != nil {
		klog.Error(log("attacher.Attach failed to get CSIPersistentVolumeSource: %v", err))
		return "", err
	}

	node := string(nodeName)
	attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node)

       This creates a VolumeAttachment resource:

kind: VolumeAttachment
metadata:
  creationTimestamp: "2020-05-14T02:28:32Z"
  name: csi-fbcecaed2a122919ca11ef1fa40d9d51f545db042b9a0acf386c0726c9100a5d
  resourceVersion: "995957"
  selfLink: /apis/storage.k8s.io/v1/volumeattachments/csi-fbcecaed2a122919ca11ef1fa40d9d51f545db042b9a0acf386c0726c9100a5d
  uid: a2ab45ec-a90c-4614-8361-76c0144e70e0
spec:
  attacher: hostpath.csi.k8s.io
  nodeName: master-node
  source:
    persistentVolumeName: pvc-149bb57c-4c8f-48bf-8439-09dd505e6aa2
status:
  attached: true
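The VolumeAttachment name above has the shape `csi-` followed by 64 hexadecimal characters, which is what a SHA-256 digest looks like. The sketch below shows one way such a name can be derived from the three arguments passed to getAttachmentName. Treat it as an assumption about the hashing scheme rather than a verified copy of the Kubernetes helper; `attachmentName` and the sample inputs are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// attachmentName hashes the volume handle, driver name, and node name into a
// deterministic object name (assumed scheme: "csi-" + hex SHA-256).
func attachmentName(volumeHandle, driver, node string) string {
	sum := sha256.Sum256([]byte(volumeHandle + driver + node))
	return fmt.Sprintf("csi-%x", sum)
}

func main() {
	// Sample inputs only; the volume handle here is made up.
	name := attachmentName("example-volume-handle", "hostpath.csi.k8s.io", "master-node")
	fmt.Println(name, len(name))
}
```

A deterministic name means that repeated Attach calls for the same volume, driver, and node address the same VolumeAttachment object instead of creating duplicates.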
