構建高可用的 Kubernetes Operator

【導讀】本文介紹瞭如何使用 K8s Operator 提供的能力實現部署的微服務應用高可用。

從頭開始開發一個簡單的 Kubernetes Operator。

當一個任務在 Kubernetes 集羣中重複執行時,這可能是我們沒有充分利用 Kubernetes 提供的所有特性,因爲它的功能就是自動化任務的執行。通常,這些任務是由人工操作執行的,他們對系統應該如何運行、如何部署 應用程序 以及如何排除問題有深入的瞭解。

在創建一個 operator 之前,我們需要考慮標準方法,爲我們的應用程序選擇正確的 Kubernetes 資源。假設,我們的應用是有狀態的,StatefulSet 或許比 Deployment 更合適, 因爲它提供了額外的特性,您可以從中受益,比如惟一的網絡標識符、持久化存儲、有序部署 等等。

如果這種方法不合適,標準資源不能覆蓋我們應用程序的特定領域邏輯,我們將需要擴展 Kubernetes 功能來實現自動化並實現 Kubernetes operator。在本文中,我們將使用 client-go 庫構建 hello world operator,對其進行調整以實現高可用性,並使用 Helm 將其部署到 Kubernetes 集羣中。

什麼是 Kubernetes Operator

Operators 是 Kubernetes 的擴展,用於處理自定義資源 (CRD),對應用程序的特定用例進行處理。爲此,它們遵循 operator 模式,特別是控制循環,這是一個無限循環,確保集羣的狀態滿足用戶在 CRD 中聲明定義的要求。一些 operators 例子:

Operator 架構

operator 的主要功能就是監視 kubernetes API 的變化,並作出響應確保集羣的狀態滿足 CRD 中聲明的需求。由於集羣中的事件數量可能是巨大的,對 operator 的合理設計將確保其高性能和可伸縮性:

自定義資源的定義

在開發 operator 代碼之前,我們需要定義將要處理的 CRD。與其他 API 一樣,Kubernetes 允許您使用 OpenAPI 模式定義它的自定義 API 對象。代碼如下:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: echos.mmontes.io
spec:
  group: mmontes.io
  names:
    kind: Echo
    listKind: EchoList
    plural: echos
    singular: echo
    shortNames:
      - ec
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                message:
                  type: string
              required:
                - message

這個 CustomResourceDefinition 資源將由 Helm 在安裝我們的圖表時創建;我們只需要把它放在 CRDS 文件夾。我們將進一步瞭解 Helm chart 的細節。現在,我們可以在下面的代碼中定義 operator 使用的 Go 類型:

package v1alpha1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen=true
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type Echo struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata"`
    Spec              EchoSpec `json:"spec"`
}

type EchoSpec struct {
    Message string `json:"message"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type EchoList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata"`
    Items           []Echo `json:"items"`
}

對結構進行註釋,以生成與我們的 CRDs 及其深拷貝方法相關的 clientsets 和 informer。爲了實現這一點,我們將使用以下基於 http://k8s.io/code-generator 的腳本:

#!/usr/bin/env bash

set -o errexit
set -o nounset
set -o pipefail

SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(
  cd "${SCRIPT_ROOT}"
  ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator
)}
GO_PKG="github.com/mmontes11/echoperator/pkg"

bash "${CODEGEN_PKG}"/generate-groups.sh "all" \
  ${GO_PKG}/echo/v1alpha1/apis \
  ${GO_PKG} \
  echo:v1alpha1 \
  --go-header-file "${SCRIPT_ROOT}"/codegen/boilerplate.go.txt

控制器(controller)

我們需要配置的第一件事是與 Kubernetes API 服務的連接。這裏有兩個選擇:

一旦我們創建了連接,我們就可以實例化一個核心 Kubernetes clientset,並使用我們的 CRD clientset 將它們作爲依賴傳遞給 operator(也就是控制器):

var restConfig *rest.Config
var errKubeConfig error
if config.KubeConfig != "" {
    restConfig, errKubeConfig = clientcmd.BuildConfigFromFlags("", config.KubeConfig)
} else {
    restConfig, errKubeConfig = rest.InClusterConfig()
}
if errKubeConfig != nil {
    logger.Fatal("error getting kubernetes config ", err)
}

kubeClientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
    logger.Fatal("error getting kubernetes client ", err)
}
echov1alpha1ClientSet, err := echov1alpha1clientset.NewForConfig(restConfig)
if err != nil {
    logger.Fatal("error creating echo client ", err)
}

