OpenKruise Source Reading
OpenKruise Source Reading

OpenKruise Source Reading

Created
Oct 24, 2022 11:54 PM
Text
OpenKruise 由阿里云开源,已经在阿里内部经过了大量的生产实践检验,个人认为是阿里云开源的一个非常技术硬核的项目,对于想学习编写控制器的同学来说可以学到更贴近”云原生”的写法。还能学到控制器在大规模k8s下性能优化的小技巧。
Tags
k8s
controller
OpenKruise 是一个基于 Kubernetes 的扩展套件,主要聚焦于云原生应用的自动化,比如部署、发布、运维以及可用性保护OpenKruise提供了大量的增强版本的工作负载,不仅支持类似 k8s 原生工作负载的基础功能,还提供了原地升级、可配置扩缩容/发布测试等。

CloneSet Controller

CloneSet 控制器提供了高效管理无状态应用的能力,你可以把他对比成原生的 Deployment,但 CloneSet 提供了很多增强功能:
  1. 更多可配置化的升级策略
  1. 配合外部组件管理 Pod 的生命周期
想要了解 CloneSet 的更多用法可以参考官方文档:
本节从多个发布场景出发配合 CloneSet Controller 源码来帮你梳理控制器的执行流程和逻辑。控制器的执行入口在: pkg/controller/cloneset/cloneset_controller.go#doReconcile

初始版本发布

该场景是企业发布服务的第一个版本,在这个场景下 CloneSet 控制器的执行流程和步骤都比较简单。首先,我们在自己的集群中安装 OpenKruise,具体安装方式请参考官方文档
apiVersion: apps.kruise.io/v1alpha1 kind: CloneSet metadata: labels: app: sample name: sample spec: replicas: 5 selector: matchLabels: app: sample template: metadata: labels: app: sample spec: containers: - name: nginx image: nginx:1.22-alpine
复制上面的YAML描述,执行kubectl apply 指令在集群中创建一个CloneSet。
➜ pbpaste | kubectl apply -f - cloneset.apps.kruise.io/sample created ➜ kubectl get cloneset NAME DESIRED UPDATED UPDATED_READY READY TOTAL AGE sample 5 18s

首次协调:创建实例

