Kubernetes CRI 分析 - kubelet 創建 Pod 分析

作者:良凱爾,雲原生愛好者,目前在某互聯網大廠,主要負責 Kubernetes 相關的研發工作。

kubelet CRI 創建 Pod 調用流程

下面以 kubelet dockershi 創建 Pod 調用流程爲例做一下分析。

kubelet 通過調用 dockershim 來創建並啓動容器,而 dockershim 則調用 Docker 來創建並啓動容器,並調用 CNI 來構建 Pod 網絡。

kubelet dockershim 創建 Pod 調用流程圖示

dockershim 屬於 kubelet 內置 CRI shim,其餘 remote CRI shim 的創建 Pod 調用流程其實與 dockershim 調用基本一致,只不過是調用了不同的容器引擎來操作容器,但一樣由 CRI shim 調用 CNI 來構建 Pod 網絡。

下面開始詳細的源碼分析。

直接看到 kubeGenericRuntimeManager 的 SyncPod 方法,調用 CRI 創建 Pod 的邏輯將在該方法裏觸發發起。

從該方法代碼也可以看出,kubelet 創建一個 Pod 的邏輯爲:

  1. 先創建並啓動 Pod sandbox 容器,並構建好 Pod 網絡;

  2. 創建並啓動 ephemeral containers;

  3. 創建並啓動 init containers;

  4. 最後創建並啓動 normal containers(即普通業務容器)。

這裏對調用 m.createPodSandbox 來創建 Pod sandbox 進行分析,m.startContainer 等調用分析可以參照該分析自行進行分析,調用流程幾乎一致。

// pkg/kubelet/kuberuntime/kuberuntime_manager.go
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
 ...
 // Step 4: Create a sandbox for the pod if necessary.
 podSandboxID := podContainerChanges.SandboxID
 if podContainerChanges.CreateSandbox {
  var msg string
  var err error

  klog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
  createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
  result.AddSyncResult(createSandboxResult)
  podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
  ...
}

m.createPodSandbox

m.createPodSandbox 方法主要是調用 m.runtimeService.RunPodSandbox

runtimeService 即 RemoteRuntimeService,實現了 CRI shim 客戶端 - 容器運行時接口 RuntimeService interface,持有與 CRI shim 容器運行時服務端通信的客戶端。所以調用 m.runtimeService.RunPodSandbox,實際上等於調用了 CRI shim 服務端的 RunPodSandbox 方法,來進行 Pod sandbox 的創建。

// pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
 podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
 if err != nil {
  message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
  klog.Error(message)
  return "", message, err
 }

 // Create pod logs directory
 err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
 if err != nil {
  message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
  klog.Errorf(message)
  return "", message, err
 }

 runtimeHandler := ""
 if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && m.runtimeClassManager != nil {
  runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
  if err != nil {
   message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
   return "", message, err
  }
  if runtimeHandler != "" {
   klog.V(2).Infof("Running pod %s with RuntimeHandler %q", format.Pod(pod), runtimeHandler)
  }
 }

 podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
 if err != nil {
  message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
  klog.Error(message)
  return "", message, err
 }

 return podSandBoxID, "", nil
}

m.runtimeService.RunPodSandbox

m.runtimeService.RunPodSandbox 方法,會調用 r.runtimeClient.RunPodSandbox,即利用 CRI shim 客戶端,調用 CRI shim 服務端來進行 Pod sandbox 的創建。

分析到這裏,kubelet 中的 CRI 相關調用就分析完畢了,接下來將會進入到 CRI shim(以 kubelet 內置 CRI shim-dockershim 爲例)裏進行創建 Pod sandbox 的分析。

// pkg/kubelet/remote/remote_runtime.go
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
 // Use 2 times longer timeout for sandbox operation (4 mins by default)
 // TODO: Make the pod sandbox timeout configurable.
 ctx, cancel := getContextWithTimeout(r.timeout * 2)
 defer cancel()

 resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
  Config:         config,
  RuntimeHandler: runtimeHandler,
 })
 if err != nil {
  klog.Errorf("RunPodSandbox from runtime service failed: %v", err)
  return "", err
 }

 if resp.PodSandboxId == "" {
  errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
  klog.Errorf("RunPodSandbox failed: %s", errorMessage)
  return "", errors.New(errorMessage)
 }

 return resp.PodSandboxId, nil
}