ctrl := controller.New(
    kubeClientSet,
    echov1alpha1ClientSet,
    config.Namespace,
    logger.WithField("type""controller"),
)

之後,我們可以配置控制器的 informers,以便開始接收有關我們感興趣的資源的事件。我們將使用 cache.SharedIndexInformer 來實現並將 informer 和 indexer 的職責集中在一個對象上。換句話說,此對象會維護自己更新的索引,並允許您配置事件處理程序,以便在資源更改時得到通知。唯一的要求是它需要在啓動時同步。

事件將被放入 workerqueue.RateLimiterInterface 隊列中。用於對將要處理的工作排隊,而不是在事件發生時立即執行。通過這種方式,我們可以確保一次只處理固定數量的對象,而且我們永遠不會在不同的 worker 中同時處理同一事件。以下是控制器代碼:

type Controller struct {
    kubeClientSet kubernetes.Interface

    echoInformer          cache.SharedIndexInformer
    jobInformer           cache.SharedIndexInformer
    scheduledEchoInformer cache.SharedIndexInformer
    cronjobInformer       cache.SharedIndexInformer

    queue workqueue.RateLimitingInterface

    namespace string

    logger log.Logger
}

func (c *Controller) Run(ctx context.Context, numWorkers int) error {
    defer utilruntime.HandleCrash()
    defer c.queue.ShutDown()

    c.logger.Info("starting controller")

    c.logger.Info("starting informers")
    for _, i := range []cache.SharedIndexInformer{
        c.echoInformer,
        c.scheduledEchoInformer,
        c.jobInformer,
        c.cronjobInformer,
    } {
        go i.Run(ctx.Done())
    }

    c.logger.Info("waiting for informer caches to sync")
    if !cache.WaitForCacheSync(ctx.Done()[]cache.InformerSynced{
        c.echoInformer.HasSynced,
        c.scheduledEchoInformer.HasSynced,
        c.jobInformer.HasSynced,
        c.cronjobInformer.HasSynced,
    }...) {
        err := errors.New("failed to wait for informers caches to sync")
        utilruntime.HandleError(err)
        return err
    }

    c.logger.Infof("starting %d workers", numWorkers)
    for i := 0; i < numWorkers; i++ {
        go wait.Until(func() {
            c.runWorker(ctx)
        }, time.Second, ctx.Done())
    }
    c.logger.Info("controller ready")

    <-ctx.Done()
    c.logger.Info("stopping controller")

    return nil
}

func (c *Controller) addEcho(obj interface{}) {
    c.logger.Debug("adding echo")
    echo, ok := obj.(*echov1alpha1.Echo)
    if !ok {
        c.logger.Errorf("unexpected object %v", obj)
        return
    }
    c.queue.Add(event{
        eventType: addEcho,
        newObj:    echo.DeepCopy(),
    })
}

func New(
    kubeClientSet kubernetes.Interface,
    echoClientSet echov1alpha1clientset.Interface,
    namespace string,
    logger log.Logger,
) *Controller {

    echoInformerFactory := echoinformers.NewSharedInformerFactory(
        echoClientSet,
        10*time.Second,
    )
    echoInformer := echoInformerFactory.Mmontes().V1alpha1().Echos().Informer()
    scheduledechoInformer := echoInformerFactory.Mmontes().V1alpha1().ScheduledEchos().Informer()

    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClientSet, 10*time.Second)
    jobInformer := kubeInformerFactory.Batch().V1().Jobs().Informer()
    cronjobInformer := kubeInformerFactory.Batch().V1().CronJobs().Informer()

    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    ctrl := &Controller{
        kubeClientSet: kubeClientSet,

        echoInformer:          echoInformer,
        jobInformer:           jobInformer,
        scheduledEchoInformer: scheduledechoInformer,
        cronjobInformer:       cronjobInformer,

        queue: queue,

        namespace: namespace,

        logger: logger,
    }

    echoInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: ctrl.addEcho,
    })
    scheduledechoInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    ctrl.addScheduledEcho,
        UpdateFunc: ctrl.updateScheduledEcho,
    })

    return ctrl
}

worker

worker 的職責是通過執行任務來處理來自隊列的事件確保集羣處於所聲明狀態。爲此,worker 實現了一個無限控制循環,根據用戶的要求調節狀態。在我們的例子中,調節狀態意味着創建一個 Job 來響應一個添加自定義 Echo 資源的事件。

