vscode 探索一波 Kubesphere 後端架構

本章我們學習在vscode上的remote container插件基礎上,嘗試debug 和學習kubesphere後端模塊架構。

前提

配置 launch 文件

$ cat .vscode/launch.json
{
    // 使用 IntelliSense 瞭解相關屬性。 
    // 懸停以查看現有屬性的描述。
    // 欲瞭解更多信息,請訪問: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Launch Package",
            "type": "go",
            "request": "launch",
            "mode": "auto",
            "program": "${workspaceFolder}/cmd/ks-apiserver/apiserver.go"
        }
    ]
}

ks-apiserver 調試依賴文件

在相對路徑cmd/ks-apiserver/下配置kubesphere.yaml

首先,查看集羣之中的cm配置文件:

$ kubectl -n kubesphere-system get cm kubesphere-config  -oyaml
apiVersion: v1
data:
  kubesphere.yaml: |
    authentication:
      authenticateRateLimiterMaxTries: 10
      authenticateRateLimiterDuration: 10m0s
      loginHistoryRetentionPeriod: 168h
      maximumClockSkew: 10s
      multipleLogin: True
      kubectlImage: kubesphere/kubectl:v1.20.0
      jwtSecret: "Xtc8ZWUf9f3cJN89bglrTJhfUPMZR87d"
      oauthOptions:
        clients:
        - name: kubesphere
          secret: kubesphere
          redirectURIs:
          - '*'
    network:
      ippoolType: none
    monitoring:
      endpoint: http://prometheus-operated.kubesphere-monitoring-system.svc:9090
      enableGPUMonitoring: false
    gpu:
      kinds:
      - resourceName: nvidia.com/gpu
        resourceType: GPU
        default: True
    notification:
      endpoint: http://notification-manager-svc.kubesphere-monitoring-system.svc:19093
    kubeedge:
      endpoint: http://edge-watcher.kubeedge.svc/api/
    gateway:
      watchesPath: /var/helm-charts/watches.yaml
      namespace: kubesphere-controls-system
kind: ConfigMap
metadata:
  name: kubesphere-config
  namespace: kubesphere-system

以上是我安裝的或ks默認激活的幾個組件,將其中的yaml文件拷貝過來,並加上kubeconfig文件路徑的配置。

之所以需要添加雲主機上的kubeconfig文件,主要是因爲我們是遠程主機去debug, 而容器中會用到inclusterconfig就不需要添加了。

$ cat ./cmd/ks-apiserver/kubesphere.yaml
kubernetes:
  kubeconfig: "/root/.kube/config"
  master: https://192.168.88.6:6443
  $qps: 1e+06
  burst: 1000000
authentication:
  authenticateRateLimiterMaxTries: 10
  authenticateRateLimiterDuration: 10m0s
  loginHistoryRetentionPeriod: 168h
  maximumClockSkew: 10s
  multipleLogin: True
  kubectlImage: kubesphere/kubectl:v1.20.0
  jwtSecret: "Xtc8ZWUf9f3cJN89bglrTJhfUPMZR87d"
  oauthOptions:
    clients:
    - name: kubesphere
      secret: kubesphere
      redirectURIs:
      - '*'
network:
  ippoolType: none
monitoring:
  endpoint: http://prometheus-operated.kubesphere-monitoring-system.svc:9090
  enableGPUMonitoring: false
gpu:
  kinds:
  - resourceName: nvidia.com/gpu
    resourceType: GPU
    default: True
notification:
  endpoint: http://notification-manager-svc.kubesphere-monitoring-system.svc:19093
kubeedge:
  endpoint: http://edge-watcher.kubeedge.svc/api/
gateway:
  watchesPath: /var/helm-charts/watches.yaml
  namespace: kubesphere-controls-system

現在就可以通過F5來啓動debug了。

debug之前,你可能會問,這個配置文件爲啥要放在/cmd/ks-apiserver/kubesphere.yaml?

我們先來探索一波ks-apiserver的運行邏輯。

啓動 ks-apiserver

查看cmd/ks-apiserver/app/server.go的邏輯:

// Load configuration from file
conf, err := apiserverconfig.TryLoadFromDisk()

TryLoadFromDisk的邏輯如下:

