consul 服務註冊與發現

Consul 介紹

服務註冊與發現採用 consul 組件,是 google 開源的一個使用 go 語言開發的服務發現、配置管理中心服務。內置了服務註冊與發現框架、分佈一致性協議實現、健康檢查、Key/Value 存儲、多數據中心方案等。

Consul 使用 gossip 協議管理成員關係、廣播消息到整個集羣,他有兩個 gossip  pool(LAN pool 和 WAN pool),LAN pool 是同一個數據中心內部通信的,WAN pool 是多個數據中心通信的,LAN pool 有多個,WAN pool 只有一個。

項目中應用

完整代碼:

https://github.com/Justin02180218/micro-kit

配置文件

consul:
  addr: "http://consul-server:8500"
  interval: "10s"
  timeout: "1s"
grpc:
  retrymax: 3
  retrytimeout: 500 
  name: "book-rpc-service"

在 /etc/hosts 中配置 consul-server 域名

config.go

type ConsulConfig struct {
    Addr     string `json:"addr" yaml:"addr"`
    Interval string `json:"interval" yaml:"interval"`
    Timeout  string `json:"timeout" yaml:"timeout"`
    Client   struct {
        RetryMax     int `json:"retrymax" yaml:"retrymax"`
        RetryTimeout int `json:"retrytimeout" yaml:"retrytimeout"`
    }
}
type GRPCConfig struct {
    RetryMax     int    `json:"retrymax" yaml:"retrymax"`
    RetryTimeout int    `json:"retrytimeout" yaml:"retrytimeout"`
    Name         string `json:"name" yaml:"name"`
}

pkg/registers

在 pkg 下新建目錄 registers,創建 consul.go 文件:

代碼如下:

// 鏈接 consul
func connectConsul(consulAddr string) (client consul.Client) {
    consulConfig := api.DefaultConfig()
    consulConfig.Address = consulAddr
    consulClient, err := api.NewClient(consulConfig)
    if err != nil {
        panic(err)
    }
    client = consul.NewClient(consulClient)
    return
}
// 向 consul 註冊服務
func InitRegister(cfg *configs.AppConfig, check api.AgentServiceCheck, logger log.Logger) (registrar sd.Registrar) {
    rand.Seed(time.Now().UnixNano())
    name := cfg.ServerConfig.Name
    addr := utils.LocalIP()
    port := cfg.ServerConfig.Port
    consulAddr := cfg.ConsulConfig.Addr
    client := connectConsul(consulAddr)
    num := rand.Intn(100)
    asr := api.AgentServiceRegistration{
        ID:      name + "-" + strconv.Itoa(num),
        Name:    name,
        Address: addr,
        Port:    port,
        Tags:    []string{name},
        Check:   &check,
    }
    registrar = consul.NewRegistrar(client, &asr, logger)
    return
}
// restful 服務檢測
func HttpCheck(port int, interval, timeout string) api.AgentServiceCheck {
    return api.AgentServiceCheck{
        HTTP:     "http://" + utils.LocalIP() + ":" + strconv.Itoa(port) + "/health",
        Interval: interval,
        Timeout:  timeout,
        Notes:    "Http Health Check",
    }
}
// gRPC 服務檢測
func GRPCCheck(port int, interval, timeout string) api.AgentServiceCheck {
    return api.AgentServiceCheck{
        GRPC:     utils.LocalIP() + ":" + strconv.Itoa(port) + "/health",
        Interval: interval,
        Timeout:  timeout,
        Notes:    "GRPC Health Check",
    }
}

service 層

以 library-user-service 爲例,其他微服務相同。

// 增加函數
HealthCheck() bool
// 實現
func (u *UserServiceImpl) HealthCheck() bool {
    return true
}

endpoint 層

以 library-user-service 爲例,其他微服務相同。

// 增加屬性
HealthEndpoint            endpoint.Endpoint
func MakeHealthEndpoint(svc service.UserService) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (response interface{}, err error) {
        status := svc.HealthCheck()
        return status, nil
    }
}

transport 層

以 library-user-service 爲例,其他微服務相同。

r.GET("/health", func(c *gin.Context) {
    kithttp.NewServer(
        endpoints.HealthEndpoint,
        decodeHealthRequest,
        utils.EncodeJsonResponse,
    ).ServeHTTP(c.Writer, c.Request)
})

main.go