r.runtimeClient.RunPodSandbox

接下來將會以 dockershim 爲例,進入到 CRI shim 來進行創建 Pod sandbox 的分析。

前面 kubelet 調用 r.runtimeClient.RunPodSandbox,會進入到 dockershim 下面的 RunPodSandbox 方法。

創建 Pod sandbox 主要有 5 個步驟:

  1. 調用 docker,拉取 pod sandbox 的鏡像;

  2. 調用 docker,創建 pod sandbox 容器;

  3. 創建 pod sandbox 的 Checkpoint;

  4. 調用 docker,啓動 pod sandbox 容器;

  5. 調用 CNI,給 pod sandbox 構建網絡。

// pkg/kubelet/dockershim/docker_sandbox.go
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
// For docker, PodSandbox is implemented by a container holding the network
// namespace for the pod.
// Note: docker doesn't use LogDirectory (yet).
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
 config := r.GetConfig()

 // Step 1: Pull the image for the sandbox.
 image := defaultSandboxImage
 podSandboxImage := ds.podSandboxImage
 if len(podSandboxImage) != 0 {
  image = podSandboxImage
 }

 // NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly.
 // see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository
 // Only pull sandbox image when it's not present - v1.PullIfNotPresent.
 if err := ensureSandboxImageExists(ds.client, image); err != nil {
  return nil, err
 }

 // Step 2: Create the sandbox container.
 if r.GetRuntimeHandler() != "" && r.GetRuntimeHandler() != runtimeName {
  return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler())
 }
 createConfig, err := ds.makeSandboxDockerConfig(config, image)
 if err != nil {
  return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)
 }
 createResp, err := ds.client.CreateContainer(*createConfig)
 if err != nil {
  createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
 }

 if err != nil || createResp == nil {
  return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)
 }
 resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}

 ds.setNetworkReady(createResp.ID, false)
 defer func(e *error) {
  // Set networking ready depending on the error return of
  // the parent function
  if *e == nil {
   ds.setNetworkReady(createResp.ID, true)
  }
 }(&err)

 // Step 3: Create Sandbox Checkpoint.
 if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
  return nil, err
 }

 // Step 4: Start the sandbox container.
 // Assume kubelet's garbage collector would remove the sandbox later, if
 // startContainer failed.
 err = ds.client.StartContainer(createResp.ID)
 if err != nil {
  return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
 }

 // Rewrite resolv.conf file generated by docker.
 // NOTE: cluster dns settings aren't passed anymore to docker api in all cases,
 // not only for pods with host network: the resolver conf will be overwritten
 // after sandbox creation to override docker's behaviour. This resolv.conf
 // file is shared by all containers of the same pod, and needs to be modified
 // only once per pod.
 if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
  containerInfo, err := ds.client.InspectContainer(createResp.ID)
  if err != nil {
   return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)
  }

  if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {
   return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)
  }
 }

 // Do not invoke network plugins if in hostNetwork mode.
 if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
  return resp, nil
 }

 // Step 5: Setup networking for the sandbox.
 // All pod networking is setup by a CNI plugin discovered at startup time.
 // This plugin assigns the pod ip, sets up routes inside the sandbox,
 // creates interfaces etc. In theory, its jurisdiction ends with pod
 // sandbox networking, but it might insert iptables rules or open ports
 // on the host as well, to satisfy parts of the pod spec that aren't
 // recognized by the CNI standard yet.
 cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
 networkOptions := make(map[string]string)
 if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
  // Build DNS options.
  dnsOption, err := json.Marshal(dnsConfig)
  if err != nil {
   return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err)
  }
  networkOptions["dns"] = string(dnsOption)
 }
 err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
 if err != nil {
  errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)}

  // Ensure network resources are cleaned up even if the plugin
  // succeeded but an error happened between that success and here.
  err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID)
  if err != nil {
   errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err))
  }

  err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod)
  if err != nil {
   errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err))
  }

  return resp, utilerrors.NewAggregate(errList)
 }

 return resp, nil
}
接下來以ds.client.CreateContainer調用爲例,分析下dockershim是如何調用docker的。