viper.SetConfigName(defaultConfigurationName) // defaultConfigurationName = "kubesphere"
// AddConfigPath adds a path for Viper to search for the config file in.
// Can be called multiple times to define multiple search paths.
viper.AddConfigPath(defaultConfigurationPath) // defaultConfigurationPath = "/etc/kubesphere"
// Load from current working directory, only used for debugging
viper.AddConfigPath(".")
// Load from Environment variables
// E.g. if your prefix is "spf", the env registry will look for env variables that start with "SPF_".
viper.SetEnvPrefix("kubesphere")
viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
// 單步調試,這一步讀取的文件路徑是  jww.INFO.Println("Searching for config in ", v.configPaths)
// v.configPaths:["/etc/kubesphere","/root/go/src/kubesphere.io/kubesphere/cmd/ks-apiserver"]
if err := viper.ReadInConfig(); err != nil {
    if _, ok := err.(viper.ConfigFileNotFoundError); ok {
      return nil, err
    } else {
      return nil, fmt.Errorf("error parsing configuration file %s", err)
    }
}
conf := New() // 獲取各個組件的默認配置
if err := viper.Unmarshal(conf); err != nil {
    return nil, err
}
return conf, nil

上面的註釋,解釋了需要在指定路徑下添加kubesphere.yamldebug

我們接着往下擼,這裏使用cobra.Command來調用:

func Run(s *options.ServerRunOptions, ctx context.Context) error {
// NewAPIServer 通過給定的配置啓動apiserver實例,綁定實例化的各個模塊的client,綁定RuntimeClient和RuntimeCache
apiserver, err := s.NewAPIServer(ctx.Done())
if err != nil {
return err
}
// PrepareRun 主要是使用resful-go集成kapis, 也就是代理服務
err = apiserver.PrepareRun(ctx.Done())
if err != nil {
return nil
}
// Start listening, 開啓監聽
return apiserver.Run(ctx)
}

上面的註釋已經闡述, s.NewAPIServer(ctx.Done())主要是啓動apiserver實例, 集成了apis功能,PrepareRun 主要是使用resful-go集成kapis, 也就是代理服務。下面分開闡述說明。

NewAPIServer

client實例化完畢後會啓動一個server來響應請求:

...
server := &http.Server{
    Addr: fmt.Sprintf(":%d", s.GenericServerRunOptions.InsecurePort),
}
if s.GenericServerRunOptions.SecurePort != 0 {
    certificate, err := tls.LoadX509KeyPair(s.GenericServerRunOptions.TlsCertFile, s.GenericServerRunOptions.TlsPrivateKey)
    if err != nil {
        return nil, err
    }
    server.TLSConfig = &tls.Config{
        Certificates: []tls.Certificate{certificate},
    }
    server.Addr = fmt.Sprintf(":%d", s.GenericServerRunOptions.SecurePort)
}
sch := scheme.Scheme
if err := apis.AddToScheme(sch); err != nil {
    klog.Fatalf("unable add APIs to scheme: %v", err)
}
...

注意這一步apis.AddToScheme(sch), 看下怎麼注入apis接口的:

// AddToSchemes may be used to add all resources defined in the project to a Scheme
var AddToSchemes runtime.SchemeBuilder
// AddToScheme adds all Resources to the Scheme
func AddToScheme(s *runtime.Scheme) error {
return AddToSchemes.AddToScheme(s)
}

AddToSchemes這個類型的是[]func(*Scheme) error的別名,只需要在package apis下的接口文件中實現init()方法來導入實現的版本API,就可以注入對應的API,舉個例子:

$ cat pkg/apis/addtoscheme_dashboard_v1alpha2.gopackage apisimport monitoringdashboardv1alpha2 "kubesphere.io/monitoring-dashboard/api/v1alpha2"func init() {  AddToSchemes = append(AddToSchemes, monitoringdashboardv1alpha2.SchemeBuilder.AddToScheme)}

也就是,我們開發的插件集成的版本化API,必須集成xxx.SchemeBuilder.AddToScheme功能。

至此,所有子模塊對應的client已經與這個apiserver綁定,apis也注入了。

PrepareRun

我們探討下PrepareRun是怎麼註冊kapis以及綁定handler的。

主要是通過restful-go框架來實現的,前文提到該框架是使用containerhold住擁有特定GVRwebservice, 而一個webserver可以綁定多個routercontainer或者webserver還能自定義攔截器,也就是調用filter方法。

