gRPC OTel 鏈路追蹤

Open-Telemetry 的第三方軟件包合集 包括了多個社區中常用庫的 OpenTelemetry 支持。隨着 OpenTelemetry 的不斷迭代,相信整個鏈路追蹤的生態也會越發完善。

gRPC 的鏈路追蹤

採集 gRPC 的 trace 數據,推薦使用 otelgrpc 。

安裝依賴。

go get go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc

這裏使用一個簡單的 hello 程序,其proto文件內容如下。

syntax = "proto3"; // 版本聲明,使用Protocol Buffers v3版本

package pb; // 包名


// 定義服務
service Greeter {
    // SayHello 方法
    rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

// 請求消息
message HelloRequest {
    string name = 1;
}

// 響應消息
message HelloResponse {
    string reply = 1;
}

Server 端

在創建 gRPC Server 時,按如下方式設置 StatsHandlerotelgrpc.NewServerHandler()

import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

// ...

s := grpc.NewServer(
  grpc.StatsHandler(otelgrpc.NewServerHandler()), // 設置 StatsHandler
)

完整代碼如下:

package main

import (
 "context"
 "fmt"
 "log"
 "net"
 "time"

 "hello_server/pb"

 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
 "go.opentelemetry.io/otel"
 "go.opentelemetry.io/otel/attribute"
 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
 "go.opentelemetry.io/otel/propagation"
 "go.opentelemetry.io/otel/sdk/resource"
 sdktrace "go.opentelemetry.io/otel/sdk/trace"
 semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
 "go.opentelemetry.io/otel/trace"
 "google.golang.org/grpc"
 "google.golang.org/grpc/metadata"
)

const (
 serviceName       = "gRPC-Jaeger-Demo"
 jaegerRPCEndpoint = "127.0.0.1:4317" // Jaeger RPC端口
)

var tracer = otel.Tracer("grpc-example")

// newJaegerTraceProvider 創建一個 Jaeger Trace Provider
func newJaegerTraceProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
 // 創建一個使用 HTTP 協議連接本機Jaeger的 Exporter
 exp, err := otlptracegrpc.New(ctx,
  otlptracegrpc.WithEndpoint(jaegerRPCEndpoint),
  otlptracegrpc.WithInsecure())
 if err != nil {
  return nil, err
 }
 res, err := resource.New(ctx, resource.WithAttributes(semconv.ServiceName(serviceName)))
 if err != nil {
  return nil, err
 }
 traceProvider := sdktrace.NewTracerProvider(
  sdktrace.WithResource(res),
  sdktrace.WithSampler(sdktrace.AlwaysSample()), // 採樣
  sdktrace.WithBatcher(exp, sdktrace.WithBatchTimeout(time.Second)),
 )
 return traceProvider, nil
}

// initTracer 初始化 Tracer
func initTracer(ctx context.Context) (*sdktrace.TracerProvider, error) {
 tp, err := newJaegerTraceProvider(ctx)
 if err != nil {
  return nil, err
 }

 otel.SetTracerProvider(tp)
 otel.SetTextMapPropagator(
  propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}),
 )
 return tp, nil
}

type server struct {
 pb.UnimplementedGreeterServer
}

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
 md, _ := metadata.FromIncomingContext(ctx)
 _, span := tracer.Start(ctx, "SayHello",
  trace.WithAttributes(
   attribute.String("name", in.GetName()),
   attribute.StringSlice("client-id", md.Get("client-id")),
   attribute.StringSlice("user-id", md.Get("user-id")),
  ),
 )
 defer span.End()
 return &pb.HelloResponse{Reply: "Hello " + in.Name}, nil
}

func main() {
 ctx := context.Background()
 tp, err := initTracer(ctx)
 if err != nil {
  log.Fatal(err)
 }
 defer func() {
  if err := tp.Shutdown(context.Background()); err != nil {
   log.Printf("Error shutting down tracer provider: %v", err)
  }
 }()

 // 監聽本地的8972端口
 lis, err := net.Listen("tcp"":8972")
 if err != nil {
  fmt.Printf("failed to listen: %v", err)
  return
 }

 // 創建gRPC服務器
 s := grpc.NewServer(
  grpc.StatsHandler(otelgrpc.NewServerHandler()), // 設置 StatsHandler
 )

 pb.RegisterGreeterServer(s, &server{}) // 在gRPC服務端註冊服務
 // 啓動服務
 err = s.Serve(lis)
 if err != nil {
  fmt.Printf("failed to serve: %v", err)
  return
 }
}

