consul 服務註冊與發現
Consul 介紹
服務註冊與發現採用 consul 組件,是 google 開源的一個使用 go 語言開發的服務發現、配置管理中心服務。內置了服務註冊與發現框架、分佈一致性協議實現、健康檢查、Key/Value 存儲、多數據中心方案等。
Consul 使用 gossip 協議管理成員關係、廣播消息到整個集羣,他有兩個 gossip pool(LAN pool 和 WAN pool),LAN pool 是同一個數據中心內部通信的,WAN pool 是多個數據中心通信的,LAN pool 有多個,WAN pool 只有一個。
項目中應用
完整代碼:
https://github.com/Justin02180218/micro-kit
配置文件
consul:
addr: "http://consul-server:8500"
interval: "10s"
timeout: "1s"
grpc:
retrymax: 3
retrytimeout: 500
name: "book-rpc-service"
在 /etc/hosts 中配置 consul-server 域名
config.go
type ConsulConfig struct {
Addr string `json:"addr" yaml:"addr"`
Interval string `json:"interval" yaml:"interval"`
Timeout string `json:"timeout" yaml:"timeout"`
Client struct {
RetryMax int `json:"retrymax" yaml:"retrymax"`
RetryTimeout int `json:"retrytimeout" yaml:"retrytimeout"`
}
}
type GRPCConfig struct {
RetryMax int `json:"retrymax" yaml:"retrymax"`
RetryTimeout int `json:"retrytimeout" yaml:"retrytimeout"`
Name string `json:"name" yaml:"name"`
}
pkg/registers
在 pkg 下新建目錄 registers,創建 consul.go 文件:
代碼如下:
// 鏈接 consul
func connectConsul(consulAddr string) (client consul.Client) {
consulConfig := api.DefaultConfig()
consulConfig.Address = consulAddr
consulClient, err := api.NewClient(consulConfig)
if err != nil {
panic(err)
}
client = consul.NewClient(consulClient)
return
}
// 向 consul 註冊服務
func InitRegister(cfg *configs.AppConfig, check api.AgentServiceCheck, logger log.Logger) (registrar sd.Registrar) {
rand.Seed(time.Now().UnixNano())
name := cfg.ServerConfig.Name
addr := utils.LocalIP()
port := cfg.ServerConfig.Port
consulAddr := cfg.ConsulConfig.Addr
client := connectConsul(consulAddr)
num := rand.Intn(100)
asr := api.AgentServiceRegistration{
ID: name + "-" + strconv.Itoa(num),
Name: name,
Address: addr,
Port: port,
Tags: []string{name},
Check: &check,
}
registrar = consul.NewRegistrar(client, &asr, logger)
return
}
// restful 服務檢測
func HttpCheck(port int, interval, timeout string) api.AgentServiceCheck {
return api.AgentServiceCheck{
HTTP: "http://" + utils.LocalIP() + ":" + strconv.Itoa(port) + "/health",
Interval: interval,
Timeout: timeout,
Notes: "Http Health Check",
}
}
// gRPC 服務檢測
func GRPCCheck(port int, interval, timeout string) api.AgentServiceCheck {
return api.AgentServiceCheck{
GRPC: utils.LocalIP() + ":" + strconv.Itoa(port) + "/health",
Interval: interval,
Timeout: timeout,
Notes: "GRPC Health Check",
}
}
service 層
以 library-user-service 爲例,其他微服務相同。
// 增加函數
HealthCheck() bool
// 實現
func (u *UserServiceImpl) HealthCheck() bool {
return true
}
endpoint 層
以 library-user-service 爲例,其他微服務相同。
// 增加屬性
HealthEndpoint endpoint.Endpoint
func MakeHealthEndpoint(svc service.UserService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
status := svc.HealthCheck()
return status, nil
}
}
transport 層
以 library-user-service 爲例,其他微服務相同。
r.GET("/health", func(c *gin.Context) {
kithttp.NewServer(
endpoints.HealthEndpoint,
decodeHealthRequest,
utils.EncodeJsonResponse,
).ServeHTTP(c.Writer, c.Request)
})
main.go
以 library-user-service 爲例,其他微服務相同。
// 配置日誌
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
// consul 註冊
check := registers.HttpCheck(configs.Conf.ServerConfig.Port, configs.Conf.ConsulConfig.Interval, configs.Conf.ConsulConfig.Timeout)
registrar := registers.InitRegister(configs.Conf, check, logger)
go func() {
registrar.Register()
errChan <- r.Run(fmt.Sprintf(":%s", strconv.Itoa(configs.Conf.ServerConfig.Port)))
}()
registrar.Deregister()
啓動 consul
使用 Docker 啓動 consul
docker pull progrium/consul
docker run --rm -p 8400:8400 -p 8500:8500 -p 8600:53/udp -h node1 progrium/consul -server -bootstrap -ui-dir /ui
啓動服務,然後在瀏覽器地址欄輸入:http://consul-server:8500/
可以看到三個微服務都以註冊到 consul 上,健康檢測也都已經通過。
調用
以前 user-service 調用 book-rpc-service 的接口,使用的是 gPRC Client 的方式。現在服務註冊的 consul 上,user-service 通過 consul 獲取已經註冊的 book-rpc-service 的一個實例,然後調用這個實例上的接口。
在 consul.go 文件中增加如下代碼:
func GRPCClient(cfg *configs.AppConfig, makeEndpoint func(string) endpoint.Endpoint, logger log.Logger) endpoint.Endpoint {
consulAddr := cfg.ConsulConfig.Addr
retryMax := cfg.GRPCConfig.RetryMax
retryTimeout := cfg.GRPCConfig.RetryTimeout
name := cfg.GRPCConfig.Name
client := connectConsul(consulAddr)
instance := consul.NewInstancer(client, logger, name, []string{name}, true)
factory := factoryFor(makeEndpoint)
endpointer := sd.NewEndpointer(instance, factory, logger)
// 負載均衡採用輪詢策略
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, time.Millisecond*time.Duration(retryTimeout), balancer)
return retry
}
func factoryFor(makeEndpoint func(string,) endpoint.Endpoint) sd.Factory {
return func(instance string) (endpoint.Endpoint, io.Closer, error) {
endpoint := makeEndpoint(instance)
return endpoint, nil, nil
}
}
在 endpoint 目錄下增加文件 book_rpc_endpoint.go,代碼如下:
type BookRPCEndpoints struct {
FindBooksEndpoint endpoint.Endpoint
}
func MakeFindBooksEndpoint(instance string) endpoint.Endpoint {
conn, err := grpc.Dial(instance, grpc.WithInsecure())
if err != nil {
fmt.Println(err)
return nil
}
findBooksEndpoint := grpctransport.NewClient(
conn, "book.Book", "FindBooksByUserID",
encodeGRPCFindBooksRequest,
decodeGRPCFindBooksResponse,
pbbook.BooksResponse{},
).Endpoint()
return findBooksEndpoint
}
修改 service.go
type UserServiceImpl struct {
userDao dao.UserDao
grpcClient kitendpoint.Endpoint
}
func NewUserServiceImpl(userDao dao.UserDao, grpcClient kitendpoint.Endpoint) UserService {
return &UserServiceImpl{
userDao: userDao,
grpcClient: grpcClient,
}
}
func (u *UserServiceImpl) FindBooksByUserID(ctx context.Context, id uint64) (interface{}, error) {
res, err := u.grpcClient(ctx, id)
if err != nil {
return nil, err
}
return res, nil
}
修改 main.go
findBooksEndpoint := endpoint.MakeFindBooksEndpoint
grpcClient := registers.GRPCClient(configs.Conf, findBooksEndpoint, tracer, logger)
userService := service.NewUserServiceImpl(userDao, grpcClient)
完整代碼:
https://github.com/Justin02180218/micro-kit
更多**【分佈式專輯】**【架構實戰專輯】******系列文章,請關注公衆號**
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Lrb8PH-iZiXOZ3aSEDh_8A