func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {  s.container = restful.NewContainer()  // 添加請求Request日誌攔截器  s.container.Filter(logRequestAndResponse)  s.container.Router(restful.CurlyRouter{})    // RecoverHandler changes the default function (logStackOnRecover) to be called  // when a panic is detected. DoNotRecover must be have its default value (=false).  s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {    logStackOnRecover(panicReason, httpWriter)  })    // 註冊、綁定kapi路由和回調函數,發生在所有informers啓動之前  s.installKubeSphereAPIs()    // 註冊metrics指標: ks_server_request_total、ks_server_request_duration_seconds  // 綁定API /kapis/metrics以及handle, 主要是調用prometheus collector集成,在pkg/utils/metrics/metrics.go  s.installMetricsAPI()  // 過濾無效監控請求  s.container.Filter(monitorRequest)  for _, ws := range s.container.RegisteredWebServices() {    klog.V(2).Infof("%s", ws.RootPath())  }    // 將container綁定給s.Server.Handler  s.Server.Handler = s.container    // 添加k8s api資源、auditing資源、登錄驗證、憑據驗證等各個調用鏈的攔截器  s.buildHandlerChain(stopCh)  return nil}

上面主要使用restful-go框架給s.Server.handler綁定了一個container, 添加了各種攔截器。

s.installKubeSphereAPIS()這一步安裝GVR綁定了kapis代理,具體是這樣實現的:

// 調用各api組的AddToContainer方法來向container註冊api:
urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
// 詳細來說,各個組件實現的AddToContainer方法則實現了帶組和版本信息子路由和handler的綁定:// 首先給router綁定/kapis的父router。ws := runtime.NewWebService(GroupVersion)// 給子路由綁定回調函數
ws.Route(ws.GET("/kubesphere")
.To(h.handleKubeSphereMetricsQuery)
.Doc("Get platform-level metric data.")
.Metadata(restfulspec.KeyOpenAPITags, []string{constants.KubeSphereMetricsTag})
.Writes(model.Metrics{})
.Returns(http.StatusOK, respOK, model.Metrics{}))
.Produces(restful.MIME_JSON)

至此,ks-apiserver就啓動了,我們做一下簡單總結:

假定遠程雲主機的服務已經啓動,服務端口在9090, 下面介紹下各個模塊的邏輯。

顯然,我們只需要關注各模塊的AddToContainer方法就行了。

iam.kubesphere.io

pkg/kapis/iam/v1alpha2/register.go

從代碼註釋來看,這個模塊管理着usersclustermembersglobalrolesclusterrolesworkspacerolesrolesworkspaces groupsworkspace membersdevops members等賬號角色的CRUD

現在我們可以在handler中打上斷點,去請求這些api

$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/users"
$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/clustermembers"
$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/users/admin/globalroles"...

oauth.kubesphere.io

pkg/kapis/oauth/register.go

主要進行身份驗證和授權,通過第三方服務平臺登錄或退出,感興趣可以深入瞭解。

順帶一提,oauth裏集成了/kapis/iam.kubesphere.io/v1alpha2/login接口,我們嘗試請求一個login API:

$ curl -X POST -H "Content-Type: application/json" -d '{"username":"admin","password":"P@88w0rd"}' "http://localhost:9090/kapis/iam.kubesphere.io/v1alpha2/login"

最後會通過PasswordVerify(user.Spec.EncryptedPassword, password)來比對驗證的密碼是否正確來返回access_token

kubeedge.kubesphere.io

pkg/kapis/kubeedge/v1alpha1/register.go

代碼裏面使用的代理轉發請求:

func AddToContainer(container *restful.Container, endpoint string) error {  proxy, err := generic.NewGenericProxy(endpoint, GroupVersion.Group, GroupVersion.Version)  { 
   if err != nil { 
      return nil  
   }  
   return proxy.AddToContainer(container)
 }

也就是 kapis/kubeedge.kubesphere.io的請求會轉發到http://edge-watcher.kubeedge.svc/api/,也就是kubeedge下的service,相關的接口集成在那裏。

關於邊緣計算平臺的集成,順帶一提,如果需要支撐不止一家的平臺,可以集成一個edge-shim的適配器,大概需要集成以下幾個接口:

notification.kubesphere.io

pkg/kapis/notification/v2beta1/register.go

這個組下的api主要實現了notification的全局或租戶級別的configreceivers資源的CRUD

config 資源

用於配置對接通知渠道相關參數的一些配置,分爲全局的和租戶級別的config資源;

reciever 資源

用於配置接收者的一些配置信息,區分全局的和租戶級別的接收者;

我們挑選一個回調函數進行剖析:

func (h *handler) ListResource(req *restful.Request, resp *restful.Response) {  
   // 租戶或用戶的名稱  
   user := req.PathParameter("user") 
   // 資源類型,configs/recievers/secrets  
   resource := req.PathParameter("resources") 
   // 通知渠道 dingtalk/slack/email/webhook/wechat 
   subresource := req.QueryParameter("type") 
   q := query.ParseQueryParameter(req)
   if !h.operator.IsKnownResource(resource, subresource) {   
      api.HandleBadRequest(resp, req, servererr.New("unknown resource type %s/%s", resource, subresource))
      return  
   }  
   objs, err := h.operator.List(user, resource, subresource, q)
   handleResponse(req, resp, objs, err)
 }

我們看下list object的邏輯:

// List objects
func (o *operator) List(user, resource, subresource string, q *query.Query) (*api.ListResult, error) { 
 if len(q.LabelSelector) > 0 {
     q.LabelSelector = q.LabelSelector + ","  
 }
 filter := ""  // 如果沒有給定租戶的名稱,則獲取全局的對象
 if user == "" {   
      if isConfig(o.GetObject(resource)) {   
      // type=default對config資源來說是全局的 
           filter = "type=default"   
      } else {      
        // type=global對receiever資源來說是全局的     
        filter = "type=global"   
      } 
 } else {  
 // 否則就給過濾器綁定租戶名稱   
    filter = "type=tenant,user=" + user  
  }  
  // 組裝過濾標籤 
    q.LabelSelector = q.LabelSelector + filter
     ... 
  // 通過過濾標籤獲取cluster或者namespace下的指定資源
    res, err := o.resourceGetter.List(resource, ns, q) 
    if err != nil {  
        return nil, err 
    } 
      if subresource == "" || resource == Secret { 
         return res, nil
      }  
      results := &api.ListResult{}  
      ...
}

這樣一來,就實現了租戶級別的通知告警CR配置的CRUD,這些CR是這麼分類的:

那麼configreciever怎麼相互綁定、告警是如何通過渠道給租戶發消息的?

https://github.com/kubesphere/notification-manager/blob/master/pkg/webhook/v1/handler.go#L45

https://github.com/kubesphere/notification-manager/blob/master/pkg/notify/notify.go#L66

notification-manager簡稱nm,我這裏斷章取義地簡要回答一下。

功能方面:

告警到通知的流程:

monitoring.kubesphere.io

pkg/kapis/monitoring/v1alpha3/register.go

將監控指標分爲平臺級、節點級、workspacesnamespacespods等級別,不僅可以獲取總的統計,還能獲取nodes/namespaces/workspaces下的所有pods/containers等監控指標。

我們查看回調函數,以handleNamedMetricsQuery爲例分析:

代碼如下:

func (h handler) handleNamedMetricsQuery(resp *restful.Response, q queryOptions) {
  var res model.Metrics
  var metrics []string
  // q.namedMetrics 是一組按照監控指標級別分類好的擁有promsql expr定義的完整指標名數組
  // 監控指標級別分類是根據 monitoring.Levelxxx在上一個棧裏細分的,i.e: monitoring.LevelPod
  for _, metric := range q.namedMetrics {
    if strings.HasPrefix(metric, model.MetricMeterPrefix) {
      // skip meter metric
      continue
    }
    // 根據請求參數中的指標名來過濾
    ok, _ := regexp.MatchString(q.metricFilter, metric)
    if ok {
      metrics = append(metrics, metric)
    }
  }
  if len(metrics) == 0 {
    resp.WriteAsJson(res)
    return
  }
  // 判斷是否是範圍查詢還是實時查詢,繼續調用相關函數
  // 主要還是用prometheus client去查詢promsql, 邊緣節點的指標目前通過metrics server來查詢
  if q.isRangeQuery() {
    res = h.mo.GetNamedMetricsOverTime(metrics, q.start, q.end, q.step, q.option)
  } else {
    res = h.mo.GetNamedMetrics(metrics, q.time, q.option)
    if q.shouldSort() {
      res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit)
    }
  }
  resp.WriteAsJson(res)
}

現在,我們將視角移植到:

pkg/models/monitoring/monitoring.go:156

GetNamedMetricsOverTime爲例,這裏闡述了會合並prometheusmetrics-server的查詢結果進行返回:

func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) Metrics {
    // 獲取prometheus client查詢結果,主要使用sync.WaitGroup併發查詢,每個指標啓動一個goroutine,最後將結果和並返回
  ress := mo.prometheus.GetNamedMetricsOverTime(metrics, start, end, step, opt)
  // 如果metrics-server激活了
  if mo.metricsserver != nil {
    //合併邊緣節點數據
    edgeMetrics := make(map[string]monitoring.MetricData)
    for i, ressMetric := range ress {
      metricName := ressMetric.MetricName
      ressMetricValues := ressMetric.MetricData.MetricValues
      if len(ressMetricValues) == 0 {
        // this metric has no prometheus metrics data
        if len(edgeMetrics) == 0 {
          // start to request monintoring metricsApi data
          mr := mo.metricsserver.GetNamedMetricsOverTime(metrics, start, end, step, opt)
          for _, mrMetric := range mr {
            edgeMetrics[mrMetric.MetricName] = mrMetric.MetricData
          }
        }
        if val, ok := edgeMetrics[metricName]; ok {
          ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...)
        }
      }
    }
  }
  return Metrics{Results: ress}
}

