構建高可用的 Kubernetes Operator
【導讀】本文介紹瞭如何使用 K8s Operator 提供的能力實現部署的微服務應用高可用。
從頭開始開發一個簡單的 Kubernetes Operator。
當一個任務在 Kubernetes 集羣中重複執行時,這可能是我們沒有充分利用 Kubernetes 提供的所有特性,因爲它的功能就是自動化任務的執行。通常,這些任務是由人工操作執行的,他們對系統應該如何運行、如何部署 應用程序 以及如何排除問題有深入的瞭解。
在創建一個 operator 之前,我們需要考慮標準方法,爲我們的應用程序選擇正確的 Kubernetes 資源。假設,我們的應用是有狀態的,StatefulSet 或許比 Deployment 更合適, 因爲它提供了額外的特性,您可以從中受益,比如惟一的網絡標識符、持久化存儲、有序部署 等等。
如果這種方法不合適,標準資源不能覆蓋我們應用程序的特定領域邏輯,我們將需要擴展 Kubernetes 功能來實現自動化並實現 Kubernetes operator。在本文中,我們將使用 client-go 庫構建 hello world operator,對其進行調整以實現高可用性,並使用 Helm 將其部署到 Kubernetes 集羣中。
什麼是 Kubernetes Operator
Operators 是 Kubernetes 的擴展,用於處理自定義資源 (CRD),對應用程序的特定用例進行處理。爲此,它們遵循 operator 模式,特別是控制循環,這是一個無限循環,確保集羣的狀態滿足用戶在 CRD 中聲明定義的要求。一些 operators 例子:
-
創建一個應用程序的 Deployment 並根據流量情況或其他性能指標對其 pod 副本進行自動伸縮,
-
獲取和恢復 StatefulSet 的備份,例如數據庫。
-
擴展標準資源以添加新特性並提供更大的靈活性。例如,Traefik 定義了 IngressRoute CRD 來擴展標準的 Ingress。在我們的例子當中,由於開發一個 operator 可能比較複雜,我們將構建一個非常簡單的示例,用於監視一些自定義資源的變化並創建一個 Job 任務。
Operator 架構
operator 的主要功能就是監視 kubernetes API 的變化,並作出響應確保集羣的狀態滿足 CRD 中聲明的需求。由於集羣中的事件數量可能是巨大的,對 operator 的合理設計將確保其高性能和可伸縮性:
-
clientset:與不同的 API Groups 交互的 客戶端。
-
informer:跟蹤 API 中的變化。
-
indexer:在內存中索引 API 對象以避免 API 調用。
-
workerqueue:以併發安全的方式處理與 API 對象相關的事件的 內存隊列。通過這種方式,我們確保不會在兩個不同的 worker 中同時處理同一個事件。
-
leaderelection:使用 Kubernetes 租賃對象的多個副本機制選擇領導者。
自定義資源的定義
在開發 operator 代碼之前,我們需要定義將要處理的 CRD。與其他 API 一樣,Kubernetes 允許您使用 OpenAPI 模式定義它的自定義 API 對象。代碼如下:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: echos.mmontes.io
spec:
group: mmontes.io
names:
kind: Echo
listKind: EchoList
plural: echos
singular: echo
shortNames:
- ec
scope: Namespaced
versions:
- name: v1alpha1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
message:
type: string
required:
- message
這個 CustomResourceDefinition 資源將由 Helm 在安裝我們的圖表時創建;我們只需要把它放在 CRDS 文件夾。我們將進一步瞭解 Helm chart 的細節。現在,我們可以在下面的代碼中定義 operator 使用的 Go 類型:
package v1alpha1
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen=true
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type Echo struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`
Spec EchoSpec `json:"spec"`
}
type EchoSpec struct {
Message string `json:"message"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type EchoList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []Echo `json:"items"`
}
對結構進行註釋,以生成與我們的 CRDs 及其深拷貝方法相關的 clientsets 和 informer。爲了實現這一點,我們將使用以下基於 http://k8s.io/code-generator 的腳本:
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(
cd "${SCRIPT_ROOT}"
ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator
)}
GO_PKG="github.com/mmontes11/echoperator/pkg"
bash "${CODEGEN_PKG}"/generate-groups.sh "all" \
${GO_PKG}/echo/v1alpha1/apis \
${GO_PKG} \
echo:v1alpha1 \
--go-header-file "${SCRIPT_ROOT}"/codegen/boilerplate.go.txt
控制器(controller)
我們需要配置的第一件事是與 Kubernetes API 服務的連接。這裏有兩個選擇:
-
** KUBECONFIG**:指向 kubeconfig 文件的環境變量。適合本地開發。
-
** InClusterConfig**:使用 pod 的 service account 令牌訪問 API,因此還需要正確配置 RBAC。稍後我們將對此進行詳細解釋。
一旦我們創建了連接,我們就可以實例化一個核心 Kubernetes clientset,並使用我們的 CRD clientset 將它們作爲依賴傳遞給 operator(也就是控制器):
var restConfig *rest.Config
var errKubeConfig error
if config.KubeConfig != "" {
restConfig, errKubeConfig = clientcmd.BuildConfigFromFlags("", config.KubeConfig)
} else {
restConfig, errKubeConfig = rest.InClusterConfig()
}
if errKubeConfig != nil {
logger.Fatal("error getting kubernetes config ", err)
}
kubeClientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
logger.Fatal("error getting kubernetes client ", err)
}
echov1alpha1ClientSet, err := echov1alpha1clientset.NewForConfig(restConfig)
if err != nil {
logger.Fatal("error creating echo client ", err)
}
ctrl := controller.New(
kubeClientSet,
echov1alpha1ClientSet,
config.Namespace,
logger.WithField("type", "controller"),
)
之後,我們可以配置控制器的 informers,以便開始接收有關我們感興趣的資源的事件。我們將使用 cache.SharedIndexInformer 來實現並將 informer 和 indexer 的職責集中在一個對象上。換句話說,此對象會維護自己更新的索引,並允許您配置事件處理程序,以便在資源更改時得到通知。唯一的要求是它需要在啓動時同步。
事件將被放入 workerqueue.RateLimiterInterface 隊列中。用於對將要處理的工作排隊,而不是在事件發生時立即執行。通過這種方式,我們可以確保一次只處理固定數量的對象,而且我們永遠不會在不同的 worker 中同時處理同一事件。以下是控制器代碼:
type Controller struct {
kubeClientSet kubernetes.Interface
echoInformer cache.SharedIndexInformer
jobInformer cache.SharedIndexInformer
scheduledEchoInformer cache.SharedIndexInformer
cronjobInformer cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
namespace string
logger log.Logger
}
func (c *Controller) Run(ctx context.Context, numWorkers int) error {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
c.logger.Info("starting controller")
c.logger.Info("starting informers")
for _, i := range []cache.SharedIndexInformer{
c.echoInformer,
c.scheduledEchoInformer,
c.jobInformer,
c.cronjobInformer,
} {
go i.Run(ctx.Done())
}
c.logger.Info("waiting for informer caches to sync")
if !cache.WaitForCacheSync(ctx.Done(), []cache.InformerSynced{
c.echoInformer.HasSynced,
c.scheduledEchoInformer.HasSynced,
c.jobInformer.HasSynced,
c.cronjobInformer.HasSynced,
}...) {
err := errors.New("failed to wait for informers caches to sync")
utilruntime.HandleError(err)
return err
}
c.logger.Infof("starting %d workers", numWorkers)
for i := 0; i < numWorkers; i++ {
go wait.Until(func() {
c.runWorker(ctx)
}, time.Second, ctx.Done())
}
c.logger.Info("controller ready")
<-ctx.Done()
c.logger.Info("stopping controller")
return nil
}
func (c *Controller) addEcho(obj interface{}) {
c.logger.Debug("adding echo")
echo, ok := obj.(*echov1alpha1.Echo)
if !ok {
c.logger.Errorf("unexpected object %v", obj)
return
}
c.queue.Add(event{
eventType: addEcho,
newObj: echo.DeepCopy(),
})
}
func New(
kubeClientSet kubernetes.Interface,
echoClientSet echov1alpha1clientset.Interface,
namespace string,
logger log.Logger,
) *Controller {
echoInformerFactory := echoinformers.NewSharedInformerFactory(
echoClientSet,
10*time.Second,
)
echoInformer := echoInformerFactory.Mmontes().V1alpha1().Echos().Informer()
scheduledechoInformer := echoInformerFactory.Mmontes().V1alpha1().ScheduledEchos().Informer()
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClientSet, 10*time.Second)
jobInformer := kubeInformerFactory.Batch().V1().Jobs().Informer()
cronjobInformer := kubeInformerFactory.Batch().V1().CronJobs().Informer()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
ctrl := &Controller{
kubeClientSet: kubeClientSet,
echoInformer: echoInformer,
jobInformer: jobInformer,
scheduledEchoInformer: scheduledechoInformer,
cronjobInformer: cronjobInformer,
queue: queue,
namespace: namespace,
logger: logger,
}
echoInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.addEcho,
})
scheduledechoInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.addScheduledEcho,
UpdateFunc: ctrl.updateScheduledEcho,
})
return ctrl
}
worker
worker 的職責是通過執行任務來處理來自隊列的事件確保集羣處於所聲明狀態。爲此,worker 實現了一個無限控制循環,根據用戶的要求調節狀態。在我們的例子中,調節狀態意味着創建一個 Job 來響應一個添加自定義 Echo 資源的事件。
我們將使用 http://k8s.io/api,通過編程創建 Kubernetes 資源:
import (
echo "github.com/mmontes11/echoperator/pkg/echo"
echov1alpha1 "github.com/mmontes11/echoperator/pkg/echo/v1alpha1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func createJob(newEcho *echov1alpha1.Echo, namespace string) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: newEcho.ObjectMeta.Name,
Namespace: namespace,
Labels: make(map[string]string),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(
newEcho,
echov1alpha1.SchemeGroupVersion.WithKind(echo.EchoKind),
),
},
},
Spec: createJobSpec(newEcho.Name, namespace, newEcho.Spec.Message),
}
}
func createJobSpec(name, namespace, msg string) batchv1.JobSpec {
return batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
GenerateName: name + "-",
Namespace: namespace,
Labels: make(map[string]string),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: name,
Image: "busybox:1.33.1",
Command: []string{"echo", msg},
ImagePullPolicy: "IfNotPresent",
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
}
}
上面代碼實現以編程方式創建 Kubernetes 對象。
事件類型決定調用哪個方法以及在哪裏創建或更新相應的對象。值得注意的是,當控制器啓動時,出於一致性的原因,我們將收到添加事件,因此我們需要檢查是否已經創建了對象,以避免創建兩次。實現這一點的策略是從對象元數據中獲取一個鍵,並檢查它是否已經存在於索引中。代碼如下:
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextItem(ctx) {
}
}
func (c *Controller) processNextItem(ctx context.Context) bool {
obj, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(obj)
err := c.processEvent(ctx, obj)
if err == nil {
c.logger.Debug("processed item")
c.queue.Forget(obj)
} else if c.queue.NumRequeues(obj) < maxRetries {
c.logger.Errorf("error processing event: %v, retrying", err)
c.queue.AddRateLimited(obj)
} else {
c.logger.Errorf("error processing event: %v, max retries reached", err)
c.queue.Forget(obj)
utilruntime.HandleError(err)
}
return true
}
func (c *Controller) processEvent(ctx context.Context, obj interface{}) error {
event, ok := obj.(event)
if !ok {
c.logger.Error("unexpected event ", obj)
return nil
}
switch event.eventType {
case addEcho:
return c.processAddEcho(ctx, event.newObj.(*echov1alpha1.Echo))
case addScheduledEcho:
return c.processAddScheduledEcho(ctx, event.newObj.(*echov1alpha1.ScheduledEcho))
case updateScheduledEcho:
return c.processUpdateScheduledEcho(
ctx,
event.oldObj.(*echov1alpha1.ScheduledEcho),
event.newObj.(*echov1alpha1.ScheduledEcho),
)
}
return nil
}
func (c *Controller) processAddEcho(ctx context.Context, echo *echov1alpha1.Echo) error {
job := createJob(echo, c.namespace)
exists, err := resourceExists(job, c.jobInformer.GetIndexer())
if err != nil {
return fmt.Errorf("error checking job existence %v", err)
}
if exists {
c.logger.Debug("job already exists, skipping")
return nil
}
_, err = c.kubeClientSet.BatchV1().
Jobs(c.namespace).
Create(ctx, job, metav1.CreateOptions{})
return err
}
func resourceExists(obj interface{}, indexer cache.Indexer) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return false, fmt.Errorf("error getting key %v", err)
}
_, exists, err := indexer.GetByKey(key)
return exists, err
}
運行高可用的控制器
部署控制器之前最後一件事是根據配置確定使用哪種架構:單機架構還是高可用性架構。這正是 runner 的責任,但在本文中,我們將重點關注高可用性。高可用性意味着需運行控制器的多個副本,以確保如果 leader 宕機,另一個副本將獲得 leader 並開始運行控制循環來處理 CRDs。
Kubernetes 是爲解決類似這樣的分佈式系統問題而設計的,並提供開箱即用的解決方案。在這種情況下,一個租賃(lease)對象會負責這件事;該對象可以被看作是一個分佈式互斥鎖,它只能有一個副本,根據該副本來確定 leader。這看起來很好,但是… Kubernetes 如何高效地做到這一點呢?
Kubernetes 使用 etcd 分佈式鍵 - 值存儲,與其他鍵 - 值存儲不同,它提供了一種 watching keys 機制。能及時發現對象的變更,而不需要進行長時間的輪詢或消耗額外的網絡資源。此外,client-go 提供了 leaderelection 包,它在底層使用租約對象提供了一個抽象。下面是代碼:
type Runner struct {
ctrl *controller.Controller
clientset *kubernetes.Clientset
config config.Config
logger log.Logger
}
func (r *Runner) Start(ctx context.Context) {
if r.config.HA.Enabled {
r.logger.Info("starting HA controller")
r.runHA(ctx)
} else {
r.logger.Info("starting standalone controller")
r.runSingleNode(ctx)
}
}
func (r *Runner) runSingleNode(ctx context.Context) {
if err := r.ctrl.Run(ctx, r.config.NumWorkers); err != nil {
r.logger.Fatal("error running controller ", err)
}
}
func (r *Runner) runHA(ctx context.Context) {
if r.config.HA == (config.HA{}) || !r.config.HA.Enabled {
r.logger.Fatal("HA config not set or not enabled")
}
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: r.config.HA.LeaseLockName,
Namespace: r.config.Namespace,
},
Client: r.clientset.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: r.config.HA.NodeId,
},
}
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: r.config.HA.LeaseDuration,
RenewDeadline: r.config.HA.RenewDeadline,
RetryPeriod: r.config.HA.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
r.logger.Info("start leading")
r.runSingleNode(ctx)
},
OnStoppedLeading: func() {
r.logger.Info("stopped leading")
},
OnNewLeader: func(identity string) {
if identity == r.config.HA.NodeId {
r.logger.Info("obtained leadership")
return
}
r.logger.Infof("leader elected: '%s'", identity)
},
},
})
}
部署到 Kubernetes 集羣
我們的 operator 代碼已經準備好部署了。下一步將創建一個 Helm chart。首先將 values.yml 用於配置 Kubernetes 資源。代碼:
nameOverride: ""
fullnameOverride: ""
image:
repository: mmontes11/echoperator
pullPolicy: IfNotPresent
tag: v0.0.1
env: production
logLevel: info
numWorkers: 4
ha:
enabled: true
leaderElection:
leaseDurationSeconds: 15
renewDeadlineSeconds: 10
retryPeriodSeconds: 2
replicaCount: 3
monitoring:
enabled: true
path: /metrics
port: 2112
namespace: monitoring
interval: 10s
labels:
release: monitoring
resources: {}
nodeSelector: {}
如您所見,有一個用於配置高可用性的對象 ha。完成這些之後,現在我們可以在 configmap 中創建與高可用性相關的鍵了,如下所示:
{{ $fullName := include "echoperator.fullname" . }}
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ $fullName }}
labels:
{{ include "echoperator.labels" . | nindent 4 }}
data:
NAMESPACE: {{ .Release.Namespace }}
NUM_WORKERS: {{ .Values.numWorkers | quote }}
HA_ENABLED: {{ .Values.ha.enabled | quote }}
{{ if .Values.ha.enabled }}
HA_LEASE_LOCK_NAME: {{ $fullName }}
HA_LEASE_DURATION_SECONDS: {{ .Values.ha.leaderElection.leaseDurationSeconds | quote }}
HA_RENEW_DEADLINE_SECONDS: {{ .Values.ha.leaderElection.renewDeadlineSeconds | quote }}
HA_RETRY_PERIOD_SECONDS: {{ .Values.ha.leaderElection.retryPeriodSeconds | quote }}
{{ end }}
METRICS_ENABLED: {{ .Values.monitoring.enabled | quote }}
{{ if .Values.monitoring.enabled }}
METRICS_PATH: {{ .Values.monitoring.path }}
METRICS_PORT: {{ .Values.monitoring.port | quote }}
{{ end }}
ENV: {{ .Values.env }}
LOG_LEVEL: {{ .Values.logLevel }}
如果要高可用,deployment 將設置 replicas 鍵,並引用這個 configmap,將其鍵作爲環境變量導出到 pod 中。代碼如下:
{{ $fullName := include "echoperator.fullname" . }}
{{ $selectorLabels := include "echoperator.selectorLabels" . }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ $fullName }}
labels:
{{ include "echoperator.labels" . | nindent 4 }}
spec:
{{ if .Values.ha.enabled}}
replicas: {{ .Values.ha.replicaCount }}
{{ end }}
selector:
matchLabels:
{{ $selectorLabels | nindent 6 }}
template:
metadata:
labels:
{{ $selectorLabels | nindent 8 }}
spec:
serviceAccountName: {{ $fullName }}
containers:
- name: {{ $fullName }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
{{ with .Values.resources }}
resources:
{{ toYaml . | nindent 12 }}
{{ end }}
envFrom:
- configMapRef:
name: {{ $fullName }}
{{ with .Values.nodeSelector }}
nodeSelector:
{{ toYaml . | nindent 8 }}
{{ end }}
注意,我們在 deployment 中指定了一個自定義的 serviceAccountName,原因是我們需要爲該帳戶定義安全策略,以便可以從 pod 訪問 CRDs。否則,我們將使用默認 service account 訪問它們,但不具有訪問 CRDs 的權限。
當在 deployment 中使用 service account 時,一個帶有令牌的卷 (/var/run/secrets/ Kubernetes .io/serviceaccount/token) 將被掛載到 pod 上,以便它們可以在 Kubernetes API 中進行身份驗證。
爲了定義該 令牌 的安全策略,我們將使用 Kubernetes RBAC:
-
serviceAccount:將在集羣中授予權限的資源。
-
ClusterRole:集羣範圍角色,可以對某些資源執行某些操作。
-
ClusterRoleBinding:爲 serviceaccount 分配角色。
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "echoperator.fullname" . }}
labels:
{{- include "echoperator.labels" . | nindent 4 }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ include "echoperator.fullname" . }}
labels:
{{ include "echoperator.labels" . | nindent 4 }}
rules:
- apiGroups:
- mmontes.io
resources:
- echos
- scheduledechos
verbs:
- get
- list
- watch
- apiGroups:
- batch
resources:
- jobs
verbs:
- get
- list
- watch
- create
- apiGroups:
- batch
resources:
- cronjobs
verbs:
- get
- list
- watch
- create
- update
{{ if .Values.ha.enabled }}
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- watch
- create
- update
{{ end }}
---
{{ $fullName := include "echoperator.fullname" . }}
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {{ $fullName }}
labels:
{{ include "echoperator.labels" . | nindent 4 }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: {{ $fullName }}
subjects:
- kind: ServiceAccount
name: {{ $fullName }}
namespace: {{ .Release.Namespace }}
最後,我們可以執行如下命令來部署 operator:
$ helm repo add mmontes [https://charts.mmontes-dev.duckdns.org](https://charts.mmontes-dev.duckdns.org/)
$ helm install echoperator mmontes/echoperator
創建 CRD
下面,讓我們看看 operator 如何創建 hello world Echo CRD,如下所示:
apiVersion: mmontes.io/v1alpha1
kind: Echo
metadata:
name: hello-world
namespace: default
spec:
message: "Hola, 世界!"
總結
構建 Kubernetes operator 是一種量身定製的解決方案,只有在標準 Kubernetes 資源不能滿足應用程序特定領域的需求時才應該考慮這個解決方案。原因是,解決一個非常具體的問題需要大量的時間和對 Kubernetes 的瞭解。
然而,如果你認爲你的用例已經足夠先進,並且你已經決定承擔成本,那麼 Kubernetes 社區中有一些很棒的工具可以用:
-
client-go
-
kubebuilder
-
operator-sdk 作爲另一種選擇,你可以考慮使用 GitOps 和標準資源來對你的 deployment 進行自動化。
轉自:
zhuanlan.zhihu.com/p/400890229
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/nv1tP9wVqwcMY5BKWdkv9A