以 library-user-service 爲例,其他微服務相同。

// 配置日誌
var logger log.Logger
{
    logger = log.NewLogfmtLogger(os.Stderr)
    logger = log.With(logger, "ts", log.DefaultTimestampUTC)
    logger = log.With(logger, "caller", log.DefaultCaller)
}
// consul 註冊
check := registers.HttpCheck(configs.Conf.ServerConfig.Port, configs.Conf.ConsulConfig.Interval, configs.Conf.ConsulConfig.Timeout)
registrar := registers.InitRegister(configs.Conf, check, logger)
go func() {
    registrar.Register()
    errChan <- r.Run(fmt.Sprintf(":%s", strconv.Itoa(configs.Conf.ServerConfig.Port)))
}()
registrar.Deregister()

啓動 consul

使用 Docker 啓動 consul

docker pull progrium/consul

docker run --rm -p 8400:8400 -p 8500:8500 -p 8600:53/udp -h node1 progrium/consul -server -bootstrap -ui-dir /ui

啓動服務,然後在瀏覽器地址欄輸入:http://consul-server:8500/

可以看到三個微服務都以註冊到 consul 上,健康檢測也都已經通過。

調用

以前 user-service 調用 book-rpc-service 的接口,使用的是 gPRC Client 的方式。現在服務註冊的 consul 上,user-service 通過 consul 獲取已經註冊的 book-rpc-service 的一個實例,然後調用這個實例上的接口。

在 consul.go 文件中增加如下代碼:

func GRPCClient(cfg *configs.AppConfig, makeEndpoint func(string) endpoint.Endpoint, logger log.Logger) endpoint.Endpoint {
    consulAddr := cfg.ConsulConfig.Addr
    retryMax := cfg.GRPCConfig.RetryMax
    retryTimeout := cfg.GRPCConfig.RetryTimeout
    name := cfg.GRPCConfig.Name
    client := connectConsul(consulAddr)
    instance := consul.NewInstancer(client, logger, name, []string{name}, true)
    factory := factoryFor(makeEndpoint)
    endpointer := sd.NewEndpointer(instance, factory, logger)
    // 負載均衡採用輪詢策略
    balancer := lb.NewRoundRobin(endpointer)
    retry := lb.Retry(retryMax, time.Millisecond*time.Duration(retryTimeout), balancer)
    return retry
}
func factoryFor(makeEndpoint func(string,) endpoint.Endpoint) sd.Factory {
    return func(instance string) (endpoint.Endpoint, io.Closer, error) {
        endpoint := makeEndpoint(instance)
        return endpoint, nil, nil
    }
}

在 endpoint 目錄下增加文件 book_rpc_endpoint.go,代碼如下:

type BookRPCEndpoints struct {
    FindBooksEndpoint endpoint.Endpoint
}
func MakeFindBooksEndpoint(instance string) endpoint.Endpoint {
    conn, err := grpc.Dial(instance, grpc.WithInsecure())
    if err != nil {
        fmt.Println(err)
        return nil
    }
    findBooksEndpoint := grpctransport.NewClient(
        conn, "book.Book", "FindBooksByUserID",
        encodeGRPCFindBooksRequest,
        decodeGRPCFindBooksResponse,
        pbbook.BooksResponse{},
    ).Endpoint()
    return findBooksEndpoint
}

修改 service.go

type UserServiceImpl struct {
    userDao    dao.UserDao
    grpcClient kitendpoint.Endpoint
}
func NewUserServiceImpl(userDao dao.UserDao, grpcClient kitendpoint.Endpoint) UserService {
    return &UserServiceImpl{
        userDao:    userDao,
        grpcClient: grpcClient,
    }
}
func (u *UserServiceImpl) FindBooksByUserID(ctx context.Context, id uint64) (interface{}, error) {
    res, err := u.grpcClient(ctx, id)
    if err != nil {
        return nil, err
    }
    return res, nil
}

修改 main.go

findBooksEndpoint := endpoint.MakeFindBooksEndpoint
grpcClient := registers.GRPCClient(configs.Conf, findBooksEndpoint, tracer, logger)
userService := service.NewUserServiceImpl(userDao, grpcClient)

完整代碼:

https://github.com/Justin02180218/micro-kit

更多**【分佈式專輯】**【架構實戰專輯】******系列文章,請關注公衆號**

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Lrb8PH-iZiXOZ3aSEDh_8A