此外,monitoring包還定義了各監控查詢 client的接口方法,可以按需探索:

tenant.kubesphere.io

再聊api之前,順帶一提多租戶在隔離的安全程度上,我們可以將其分爲軟隔離 (Soft Multi-tenancy) 和硬隔離 (Hard Multi-tenancy) 兩種。

這個group下比較重要的部分是實現租戶查詢logs/audits/events

以查詢日誌爲例:

func (h *tenantHandler) QueryLogs(req *restful.Request, resp *restful.Response) {
    // 查詢上下文中攜帶的租戶信息
  user, ok := request.UserFrom(req.Request.Context())
  if !ok {
    err := fmt.Errorf("cannot obtain user info")
    klog.Errorln(err)
    api.HandleForbidden(resp, req, err)
    return
  }
  // 解析查詢的參數,比如確定屬於哪個ns/workload/pod/container的查詢、時間段,是否爲柱狀查詢等
  queryParam, err := loggingv1alpha2.ParseQueryParameter(req)
  if err != nil {
    klog.Errorln(err)
    api.HandleInternalError(resp, req, err)
    return
  }
  // 導出數據
  if queryParam.Operation == loggingv1alpha2.OperationExport {
    resp.Header().Set(restful.HEADER_ContentType, "text/plain")
    resp.Header().Set("Content-Disposition", "attachment")
    // 驗證賬號是否有權限
    // admin賬號可以導出所有ns的日誌,租戶只能導出本ns的日誌
    // 組裝loggingclient進行日誌導出
    err := h.tenant.ExportLogs(user, queryParam, resp)
    if err != nil {
      klog.Errorln(err)
      api.HandleInternalError(resp, req, err)
      return
    }
  } else {
    // 驗證賬號是否有權限
    // admin賬號可以查看所有ns的日誌,租戶只能查看本ns的日誌
    // 組裝loggingclient進行日誌返回
    result, err := h.tenant.QueryLogs(user, queryParam)
    if err != nil {
      klog.Errorln(err)
      api.HandleInternalError(resp, req, err)
      return
    }
    resp.WriteAsJson(result)
  }
}

