golang 源碼分析:uber-cadence(2)

        下面我能看下 cadence 的 helloword 例子的源碼,它包含兩個文件,第一個文件是啓動程序,第二個定義了 workflow 和 activity

package main
import (
  "flag"
  "time"
  "github.com/pborman/uuid"
  "go.uber.org/cadence/client"
  "go.uber.org/cadence/worker"
  common "exp1/common"
)
// This needs to be done as part of a bootstrap step when the process starts.
// The workers are supposed to be long running.
func startWorkers(h *common.SampleHelper) {
  // Configure worker options.
  workerOptions := worker.Options{
    MetricsScope: h.WorkerMetricScope,
    Logger:       h.Logger,
    FeatureFlags: client.FeatureFlags{
      WorkflowExecutionAlreadyCompletedErrorEnabled: true,
    },
  }
  h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)
}
func startShadower(h *common.SampleHelper) {
  workerOptions := worker.Options{
    MetricsScope:       h.WorkerMetricScope,
    Logger:             h.Logger,
    EnableShadowWorker: true,
    ShadowOptions: worker.ShadowOptions{
      WorkflowTypes:  []string{helloWorldWorkflowName},
      WorkflowStatus: []string{"Completed"},
      ExitCondition: worker.ShadowExitCondition{
        ShadowCount: 10,
      },
    },
  }
  h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)
}
func startWorkflow(h *common.SampleHelper) {
  workflowOptions := client.StartWorkflowOptions{
    ID:                              "helloworld_" + uuid.New(),
    TaskList:                        ApplicationName,
    ExecutionStartToCloseTimeout:    time.Minute,
    DecisionTaskStartToCloseTimeout: time.Minute,
  }
  h.StartWorkflow(workflowOptions, helloWorldWorkflowName, "Cadence")
}
func registerWorkflowAndActivity(
  h *common.SampleHelper,
) {
  h.RegisterWorkflowWithAlias(helloWorldWorkflow, helloWorldWorkflowName)
  h.RegisterActivity(helloWorldActivity)
}
func main() {
  var mode string
  flag.StringVar(&mode, "m", "trigger", "Mode is worker, trigger or shadower.")
  flag.Parse()
  var h common.SampleHelper
  h.SetupServiceConfig()
  switch mode {
  case "worker":
    registerWorkflowAndActivity(&h)
    startWorkers(&h)
    // The workers are supposed to be long running process that should not exit.
    // Use select{} to block indefinitely for samples, you can quit by CMD+C.
    select {}
  case "shadower":
    registerWorkflowAndActivity(&h)
    startShadower(&h)
    select {}
  case "trigger":
    startWorkflow(&h)
  }
}
package main
import (
  "context"
  "time"
  "go.uber.org/cadence/activity"
  "go.uber.org/cadence/workflow"
  "go.uber.org/zap"
)
/**
 * This is the hello world workflow sample.
 */
// ApplicationName is the task list for this sample
const ApplicationName = "helloWorldGroup"
const helloWorldWorkflowName = "helloWorldWorkflow"
// helloWorkflow workflow decider
func helloWorldWorkflow(ctx workflow.Context, name string) error {
  ao := workflow.ActivityOptions{
    ScheduleToStartTimeout: time.Minute,
    StartToCloseTimeout:    time.Minute,
    HeartbeatTimeout:       time.Second * 20,
  }
  ctx = workflow.WithActivityOptions(ctx, ao)
  logger := workflow.GetLogger(ctx)
  logger.Info("helloworld workflow started")
  var helloworldResult string
  err := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &helloworldResult)
  if err != nil {
    logger.Error("Activity failed.", zap.Error(err))
    return err
  }
  // Adding a new activity to the workflow will result in a non-determinstic change for the workflow
  // Please check https://cadenceworkflow.io/docs/go-client/workflow-versioning/ for more information
  //
  // Un-commenting the following code and the TestReplayWorkflowHistoryFromFile in replay_test.go
  // will fail due to the non-determinstic change
  //
  // If you have a completed workflow execution without the following code and run the
  // TestWorkflowShadowing in shadow_test.go or start the worker in shadow mode (using -m shadower)
  // those two shadowing check will also fail due to the non-deterministic change
  //
  // err := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &helloworldResult)
  // if err != nil {
  //   logger.Error("Activity failed.", zap.Error(err))
  //   return err
  // }
  logger.Info("Workflow completed.", zap.String("Result", helloworldResult))
  return nil
}
func helloWorldActivity(ctx context.Context, name string) (string, error) {
  logger := activity.GetLogger(ctx)
  logger.Info("helloworld activity started")
  return "Hello " + name + "!", nil
}

啓動文件裏首先定義了相關的各種配置,包括日誌配和監控配置,監控採用的是 prometheus。設置配置的函數是 h.SetupServiceConfig(),位於 common/sample_helper.go

