用 consul 做 grpc 的服務發現
服務發現與負載均衡
當 server 端是集羣部署時,client 調用 server 就需要用到服務發現與負載均衡。通常有兩總方式:
- 一種方式是在 client 與 server 之間加代理,由代理來做負載均衡
- 一種方式是將服務註冊到一個數據中心,client 通過數據中心查詢到所有服務的節點信息,然後自己選擇負載均衡的策略。
第一種方式常見的就是用 nginx 給 http 服務做負載均衡,client 端不直接與 server 交互,而是把請求並給 nginx,nginx 再轉給後端的服務。
這種方式的優點是:
- client 和 server 無需做改造,client 看不到 server 的集羣,就像單點一樣調用就可以
這種方式有幾個缺點:
- 所有的請求都必須經過代理,代理側容易出現性能瓶頸
- 代理不能出故障,一旦代理掛了服務就沒法訪問了。
第二種方式可以參考 dubbo 的 rpc 方式,所有的服務都註冊在 zookeeper 上,client 端從 zookeeper 訂閱 server 的列表,然後自己選擇把請求發送到哪個 server 上。對於上面提到的兩個缺點,這種方式都很好的避免了:
- client 與 server 端是直接交互的,server 可以做任意的水平擴展,不會出現性能瓶頸
- 註冊中心 (zookeeper) 通過 raft 算法實現分佈式高可用,不用擔心註冊中心掛了服務信息丟失的情況。
這種方式的缺點就是實現起來比較複雜。
用第一種方式做 grpc 的負載均衡時可以有以下的選擇:
用第二種方式時,可以選擇的數據中心中間件有:
他們都實現了 raft 算法,都可以用來做註冊中心,本篇文章選擇 consul 是因爲 consul 的特點就是做服務發現,有現成的 api 可以用。
用 consul 給 golang 的 grpc 做服務註冊與發現
grpc 的 resolver
grpc 的 Dial() 和 DialContent() 方法中都可以添加 Load-Balance 的選項,Dial 方法已經被廢棄了,本篇文章介紹使用 DialContext 的方法。
grpc 官方實現了 dns_resolver 用來做 dns 的負載均衡。我們通過例子看看 grpc client 端的代碼是怎麼寫的,然後再理解 dns_resolver 的源碼,最後參照 dns_resolver 來寫自己的 consul_resovler。
dns 的負載均衡的例子:
package main
import (
"context"
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/resolver"
)
const (
address = "dns:///dns-record-name:443"
defaultName = "world"
)
func main() {
// The secret sauce
resolver.SetDefaultScheme("dns")
// Set up a connection to the server.
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the servers in round-robin manner.
for i := 0; i < 3; i++ {
ctx := context.Background()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.Message)
}
}
DialContext 的定義如下:
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error)
下面這行代碼指明瞭用 dns_resolver,實際上也可以不寫,grpc 會根據 DialContext 的第二個參數 target 來判斷選用哪個 resolver,例子中傳給 DialContext 的 target 是 dns:///dns-record-name:443,grpc 會自動選擇 dns_resolver
resolver.SetDefaultScheme("dns")
下面的這個選項,指明瞭 grpc 用輪詢做爲負載均衡的策略
grpc.WithBalancerName(roundrobin.Name)
調用 grpc.DialContext 之後,grpc 會找到對應的 resovler,拿到服務的地址列表,然後在調用服務提供的接口時,根據指定的輪詢策略選擇一個服務。
gRPC Name Resolution 裏面說了,可以實現自定義的 resolver 作爲插件。
先看看 resolver.go 的源碼,源碼路徑是 $GOPATH/src/google.golang.org/grpc/resolver/resolver.go
m = make(map[string]Builder) //scheme到Builder的map
func Register(b Builder) { //用於resolver註冊的接口,dns_resolver.go的init方中調用了這個方法,實際就是更新了map
m[b.Scheme()] = b
}
type Resolver interface {
ResolveNow(ResolveNowOption) //立即resolve,重新查詢服務信息
Close() //關閉這個Resolver
}
type Target struct {//uri解析之後的對象, uri的格式詳見RFC3986
Scheme string
Authority string
Endpoint string
}
type Address struct {//描述一個服務的地址信息
Addr string //格式是 host:port
Type AddressType
ServerName string
Metadata interface{}
}
type ClientConn interface {//定義了兩個callback函數,用於通知服務信息的更新
NewAddress(addresses []Address)
NewServiceConfig(serviceConfig string)
}
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error) //返回一個Resolver
Scheme() string //返回scheme如 "dns", "passthrough", "consul"
}
func Get(scheme string) Builder { //grpc.ClientConn會高用這個方法獲取指定的Builder接口的實例
if b, ok := m[scheme]; ok {
return b
}
return nil
}
即使加了註釋,估計也很難馬上理解這個其中的具體含意,博主也是結合 dns_resolver.go,反覆讀了好幾遍才理解 resolver.go。其大致的意思是,grpc.DialContext 方法調用之後:
-
解析 target(例如 dns:///dns-record-name:443) 獲取 scheme
-
調用 resolver.Get 方法根據 scheme 拿到對應的 Builder
-
調用 Builder.Build 方法
- 解析 target
- 獲取服務地址的信息
- 調用 ClientConn.NewAddress 和 NewServiceConfig 這兩個 callback 把服務信息傳遞給上層的調用方
- 返回 Resolver 接口實例給上層
-
上層可以通過 Resolver.ResolveNow 方法主動刷新服務信息
瞭解了 resolver 源碼的意思之後,再看一下 dns_resolver.go 就比較清晰了
//註冊一個Builder到resolver的map裏面
//這個方法會被默認調用,瞭解go的init可以自行百度
func init() {
resolver.Register(NewBuilder())
}
func NewBuilder() resolver.Builder {//創建一個resolver.Builder的實例
return &dnsBuilder{minFreq: defaultFreq}
}
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
//解析target拿到ip和端口
host, port, err := parseTarget(target.Endpoint, defaultPort)
if err != nil {
return nil, err
}
// IP address.
if net.ParseIP(host) != nil {
host, _ = formatIP(host)
addr := []resolver.Address{{Addr: host + ":" + port}}
i := &ipResolver{
cc: cc,
ip: addr,
rn: make(chan struct{}, 1),
q: make(chan struct{}),
}
cc.NewAddress(addr)
go i.watcher()
return i, nil
}
// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
d := &dnsResolver{
freq: b.minFreq,
backoff: backoff.Exponential{MaxDelay: b.minFreq},
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
t: time.NewTimer(0),
rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
}
if target.Authority == "" {
d.resolver = defaultResolver
} else {
d.resolver, err = customAuthorityResolver(target.Authority)
if err != nil {
return nil, err
}
}
d.wg.Add(1)
go d.watcher()//起一個goroutine,因爲watcher這個方法是個死循環,當定時器
return d, nil
}
func (d *dnsResolver) watcher() {
defer d.wg.Done()
for {
//這個select沒有default,當沒有case滿足時會一直阻塞
//結束阻塞的條件是定時器超時d.t.C,或者d.rn這個channel中有數據可讀
select {
case <-d.ctx.Done():
return
case <-d.t.C:
case <-d.rn:
}
result, sc := d.lookup()
// Next lookup should happen within an interval defined by d.freq. It may be
// more often due to exponential retry on empty address list.
if len(result) == 0 {
d.retryCount++
d.t.Reset(d.backoff.Backoff(d.retryCount))
} else {
d.retryCount = 0
d.t.Reset(d.freq)
}
//resolver.ClientConn的兩個callback的調用,實現服務信息傳入上層
d.cc.NewServiceConfig(sc)
d.cc.NewAddress(result)
}
}
//向channel中寫入,用於結束watcher中那個select的阻塞狀態,後面的代碼就是重新查詢服務信息的邏輯
func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOption) {
select {
case i.rn <- struct{}{}:
default:
}
}
實現 consul_resovler
上面我們瞭解了 grpc 的 resolver 的機制,接下來實現 consul_resolver, 我們先把代碼的架子搭起來
init() //返回一個resolver.Builder的實例
//實現resolver.Builder的接口中的所有方法就是一個resolver.Builder
type consulBuidler strcut {
}
func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
//TODO 解析target, 拿到consul的ip和端口
//TODO 用consul的go api連接consul,查詢服務結點信息,並且調用resolver.ClientConn的兩個callback
}
func (cb *consulBuilder) Scheme() string {
return "consul"
}
//ResolverNow方法什麼也不做,因爲和consul保持了發佈訂閱的關係
//不需要像dns_resolver那個定時的去刷新
func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
}
//暫時先什麼也不做吧
func (cr *consulResolver) Close() {
}
現在來看,實現 consul_resolver.go 最大的問題就是怎麼用 consul 提供的 go api 了,參考這篇文章就可以了,然後 consul_resolver.go 的代碼就出來了
package consul
import (
"errors"
"fmt"
"github.com/hashicorp/consul/api"
"google.golang.org/grpc/resolver"
"regexp"
"sync"
)
const (
defaultPort = "8500"
)
var (
errMissingAddr = errors.New("consul resolver: missing address")
errAddrMisMatch = errors.New("consul resolver: invalied uri")
errEndsWithColon = errors.New("consul resolver: missing port after port-separator colon")
regexConsul, _ = regexp.Compile("^([A-z0-9.]+)(:[0-9]{1,5})?/([A-z_]+)$")
)
func Init() {
fmt.Printf("calling consul init\n")
resolver.Register(NewBuilder())
}
type consulBuilder struct {
}
type consulResolver struct {
address string
wg sync.WaitGroup
cc resolver.ClientConn
name string
disableServiceConfig bool
lastIndex uint64
}
func NewBuilder() resolver.Builder {
return &consulBuilder{}
}
func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
fmt.Printf("calling consul build\n")
fmt.Printf("target: %v\n", target)
host, port, name, err := parseTarget(fmt.Sprintf("%s/%s", target.Authority, target.Endpoint))
if err != nil {
return nil, err
}
cr := &consulResolver{
address: fmt.Sprintf("%s%s", host, port),
name: name,
cc: cc,
disableServiceConfig: opts.DisableServiceConfig,
lastIndex: 0,
}
cr.wg.Add(1)
go cr.watcher()
return cr, nil
}
func (cr *consulResolver) watcher() {
fmt.Printf("calling consul watcher\n")
config := api.DefaultConfig()
config.Address = cr.address
client, err := api.NewClient(config)
if err != nil {
fmt.Printf("error create consul client: %v\n", err)
return
}
for {
services, metainfo, err := client.Health().Service(cr.name, cr.name, true, &api.QueryOptions{WaitIndex: cr.lastIndex})
if err != nil {
fmt.Printf("error retrieving instances from Consul: %v", err)
}
cr.lastIndex = metainfo.LastIndex
var newAddrs []resolver.Address
for _, service := range services {
addr := fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port)
newAddrs = append(newAddrs, resolver.Address{Addr: addr})
}
fmt.Printf("adding service addrs\n")
fmt.Printf("newAddrs: %v\n", newAddrs)
cr.cc.NewAddress(newAddrs)
cr.cc.NewServiceConfig(cr.name)
}
}
func (cb *consulBuilder) Scheme() string {
return "consul"
}
func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
}
func (cr *consulResolver) Close() {
}
func parseTarget(target string) (host, port, name string, err error) {
fmt.Printf("target uri: %v\n", target)
if target == "" {
return "", "", "", errMissingAddr
}
if !regexConsul.MatchString(target) {
return "", "", "", errAddrMisMatch
}
groups := regexConsul.FindStringSubmatch(target)
host = groups[1]
port = groups[2]
name = groups[3]
if port == "" {
port = defaultPort
}
return host, port, name, nil
}
到此,grpc 客戶端服務發現就搞定了。
consul 的服務註冊
服務註冊直接用 consul 的 go api 就可以了,也是參考前一篇文章,簡單的封裝一下,consul_register.go 的代碼如下:
package consul
import (
"fmt"
"github.com/hashicorp/consul/api"
"time"
)
type ConsulService struct {
IP string
Port int
Tag []string
Name string
}
func RegitserService(ca string, cs *ConsulService) {
//register consul
consulConfig := api.DefaultConfig()
consulConfig.Address = ca
client, err := api.NewClient(consulConfig)
if err != nil {
fmt.Printf("NewClient error\n%v", err)
return
}
agent := client.Agent()
interval := time.Duration(10) * time.Second
deregister := time.Duration(1) * time.Minute
reg := &api.AgentServiceRegistration{
ID: fmt.Sprintf("%v-%v-%v", cs.Name, cs.IP, cs.Port), // 服務節點的名稱
Name: cs.Name, // 服務名稱
Tags: cs.Tag, // tag,可以爲空
Port: cs.Port, // 服務端口
Address: cs.IP, // 服務 IP
Check: &api.AgentServiceCheck{ // 健康檢查
Interval: interval.String(), // 健康檢查間隔
GRPC: fmt.Sprintf("%v:%v/%v", cs.IP, cs.Port, cs.Name), // grpc 支持,執行健康檢查的地址,service 會傳到 Health.Check 函數中
DeregisterCriticalServiceAfter: deregister.String(), // 註銷時間,相當於過期時間
},
}
fmt.Printf("registing to %v\n", ca)
if err := agent.ServiceRegister(reg); err != nil {
fmt.Printf("Service Register error\n%v", err)
return
}
}
改造一下 grpc 的 helloworld
把 grpc 的 helloworld 的 demo 改一下,用 consul 來做服務註冊和發現。
server 端代碼:
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"log"
"net"
"server/internal/consul"
pb "server/proto/helloworld"
)
const (
port = ":50051"
)
// server is used to implement helloworld.GreeterServer.
type server struct{}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.Name)
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
func RegisterToConsul() {
consul.RegitserService("127.0.0.1:8500", &consul.ConsulService{
Name: "helloworld",
Tag: []string{"helloworld"},
IP: "127.0.0.1",
Port: 50051,
})
}
//health
type HealthImpl struct{}
// Check 實現健康檢查接口,這裏直接返回健康狀態,這裏也可以有更復雜的健康檢查策略,比如根據服務器負載來返回
func (h *HealthImpl) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
fmt.Print("health checking\n")
return &grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_SERVING,
}, nil
}
func (h *HealthImpl) Watch(req *grpc_health_v1.HealthCheckRequest, w grpc_health_v1.Health_WatchServer) error {
return nil
}
func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
grpc_health_v1.RegisterHealthServer(s, &HealthImpl{})
RegisterToConsul()
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
client 端代碼:
package main
import (
"client/internal/consul"
pb "client/proto/helloworld"
"context"
"google.golang.org/grpc"
"log"
"os"
"time"
)
const (
target = "consul://127.0.0.1:8500/helloworld"
defaultName = "world"
)
func main() {
consul.Init()
// Set up a connection to the server.
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
conn, err := grpc.DialContext(ctx, target, grpc.WithBlock(), grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
for {
ctx, _ := context.WithTimeout(context.Background(), time.Second)
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.Message)
time.Sleep(time.Second * 2)
}
}
運行一把
啓動 consul
consul agent -dev
啓動 hello server
cd server
go run cmd/main.go
啓動 hello client
cd client
go run cmd/main.go
運行結果:
//client
2019/03/07 17:22:04 Greeting: Hello world
2019/03/07 17:22:06 Greeting: Hello world
//server
2019/03/07 17:22:04 Received: world
2019/03/07 17:22:06 Received: world
完整工程的 git 地址
工程使用方法:
cd server
go mod tidy
go run cmd/main.go
cd client
go mod tidy
go run cmd/main.go
請自行解決防火牆的問題
參考文章
- grpc 名稱發現與負載均衡
- golang consul-grpc 服務註冊與發現
- gRPC Client-Side Load Balancing in Go
- godoc grpc
- consul go api
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://segmentfault.com/a/1190000018424798