這裏順帶一提,關於租戶權限驗證這塊,可以看下這裏接口定義:

// Authorizer makes an authorization decision based on information gained by making
// zero or more calls to methods of the Attributes interface.  It returns nil when an action is
// authorized, otherwise it returns an error.
type Authorizer interface {
  Authorize(a Attributes) (authorized Decision, reason string, err error)
}

alwaysAllowAuthorizer/alwaysDenyAuthorizer/rabc都實現該方法。

前面在ks-apiserver啓動的時候,有個buildHandlerChain方法:

var authorizers authorizer.Authorizer
// 默認是RBAC,可以通過authorization.mode在啓動配置文件修改
switch s.Config.AuthorizationOptions.Mode { 
case authorizationoptions.AlwaysAllow:
  authorizers = authorizerfactory.NewAlwaysAllowAuthorizer()
case authorizationoptions.AlwaysDeny:
  authorizers = authorizerfactory.NewAlwaysDenyAuthorizer()
default:
  fallthrough
case authorizationoptions.RBAC:
  excludedPaths := []string{"/oauth/*", "/kapis/config.kubesphere.io/*", "/kapis/version", "/kapis/metrics"}
  pathAuthorizer, _ := path.NewAuthorizer(excludedPaths)
  amOperator := am.NewReadOnlyOperator(s.InformerFactory)
  authorizers = unionauthorizer.New(pathAuthorizer, rbac.NewRBACAuthorizer(amOperator))
}

由於篇幅有限,只對以上GVR進行了調試,感興趣可以深入瞭解~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/IrBkQF5ayliRnj5QgtInNw