ds.client.CreateContainer
主要是調用d.client.ContainerCreate。

// pkg/kubelet/dockershim/libdocker/kube_docker_client.go
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) {
 ctx, cancel := d.getTimeoutContext()
 defer cancel()
 // we provide an explicit default shm size as to not depend on docker daemon.
 // TODO: evaluate exposing this as a knob in the API
 if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
  opts.HostConfig.ShmSize = defaultShmSize
 }
 createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
 if ctxErr := contextError(ctx); ctxErr != nil {
  return nil, ctxErr
 }
 if err != nil {
  return nil, err
 }
 return &createResp, nil
}

接下來以 ds.client.CreateContainer 調用爲例,分析下 dockershim 是如何調用 Docker 的。

ds.client.CreateContainer 主要是調用 d.client.ContainerCreate

// pkg/kubelet/dockershim/libdocker/kube_docker_client.go
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) {
 ctx, cancel := d.getTimeoutContext()
 defer cancel()
 // we provide an explicit default shm size as to not depend on docker daemon.
 // TODO: evaluate exposing this as a knob in the API
 if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
  opts.HostConfig.ShmSize = defaultShmSize
 }
 createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
 if ctxErr := contextError(ctx); ctxErr != nil {
  return nil, ctxErr
 }
 if err != nil {
  return nil, err
 }
 return &createResp, nil
}

d.client.ContainerCreate 構建請求參數,向 Docker 指定的 url 發送 http 請求,創建 Pod sandbox 容器。

// vendor/github.com/docker/docker/client/container_create.go
// ContainerCreate creates a new container based in the given configuration.
// It can be associated with a name, but it's not mandatory.
func (cli *Client) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, containerName string) (container.ContainerCreateCreatedBody, error) {
 var response container.ContainerCreateCreatedBody

 if err := cli.NewVersionError("1.25""stop timeout"); config != nil && config.StopTimeout != nil && err != nil {
  return response, err
 }

 // When using API 1.24 and under, the client is responsible for removing the container
 if hostConfig != nil && versions.LessThan(cli.ClientVersion()"1.25") {
  hostConfig.AutoRemove = false
 }

 query := url.Values{}
 if containerName != "" {
  query.Set("name", containerName)
 }

 body := configWrapper{
  Config:           config,
  HostConfig:       hostConfig,
  NetworkingConfig: networkingConfig,
 }

 serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
 defer ensureReaderClosed(serverResp)
 if err != nil {
  return response, err
 }

 err = json.NewDecoder(serverResp.body).Decode(&response)
 return response, err
}
// vendor/github.com/docker/docker/client/request.go
// post sends an http request to the docker API using the method POST with a specific Go context.
func (cli *Client) post(ctx context.Context, path string, query url.Values, obj interface{}, headers map[string][]string) (serverResponse, error) {
 body, headers, err := encodeBody(obj, headers)
 if err != nil {
  return serverResponse{}, err
 }
 return cli.sendRequest(ctx, "POST", path, query, body, headers)
}

總結

CRI 架構圖

在 CRI 之下,包括兩種類型的容器運行時的實現:

kubelet 調用 CRI 創建 Pod 流程分析

kubelet 創建一個 Pod 的邏輯爲:

  1. 先創建並啓動 pod sandbox 容器,並構建好 Pod 網絡;

  2. 創建並啓動 ephemeral containers;

  3. 創建並啓動 init containers;

  4. 最後創建並啓動 normal containers(即普通業務容器)。

下面以 kubelet dockershim 創建 Pod 調用流程爲例做一下分析。

kubelet 通過調用 dockershim 來創建並啓動容器,而 dockershim 則調用 Docker 來創建並啓動容器,並調用 CNI 來構建 Pod 網絡。

kubelet dockershim 創建 Pod 調用流程圖示

dockershim 屬於 kubelet 內置 CRI shim,其餘 remote CRI shim 的創建 Pod 調用流程其實與 dockershim 調用基本一致,只不過是調用了不同的容器引擎來操作容器,但一樣由 CRI shim 調用 CNI 來構建 Pod 網絡。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/6bCF6Eb72Aqag7BlRA7moQ