我們將使用 http://k8s.io/api,通過編程創建 Kubernetes 資源:

import (
    echo "github.com/mmontes11/echoperator/pkg/echo"
    echov1alpha1 "github.com/mmontes11/echoperator/pkg/echo/v1alpha1"
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func createJob(newEcho *echov1alpha1.Echo, namespace string) *batchv1.Job {
    return &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{
            Name:      newEcho.ObjectMeta.Name,
            Namespace: namespace,
            Labels:    make(map[string]string),
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(
                    newEcho,
                    echov1alpha1.SchemeGroupVersion.WithKind(echo.EchoKind),
                ),
            },
        },
        Spec: createJobSpec(newEcho.Name, namespace, newEcho.Spec.Message),
    }
}

func createJobSpec(name, namespace, msg string) batchv1.JobSpec {
    return batchv1.JobSpec{
        Template: corev1.PodTemplateSpec{
            ObjectMeta: metav1.ObjectMeta{
                GenerateName: name + "-",
                Namespace:    namespace,
                Labels:       make(map[string]string),
            },
            Spec: corev1.PodSpec{
                Containers: []corev1.Container{
                    {
                        Name:            name,
                        Image:           "busybox:1.33.1",
                        Command:         []string{"echo", msg},
                        ImagePullPolicy: "IfNotPresent",
                    },
                },
                RestartPolicy: corev1.RestartPolicyNever,
            },
        },
    }
}

上面代碼實現以編程方式創建 Kubernetes 對象。

事件類型決定調用哪個方法以及在哪裏創建或更新相應的對象。值得注意的是,當控制器啓動時,出於一致性的原因,我們將收到添加事件,因此我們需要檢查是否已經創建了對象,以避免創建兩次。實現這一點的策略是從對象元數據中獲取一個鍵,並檢查它是否已經存在於索引中。代碼如下:

func (c *Controller) runWorker(ctx context.Context) {
    for c.processNextItem(ctx) {
    }
}

func (c *Controller) processNextItem(ctx context.Context) bool {
    obj, shutdown := c.queue.Get()
    if shutdown {
        return false
    }
    defer c.queue.Done(obj)

    err := c.processEvent(ctx, obj)
    if err == nil {
        c.logger.Debug("processed item")
        c.queue.Forget(obj)
    } else if c.queue.NumRequeues(obj) < maxRetries {
        c.logger.Errorf("error processing event: %v, retrying", err)
        c.queue.AddRateLimited(obj)
    } else {
        c.logger.Errorf("error processing event: %v, max retries reached", err)
        c.queue.Forget(obj)
        utilruntime.HandleError(err)
    }

    return true
}

func (c *Controller) processEvent(ctx context.Context, obj interface{}) error {
    event, ok := obj.(event)
    if !ok {
        c.logger.Error("unexpected event ", obj)
        return nil
    }
    switch event.eventType {
    case addEcho:
        return c.processAddEcho(ctx, event.newObj.(*echov1alpha1.Echo))
    case addScheduledEcho:
        return c.processAddScheduledEcho(ctx, event.newObj.(*echov1alpha1.ScheduledEcho))
    case updateScheduledEcho:
        return c.processUpdateScheduledEcho(
            ctx,
            event.oldObj.(*echov1alpha1.ScheduledEcho),
            event.newObj.(*echov1alpha1.ScheduledEcho),
        )
    }
    return nil
}

func (c *Controller) processAddEcho(ctx context.Context, echo *echov1alpha1.Echo) error {
    job := createJob(echo, c.namespace)
    exists, err := resourceExists(job, c.jobInformer.GetIndexer())
    if err != nil {
        return fmt.Errorf("error checking job existence %v", err)
    }
    if exists {
        c.logger.Debug("job already exists, skipping")
        return nil
    }

    _, err = c.kubeClientSet.BatchV1().
        Jobs(c.namespace).
        Create(ctx, job, metav1.CreateOptions{})
    return err
}

func resourceExists(obj interface{}, indexer cache.Indexer) (bool, error) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        return false, fmt.Errorf("error getting key %v", err)
    }
    _, exists, err := indexer.GetByKey(key)
    return exists, err
}

運行高可用的控制器