notion image
1.获取待处理的 CloneSet 对象:informer 监听集群中 CloneSet 资源的变化,当接收到 CloneSet的创建事件会通知 CloneSet Controller 开始处理新的 CloneSet,我们可以跳转到 doReconcile 函数查看控制器是如何处理新创建的 CloneSet。控制器首先会调用 Client 从本地缓存(缓存和etcd数据保持同步)中获取到要处理的 CloneSet 对象完整信息。
instance := &appsv1alpha1.CloneSet{} err := r.Get(context.TODO(), request.NamespacedName, instance)
2.初始化 Control 对象Control 是抽象出的一个通用的接口,可以看到这个接口需要实现一些计算Revision、处理扩缩容和 Pod 更新相关的操作。不过这里创建出的 Control 对象,貌似用处不大,在其他执行函数中,会经常见到这个对象被频繁创建出来。
3.获取 CloneSet 所管控的资源:控制器获取所有 CloneSet 管控活跃的 Pod 和 PVC。
// list all active Pods and PVCs belongs to cs filteredPods, filteredPVCs, err := r.getOwnedResource(instance)
这里活跃的 Pod 是指:没有被删除的,并且 Pod.Status.Phase 不处于 SucceededFailed 状态的Pod集合;活跃的 PVC 是指没有被删除的 PVC 集合。
4.过滤有多个管控 Owner 的 Pod:这一步会过滤掉不满足 CloneSet selector 筛选条件和一些有多个Owner的Pod,同时会释放掉CloneSet对这些有多个Owner的Pod的管理权限。
5.获取 CloneSet 当前的 Revision 和 计算待更新的下个版本的 Revision:当前场景比较简单,CloneSet 没有历史版本,只负责创建存储当前 CloneSet 快照信息的 ControlllerRevision即可。这里会使用第2步创建的 Control 对象生成 Revision模板存储到ControlllerRevision 对象的 Data 字段中, ControlllerRevision 的名字由Data的Hash值生成。最终返回的 currentRevision 和 updateRevision 均指向这个 ControlllerRevision 对象。
func (c *commonControl) SetRevisionTemplate(revisionSpec map[string]interface{}, template map[string]interface{}) { revisionSpec["template"] = template template["$patch"] = "replace" }
其中,SetRevisionTemplate 函数会给 cloneSet.Spec.Template 字段新增一个 $patch:replace 键值对。后面我们会提到为什么要新增这个字段,最后生成的 ControllerRevision 对象如下:
apiVersion: apps/v1 kind: ControllerRevision metadata: labels: app: sample controller.kubernetes.io/hash: 66bf7df478 name: sample-66bf7df478 ownerReferences: - apiVersion: apps.kruise.io/v1alpha1 kind: CloneSet name: sample data: spec: template: $patch: replace metadata: creationTimestamp: null labels: app: sample spec: containers: - image: nginx:1.22-alpine name: nginx resources: {} revision: 1
💡
ControllerRevision 实现了状态数据的不可变快照,APIServer 将拒绝所有尝试改变 data 字段的请求。 但是,可以删除 ControllerRevisions。 DaemonSet 和 StatefulSet 控制器都使用它来进行更新和回滚。
6.创建 Pods :在 初始版本发布 场景下 r.syncCloneSet 的操作会比较简单,在涉及版本升级的场景时,我们再详细分析这部分。这里涉及的操作有:先计算需要扩容的个数这里就等于Spec.Replicas,在真正把初始版本的 Pod 创建到集群前,会把扩容创建的Pod信息记录到 realScaleExpectations 对象中,后调用 DoItSlowly 方法分批并发创建 Pod,如果创建Pod失败,会把创建失败的 Pod 从 realScaleExpectations 对象中移除。realScaleExpectations 是用来解决 Indexer 同步延迟导致控制器可能会扩容出错误个数Pod 的问题,文章 100 K8s Mistakes and How to Avoid Them 会有详细的讲解。
简单解释一下, 控制器获取 k8s资源数据是直接访问本地缓存(Indexer),但是 Indexer的数据同步因为网络等问题会出现延迟的情况,导致在第三步获取管控资源(Pod、PVC)的时候会出现不一致的问题,容易导致控制器后续处理的时候会创建出多余的 Pod,为了解决这个问题就加入了一个临时缓存,如上图所示,当Informar监听到Pod创建的事件就会调用 ObserveScale 函数把之前记录的Pod信息去掉。
// pkg/controller/cloneset/cloneset_controller.go:#add // add adds a new Controller to mgr with r as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler) error { ... // Watch for changes to Pod err = c.Watch(&source.Kind{Type: &v1.Pod{}}, &podEventHandler{Reader: mgr.GetCache()}) if err != nil { return err } ... } // pkg/controller/cloneset/cloneset_event_handler.go:#Create func (e *podEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { ... // If it has a ControllerRef, that's all that matters. if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil { req := resolveControllerRef(pod.Namespace, controllerRef) if req == nil { return } klog.V(4).Infof("Pod %s/%s created, owner: %s", pod.Namespace, pod.Name, req.Name) clonesetutils.ScaleExpectations.ObserveScale(req.String(), expectations.Create, pod.Name) q.Add(*req) return } ... }
在控制器每次协调的时候,会调用SatisfiedExpectations 来判断是不是读到了脏数据,如果数据没有及时更新就直接返回把事件重新入队,直到数据更新。
// If scaling expectations have not satisfied yet, just skip this reconcile. if scaleSatisfied, unsatisfiedDuration, scaleDirtyPods := clonesetutils.ScaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied { if unsatisfiedDuration >= expectations.ExpectationTimeout { klog.Warningf("Expectation unsatisfied overtime for %v, scaleDirtyPods=%v, overtime=%v", request.String(), scaleDirtyPods, unsatisfiedDuration) return reconcile.Result{}, nil } klog.V(4).Infof("Not satisfied scale for %v, scaleDirtyPods=%v", request.String(), scaleDirtyPods) return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil }
7.更新CloneSet Status :第一次协调,仅仅记录少量的信息到 Status字段中,等到控制器获取到 Pod 状态更新的事件之后的后续协调会继续更新 Status 字段。
status: availableReplicas: 0 collisionCount: 0 currentRevision: sample-66bf7df478 expectedUpdatedReplicas: 5 labelSelector: app=sample observedGeneration: 1 readyReplicas: 0 replicas: 0 updateRevision: sample-66bf7df478 updatedReadyReplicas: 0 updatedReplicas: 0
8.执行一些垃圾回收的操作: 这一步会清理一些历史 ControlllerRevison 对象,降低k8s的存储压力,k8s不会主动回收 ControllerRevison 资源,需要创建者负责相关资源的回收。

