使用 Go 構建分佈式系統:基於 gRPC 的主從節點架構

在現代軟件開發領域,分佈式系統已經變得至關重要。它們使服務能夠擴展、處理大量數據並提供高可用性。本文將指導您使用 Golang 構建一個簡單的分佈式系統,該系統利用主節點和單個工作節點,並使用 gRPC 協議進行通信。

這種架構非常適合數據處理、並行計算和大規模處理工作負載等分佈式任務。我們將介紹如何設置主從結構、建立基於 gRPC 的通信,以及實現簡單的任務分配和執行流程。

系統概述

我們的分佈式系統包含以下組件:

前提條件

go install google.golang.org/grpc
go install google.golang.org/protobuf/cmd/protoc-gen-go
go install google.golang.org/protobuf/cmd/protoc-gen-go-grpc

設置 gRPC 服務定義

創建基於 gRPC 的分佈式系統的第一步是在 .proto 文件中定義 gRPC 服務和消息。此文件概述了用於通信的服務、RPC 方法和消息結構。

1. 在 Proto 文件中定義 gRPC 服務

創建一個名爲 node.proto 的文件,內容如下:

syntax = "proto3";
package core;
option go_package = ".;core";

message Request {
    string action = 1;
}

message Response {
    string data = 1;
}

service NodeService {
    rpc ReportStatus(Request) returns (Response){};
    rpc AssignTask(Request) returns (stream Response){};
}

2. 從 Proto 文件生成 gRPC 代碼

使用 protoc 爲我們的 gRPC 服務生成 Go 代碼:

mkdir core
protoc --go_out=./core --go-grpc_out=./core node.proto

實現 gRPC 服務端代碼

我們設置了一個 gRPC 服務器來報告狀態,並通過命令通道持續發送客戶端任務。它使用 Go 的併發特性來處理實時命令通知。

package core

import "context"

type NodeServiceGrpcServer struct {
 UnimplementedNodeServiceServer
 CmdChannel chan string
}

func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) {
 return &Response{Data: "ok"}, nil
}

func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error {
 for {
  select {
  case cmd := <-n.CmdChannel:
   if err := server.Send(&Response{Data: cmd}); err != nil {
    return err
   }
  }
 }
}

var server *NodeServiceGrpcServer

func GetNodeServiceGrpcServer() *NodeServiceGrpcServer {
 if server == nil {
  server = &NodeServiceGrpcServer{
   CmdChannel: make(chan string),
  }
 }
 return server
}

實現主節點

主節點負責將任務分配給工作節點。它通過 gRPC 連接到工作節點,並使用 AssignTask 方法分配任務。

現在,讓我們在名爲 node.go 的文件中實現主節點:我們使用 API 框架 gin 創建一個簡單的 API 服務,該服務允許對 /tasks 的 POST 請求將命令發送到通道 CmdChannel 並傳遞給 NodeServiceGrpcServer。

package core

import (
 "net"
 "net/http"

 "github.com/gin-gonic/gin"
 "google.golang.org/grpc"
)

type MasterNode struct {
 api     *gin.Engine
 ln      net.Listener
 svr     *grpc.Server
 nodeSvr *NodeServiceGrpcServer
}

func (n *MasterNode) Init() (err error) {
 n.ln, err = net.Listen("tcp"":50051")
 if err != nil {
  return err
 }
 n.svr = grpc.NewServer()
 n.nodeSvr = GetNodeServiceGrpcServer()
 RegisterNodeServiceServer(node.svr, n.nodeSvr)
 n.api = gin.Default()
 n.api.POST("/tasks", func(c *gin.Context) {
  var payload struct {
   Cmd string `json:"cmd"`
  }
  if err := c.ShouldBindBodyWithJSON(&payload); err != nil {
   c.AbortWithStatus(http.StatusBadRequest)
   return
  }
  n.nodeSvr.CmdChannel <- payload.Cmd
  c.AbortWithStatusJSON(200, http.StatusOK)
 })
 return nil
}

func (n *MasterNode) Start() {
 go n.svr.Serve(n.ln)
 _ = n.api.Run(":9092")
 n.svr.Stop()
}

var node *MasterNode

func GetMasterNode() *MasterNode {
 if node == nil {
  node = &MasterNode{}
  if err := node.Init(); err != nil {
   panic(err)
  }
 }
 return node
}

實現工作節點

工作節點的職責是從主節點接收任務、處理任務並返回結果。

現在,讓我們在名爲 worker_node.go 的文件中實現工作服務器:工作節點通過獲取的流從服務器(主節點)連續接收數據並執行命令。

package core

import (
 "context"
 "fmt"
 "os/exec"
 "strings"

 "google.golang.org/grpc"
)

type WokerNode struct {
 conn *grpc.ClientConn
 c    NodeServiceClient
}

func (n *WokerNode) Init() (err error) {
 n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure())
 if err != nil {
  return err
 }
 n.c = NewNodeServiceClient(n.conn)
 return nil
}

func (n *WokerNode) Start() {
 fmt.Println("worker node started")
 _, _ = n.c.ReportStatus(context.Background()&Request{})
 stream, _ := n.c.AssignTask(context.Background()&Request{})
 for {
  res, err := stream.Recv()
  if err != nil {
   return
  }
  fmt.Print("received command: ", res.Data)
  parts := strings.Split(res.Data, " ")
  if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil {
   fmt.Println(err)
  }
 }
}

var workerNode *WokerNode

func GetWorkerNode() *WokerNode {
 if workerNode == nil {
  workerNode = &WokerNode{}
  if err := workerNode.Init(); err != nil {
   panic(err)
  }
 }
 return workerNode
}

整合主從節點

我們創建一個 main.go,它位於 core 文件夾之外。main 函數接受一個參數,並將其與 switch 語句進行比較,以確定是運行主節點還是工作節點。

package main

import (
 "go-master-worker-node/core"
 "os"
)

func main() {
 nodeType := os.Args[1]
 switch nodeType {
 case "master":
  core.GetMasterNode().Start()
 case "worker":
  core.GetWorkerNode().Start()
 default:
  panic("invalid node type")
 }
}

運行主節點和工作節點

啓動主節點:

go run main.go master

啓動工作節點:

go run main.go worker

使用 Curl 發送 POST 請求

我們可以使用 curl POST 方法發送命令,如下所示,我們向本地主機 9092 發送一個 touch 命令,路徑設置爲 “tasks”,這是主節點當前運行的位置。

發送 touch 命令:

curl -X POST -H "Content-Type: application/json" -d '{"cmd": "touch test.txt"}' http://localhost:9092/tasks

結論

我們使用 Golang 構建了一個基本的分佈式系統,該系統採用主從架構並使用 gRPC 進行高效通信。在實際場景中,您可以使用更復雜的任務分配、負載均衡和錯誤處理來擴展此模型,以處理生產級別的分佈式任務。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/VgJjkdPgN0x_W1Hxn63OSg