部署控制器之前最後一件事是根據配置確定使用哪種架構:單機架構還是高可用性架構。這正是 runner 的責任,但在本文中,我們將重點關注高可用性。高可用性意味着需運行控制器的多個副本,以確保如果 leader 宕機,另一個副本將獲得 leader 並開始運行控制循環來處理 CRDs。

Kubernetes 是爲解決類似這樣的分佈式系統問題而設計的,並提供開箱即用的解決方案。在這種情況下,一個租賃(lease)對象會負責這件事;該對象可以被看作是一個分佈式互斥鎖,它只能有一個副本,根據該副本來確定 leader。這看起來很好,但是… Kubernetes 如何高效地做到這一點呢?

Kubernetes 使用 etcd 分佈式鍵 - 值存儲,與其他鍵 - 值存儲不同,它提供了一種 watching keys 機制。能及時發現對象的變更,而不需要進行長時間的輪詢或消耗額外的網絡資源。此外,client-go 提供了 leaderelection 包,它在底層使用租約對象提供了一個抽象。下面是代碼:

type Runner struct {
    ctrl      *controller.Controller
    clientset *kubernetes.Clientset
    config    config.Config
    logger    log.Logger
}

func (r *Runner) Start(ctx context.Context) {
    if r.config.HA.Enabled {
        r.logger.Info("starting HA controller")
        r.runHA(ctx)
    } else {
        r.logger.Info("starting standalone controller")
        r.runSingleNode(ctx)
    }
}

func (r *Runner) runSingleNode(ctx context.Context) {
    if err := r.ctrl.Run(ctx, r.config.NumWorkers); err != nil {
        r.logger.Fatal("error running controller ", err)
    }
}

func (r *Runner) runHA(ctx context.Context) {
    if r.config.HA == (config.HA{}) || !r.config.HA.Enabled {
        r.logger.Fatal("HA config not set or not enabled")
    }

    lock := &resourcelock.LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Name:      r.config.HA.LeaseLockName,
            Namespace: r.config.Namespace,
        },
        Client: r.clientset.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: r.config.HA.NodeId,
        },
    }
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:            lock,
        ReleaseOnCancel: true,
        LeaseDuration:   r.config.HA.LeaseDuration,
        RenewDeadline:   r.config.HA.RenewDeadline,
        RetryPeriod:     r.config.HA.RetryPeriod,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                r.logger.Info("start leading")
                r.runSingleNode(ctx)
            },
            OnStoppedLeading: func() {
                r.logger.Info("stopped leading")
            },
            OnNewLeader: func(identity string) {
                if identity == r.config.HA.NodeId {
                    r.logger.Info("obtained leadership")
                    return
                }
                r.logger.Infof("leader elected: '%s'", identity)
            },
        },
    })
}

部署到 Kubernetes 集羣

我們的 operator 代碼已經準備好部署了。下一步將創建一個 Helm chart。首先將 values.yml 用於配置 Kubernetes 資源。代碼:

nameOverride: ""
fullnameOverride: ""

image:
  repository: mmontes11/echoperator
  pullPolicy: IfNotPresent
  tag: v0.0.1

env: production

logLevel: info

numWorkers: 4

ha:
  enabled: true
  leaderElection:
    leaseDurationSeconds: 15
    renewDeadlineSeconds: 10
    retryPeriodSeconds: 2
  replicaCount: 3

monitoring:
  enabled: true
  path: /metrics
  port: 2112
  namespace: monitoring
  interval: 10s
  labels:
    release: monitoring

resources: {}

nodeSelector: {}

如您所見,有一個用於配置高可用性的對象 ha。完成這些之後,現在我們可以在 configmap 中創建與高可用性相關的鍵了,如下所示:

{{ $fullName := include "echoperator.fullname" . }}
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ $fullName }}
  labels:
    {{ include "echoperator.labels" . | nindent 4 }}
data:
  NAMESPACE: {{ .Release.Namespace }}
  NUM_WORKERS: {{ .Values.numWorkers | quote }}
  HA_ENABLED: {{ .Values.ha.enabled | quote }}
  {{ if .Values.ha.enabled }}
  HA_LEASE_LOCK_NAME: {{ $fullName }}
  HA_LEASE_DURATION_SECONDS: {{ .Values.ha.leaderElection.leaseDurationSeconds | quote }}
  HA_RENEW_DEADLINE_SECONDS: {{ .Values.ha.leaderElection.renewDeadlineSeconds | quote }}
  HA_RETRY_PERIOD_SECONDS: {{ .Values.ha.leaderElection.retryPeriodSeconds | quote }}
  {{ end }}
  METRICS_ENABLED: {{ .Values.monitoring.enabled | quote }}
  {{ if .Values.monitoring.enabled }}
  METRICS_PATH: {{ .Values.monitoring.path }}
  METRICS_PORT: {{ .Values.monitoring.port | quote }}
  {{ end }}
  ENV: {{ .Values.env }}
  LOG_LEVEL: {{ .Values.logLevel }}