后续协调:状态更新

从函数 pkg/controller/cloneset/cloneset_controller.go#add ,可以看到控制器会监听所有 CloneSet、Pod和PVC资源的变化,当CloneSet创建出的Pod资源状态更新的时候,都会触发控制器继续协调更新实例状态。
func add(mgr manager.Manager, r reconcile.Reconciler) error { ... // Watch for changes to CloneSet err = c.Watch(&source.Kind{Type: &appsv1alpha1.CloneSet{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{ UpdateFunc: func(e event.UpdateEvent) bool { oldCS := e.ObjectOld.(*appsv1alpha1.CloneSet) newCS := e.ObjectNew.(*appsv1alpha1.CloneSet) if *oldCS.Spec.Replicas != *newCS.Spec.Replicas { klog.V(4).Infof("Observed updated replicas for CloneSet: %s/%s, %d->%d", newCS.Namespace, newCS.Name, *oldCS.Spec.Replicas, *newCS.Spec.Replicas) } return true }, }) if err != nil { return err } // Watch for changes to Pod err = c.Watch(&source.Kind{Type: &v1.Pod{}}, &podEventHandler{Reader: mgr.GetCache()}) if err != nil { return err } // Watch for changes to PVC, just ensure cache updated err = c.Watch(&source.Kind{Type: &v1.PersistentVolumeClaim{}}, &pvcEventHandler{}) if err != nil { return err } return nil }
每次协调的执行逻辑和首次协调的执行流程大致相似,主要的处理逻辑在函数pkg/controller/cloneset/cloneset_status.go#calculateStatus 中。
我们主要看下 Status 字段中和 Replicas 相关数值的计算。
1.符合ReadyReplicas 条件的 Pod 需要满足 InPlaceUpdateReady PodReady 这2个 Condition 的状态为 True ,并且 PodPhase 处于 Running 状态。
2.AvailableReplicas 的要求比 ReadyReplicas 更严格一些,除了满足 ReadyReplicas 的条件,还需要 Pod 的生命周期状态为 Normal (CloneSet生命周期相关的内容会在后面介绍)。除此之外,Pod 的 Condition 转变到 PodReady 的时间要超过在 CloneSet 中设置的 minReadSeconds
notion image
3.UpdatedReplicas 等于处于最新版本的 Pod 。
4.UpdatedReadyReplicas 表示处于最新版本而且满足 ReadyReplicas 的 Pod。
5.ExpectedUpdatedReplicas 等于 Replicas - partition (partition 表示期望保留老版本 Pod 的个数)
最终协调后的 Status 如下:
status: availableReplicas: 5 collisionCount: 0 currentRevision: sample-66bf7df478 expectedUpdatedReplicas: 5 labelSelector: app=sample observedGeneration: 1 readyReplicas: 5 replicas: 5 updateRevision: sample-66bf7df478 updatedReadyReplicas: 5 updatedReplicas: 5

新版本发布

随着业务需求的更新,企业很快迭代出了一个新版本要部署,构建打包好镜像之后,准备向集群下发新版本的 CloneSet :我们除了更改了服务镜像,还新增了2个更新策略,partition: 3 表示我们会保留 3 个老版本的 Pod,maxSurge: 2 表示允许控制器一下子扩出超过 replicas 个数的 Pod。
💡
解释这样配置的原因
apiVersion: apps.kruise.io/v1alpha1 kind: CloneSet metadata: labels: app: sample name: sample namespace: default spec: minReadySeconds: 10 # 设置更新策略 updateStrategy: # 保留旧版本 Pod 的数量或百分比 partition: 3 # 控制最多能扩出来超过 replicas 的 Pod 数量 maxSurge: 2 replicas: 5 selector: matchLabels: app: sample template: metadata: labels: app: sample spec: containers: - image: nginx:1.23-alpine name: nginx
复制👆🏻的 CloneSet 并更新到集群中。
➜ pbpaste | kubectl apply -f - cloneset.apps.kruise.io/sample configured

更新后的首次协调:

此时如果直接删除ToDelete的pod那么不可以的个数就会超过maxUnavailable,不符合需求,所以先计算出来在满足条件下要扩出来的实例个数

总结

  1. maxsurge 2 maxUnavailable (在replica的前提能忍受多少个)
  1. maxsurge 2 scaleMaxUnavailable(全局能忍受多少个不可用)
  1. maxsurge + maxUnavailable 发布窗口

Loading Comments...