type (
  // SampleHelper class for workflow sample helper.
  SampleHelper struct {
    Service            workflowserviceclient.Interface
    WorkerMetricScope  tally.Scope
    ServiceMetricScope tally.Scope
    Logger             *zap.Logger
    Config             Configuration
    Builder            *WorkflowClientBuilder
    DataConverter      encoded.DataConverter
    CtxPropagators     []workflow.ContextPropagator
    workflowRegistries []registryOption
    activityRegistries []registryOption
    Tracer             opentracing.Tracer
    configFile string
  }
// Configuration for running samples.
  Configuration struct {
    DomainName      string                    `yaml:"domain"`
    ServiceName     string                    `yaml:"service"`
    HostNameAndPort string                    `yaml:"host"`
    Prometheus      *prometheus.Configuration `yaml:"prometheus"`
  }
registryOption struct {
    registry interface{}
    alias    string
  }

其中配置文件使用的是 config/development.yaml

func (h *SampleHelper) SetupServiceConfig() {
          if h.configFile == "" {
    h.configFile = defaultConfigFile
        if err := yaml.Unmarshal(configData, &h.Config); err != nil {
        logger, err := zap.NewDevelopment()
        if h.Config.Prometheus != nil {
    reporter, err := h.Config.Prometheus.NewReporter(
      prometheus.ConfigurationOptions{
        Registry: prom.NewRegistry(),
        h.Builder = NewBuilder(logger).
    SetHostPort(h.Config.HostNameAndPort).
    SetDomain(h.Config.DomainName).
    SetMetricsScope(h.ServiceMetricScope).
    SetDataConverter(h.DataConverter).
    SetTracer(h.Tracer).
    SetContextPropagators(h.CtxPropagators)
        service, err := h.Builder.BuildServiceClient()
        h.Service = service
        domainClient, _ := h.Builder.BuildCadenceDomainClient()
  _, err = domainClient.Describe(context.Background(), h.Config.DomainName)
          h.workflowRegistries = make([]registryOption, 0, 1)
  h.activityRegistries = make([]registryOption, 0, 1)
const (
  defaultConfigFile = "config/development.yaml"

然後是註冊 workflow 和 activity 即 registerWorkflowAndActivity(&h)

h.RegisterWorkflowWithAlias(helloWorldWorkflow, helloWorldWorkflowName)

common/sample_helper.go

func (h *SampleHelper) RegisterWorkflowWithAlias(workflow interface{}, alias string) {
          registryOption := registryOption{
    registry: workflow,
    alias:    alias,
  }
        h.workflowRegistries = append(h.workflowRegistries, registryOption)
    h.RegisterActivity(helloWorldActivity)
func (h *SampleHelper) RegisterActivity(activity interface{}) {
  h.RegisterActivityWithAlias(activity, "")
}
func (h *SampleHelper) RegisterActivityWithAlias(activity interface{}, alias string) {
          registryOption := registryOption{
    registry: activity,
    alias:    alias,
  }
        h.activityRegistries = append(h.activityRegistries, registryOption)

前面 workflow.go 裏定義我們的 workflow 和 activity

func helloWorldWorkflow(ctx workflow.Context, name string) error {
          ao := workflow.ActivityOptions{
    ScheduleToStartTimeout: time.Minute,
    StartToCloseTimeout:    time.Minute,
    HeartbeatTimeout:       time.Second * 20,
  }
        err := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &helloworldResult)
func helloWorldActivity(ctx context.Context, name string) (string, error) {

go.uber.org/cadence@v0.19.1/workflow/workflow.go

func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Future {
  return internal.ExecuteActivity(ctx, activity, args...)
}

指定完成後就可以啓動任務了 startWorkers(&h)

workflowOptions := client.StartWorkflowOptions{
    ID:                              "helloworld_" + uuid.New(),
    TaskList:                        ApplicationName,
    ExecutionStartToCloseTimeout:    time.Minute,
    DecisionTaskStartToCloseTimeout: time.Minute,
  }
    h.StartWorkflow(workflowOptions, helloWorldWorkflowName, "Cadence")
func startWorkers(h *common.SampleHelper) {
      workerOptions := worker.Options{
    MetricsScope: h.WorkerMetricScope,
    Logger:       h.Logger,
    FeatureFlags: client.FeatureFlags{
      WorkflowExecutionAlreadyCompletedErrorEnabled: true,
    },
  }
      h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)

common/sample_helper.go

func (h *SampleHelper) StartWorkers(domainName string, groupName string, options worker.Options) {
        worker := worker.New(h.Service, domainName, groupName, options)
        h.registerWorkflowAndActivity(worker)
        err := worker.Start()
func (h *SampleHelper) registerWorkflowAndActivity(worker worker.Worker) {
        for _, w := range h.workflowRegistries {
    if len(w.alias) == 0 {
      worker.RegisterWorkflow(w.registry)
    } else {
      worker.RegisterWorkflowWithOptions(w.registry, workflow.RegisterOptions{Name: w.alias})
    }
  }
  for _, act := range h.activityRegistries {
    if len(act.alias) == 0 {
      worker.RegisterActivity(act.registry)
    } else {
      worker.RegisterActivityWithOptions(act.registry, activity.RegisterOptions{Name: act.alias})
    }
  }

go.uber.org/cadence@v0.19.1/worker/worker.go

func New(
  service workflowserviceclient.Interface,
  domain string,
  taskList string,
  options Options,
) Worker {
  return internal.NewWorker(service, domain, taskList, options)
}
type (
  // Worker hosts workflow and activity implementations.
  // Use worker.New(...) to create an instance.
  Worker interface {
    Registry
    // Start starts the worker in a non-blocking fashion
    Start() error
    // Run is a blocking start and cleans up resources when killed
    // returns error only if it fails to start the worker
    Run() error
    // Stop cleans up any resources opened by worker
    Stop()
  }
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_wTbGBZ1W3RqufLjgHKkdA