如果要高可用,deployment 將設置 replicas 鍵,並引用這個 configmap,將其鍵作爲環境變量導出到 pod 中。代碼如下:

{{ $fullName := include "echoperator.fullname" . }}
{{ $selectorLabels := include "echoperator.selectorLabels" . }}
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ $fullName }}
  labels:
    {{ include "echoperator.labels" . | nindent 4 }}
spec:
  {{ if .Values.ha.enabled}}
  replicas: {{ .Values.ha.replicaCount }}
  {{ end }}
  selector:
    matchLabels:
      {{ $selectorLabels  | nindent 6 }}
  template:
    metadata:
      labels:
        {{ $selectorLabels  | nindent 8 }}
    spec:
      serviceAccountName: {{ $fullName }}
      containers:
        - name: {{ $fullName }}
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          {{ with .Values.resources }}
          resources:
            {{ toYaml . | nindent 12 }}
          {{ end }}
          envFrom:
            - configMapRef:
                name: {{ $fullName }}
      {{ with .Values.nodeSelector }}
      nodeSelector:
        {{ toYaml . | nindent 8 }}
      {{ end }}

注意,我們在 deployment 中指定了一個自定義的 serviceAccountName,原因是我們需要爲該帳戶定義安全策略,以便可以從 pod 訪問 CRDs。否則,我們將使用默認 service account 訪問它們,但不具有訪問 CRDs 的權限。

當在 deployment 中使用 service account 時,一個帶有令牌的卷 (/var/run/secrets/ Kubernetes .io/serviceaccount/token) 將被掛載到 pod 上,以便它們可以在 Kubernetes API 中進行身份驗證。

爲了定義該 令牌 的安全策略,我們將使用 Kubernetes RBAC:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ include "echoperator.fullname" . }}
  labels:
    {{- include "echoperator.labels" . | nindent 4 }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: {{ include "echoperator.fullname" . }}
  labels:
    {{ include "echoperator.labels" . | nindent 4 }}
rules:
  - apiGroups:
      - mmontes.io
    resources:
      - echos
      - scheduledechos
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - batch
    resources:
      - jobs
    verbs:
      - get
      - list
      - watch
      - create
  - apiGroups:
      - batch
    resources:
      - cronjobs
    verbs:
      - get
      - list
      - watch
      - create
      - update
  {{ if .Values.ha.enabled }}
  - apiGroups:
      - coordination.k8s.io
    resources:
      - leases
    verbs:
      - get
   
      - watch
      - create
      - update
  {{ end }}
---
{{ $fullName := include "echoperator.fullname" . }}
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: {{ $fullName }}
  labels:
    {{ include "echoperator.labels" . | nindent 4 }}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: {{ $fullName }}
subjects:
  - kind: ServiceAccount
    name: {{ $fullName }}
    namespace: {{ .Release.Namespace }}

最後,我們可以執行如下命令來部署 operator:

$ helm repo add mmontes [https://charts.mmontes-dev.duckdns.org](https://charts.mmontes-dev.duckdns.org/)
$ helm install echoperator mmontes/echoperator

創建 CRD

下面,讓我們看看 operator 如何創建 hello world Echo CRD,如下所示:

apiVersion: mmontes.io/v1alpha1
kind: Echo
metadata:
  name: hello-world
  namespace: default
spec:
  message: "Hola, 世界!"

總結

構建 Kubernetes operator 是一種量身定製的解決方案,只有在標準 Kubernetes 資源不能滿足應用程序特定領域的需求時才應該考慮這個解決方案。原因是,解決一個非常具體的問題需要大量的時間和對 Kubernetes 的瞭解。

然而,如果你認爲你的用例已經足夠先進,並且你已經決定承擔成本,那麼 Kubernetes 社區中有一些很棒的工具可以用:

轉自:

zhuanlan.zhihu.com/p/400890229

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/nv1tP9wVqwcMY5BKWdkv9A