Client 端

在建立連接時,指定 StatsHandlerotelgrpc.NewClientHandler()

import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

// ...

conn, err := grpc.NewClient(
  *addr,
  grpc.WithTransportCredentials(insecure.NewCredentials()),
  grpc.WithStatsHandler(otelgrpc.NewClientHandler()), // 設置 StatsHandler
)

完整代碼:

package main

import (
 "context"
 "flag"
 "log"
 "time"

 "hello_client/pb"

 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
 "go.opentelemetry.io/otel"
 "go.opentelemetry.io/otel/attribute"
 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
 "go.opentelemetry.io/otel/propagation"
 "go.opentelemetry.io/otel/sdk/resource"
 sdktrace "go.opentelemetry.io/otel/sdk/trace"
 semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
 "go.opentelemetry.io/otel/trace"
 "google.golang.org/grpc"
 "google.golang.org/grpc/credentials/insecure"
 "google.golang.org/grpc/metadata"
)

const (
 serviceName    = "gRPC-Jaeger-Demo"
 jaegerEndpoint = "127.0.0.1:4317"
)

var tracer = otel.Tracer("grpc-client-example")

// newJaegerTraceProvider 創建一個 Jaeger Trace Provider
func newJaegerTraceProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
 // 創建一個使用 HTTP 協議連接本機Jaeger的 Exporter
 exp, err := otlptracegrpc.New(ctx,
  otlptracegrpc.WithEndpoint(jaegerEndpoint),
  otlptracegrpc.WithInsecure())
 if err != nil {
  return nil, err
 }
 res, err := resource.New(ctx, resource.WithAttributes(semconv.ServiceName(serviceName)))
 if err != nil {
  return nil, err
 }
 traceProvider := sdktrace.NewTracerProvider(
  sdktrace.WithResource(res),
  sdktrace.WithSampler(sdktrace.AlwaysSample()), // 採樣
  sdktrace.WithBatcher(exp, sdktrace.WithBatchTimeout(time.Second)),
 )
 return traceProvider, nil
}

// initTracer 初始化 Tracer
func initTracer(ctx context.Context) (*sdktrace.TracerProvider, error) {
 tp, err := newJaegerTraceProvider(ctx)
 if err != nil {
  return nil, err
 }

 otel.SetTracerProvider(tp)
 otel.SetTextMapPropagator(
  propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}),
 )
 return tp, nil
}

const (
 defaultName = "world"
)

var (
 addr = flag.String("addr""127.0.0.1:8972""the address to connect to")
 name = flag.String("name", defaultName, "Name to greet")
)

func main() {
 flag.Parse()

 tp, err := initTracer(context.Background())
 if err != nil {
  log.Fatal(err)
 }
 defer func() {
  if err := tp.Shutdown(context.Background()); err != nil {
   log.Printf("Error shutting down tracer provider: %v", err)
  }
 }()

 // 連接到server端,此處禁用安全傳輸
 conn, err := grpc.NewClient(
  *addr,
  grpc.WithTransportCredentials(insecure.NewCredentials()),
  grpc.WithStatsHandler(otelgrpc.NewClientHandler()), // 設置 StatsHandler
 )
 if err != nil {
  log.Fatalf("did not connect: %v", err)
 }
 defer func() { _ = conn.Close() }()

 c := pb.NewGreeterClient(conn)

 // 執行RPC調用並打印收到的響應數據
 md := metadata.Pairs(
  "timestamp", time.Now().Format(time.StampNano),
  "client-id""hello-client-q1mi",
  "user-id""7",
 )
 ctx := metadata.NewOutgoingContext(context.Background(), md)
 ctx, span := tracer.Start(ctx, "c.SayHello", trace.WithAttributes(attribute.String("name""gRPC-client")))
 defer span.End()
 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
 if err != nil {
  log.Fatalf("could not greet: %v", err)
 }
 log.Printf("Greeting: %s", r.GetReply())
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/K5OzjpfeumBB8LD-0SLn8Q