讓我們使用 Go 實現基本的服務發現
我們已經知道,要請求一個服務實例(服務器),我們必須知道它的網絡位置(IP
地址和端口)。
隨着當今世界微服務的突破,越來越多的用戶、請求和需求使得這項工作變得非常困難。在基於雲的微服務時代,我們的服務會因爲自動伸縮、故障、升級等各種不同的情況而不斷變化。由於這些變化,他們不斷獲得新的 IP
。
這就是服務發現進入微服務場景的地方。我們需要一些系統來隨時關注所有服務,並隨時跟蹤哪個服務部署在哪個 IP / 端口組合上,以便微服務的客戶端可以進行相應的無縫路由。[1]
服務發現在概念上非常簡單:它的關鍵組件是服務註冊表,它是應用程序服務實例的網絡位置的數據庫。[2] 該機制在服務實例啓動和停止時更新服務註冊表。
實現服務發現主要有兩種方式:
-
服務及其客戶端直接與服務註冊中心交互。
-
部署基礎設施(
k8s
等)處理服務發現。
我們將使用3rd
方註冊模式來實現我們的服務發現。多虧了這種模式,一個名爲註冊器的第三方(在我們的例子中非常基本的 go
函數docker ps -a
以特定的時間間隔運行)而不是向服務註冊中心註冊自己的服務。
讓我們更詳細地回顧一下我們的應用程序。
反向代理
爲了實現反向代理,我使用了httputil
包。我實現這個提供負載平衡的主要目的。
爲了以循環方式實現客戶端請求路由,我進行了基本的數學運算,計算了獲取請求的數量,並對服務註冊表列表的長度進行了模塊化操作,以便我輕鬆找到後端並代理請求。
package main
import (
"fmt"
"net/http"
"sync/atomic"
)
type Application struct {
RequestCount uint64
SRegistry *ServiceRegistry
}
func (a *Application) Handle(w http.ResponseWriter, r *http.Request) {
atomic.AddUint64(&a.RequestCount, 1)
if a.SRegistry.Len() == 0 {
w.Write([]byte(`No backend entry in the service registry`))
return
}
backendIndex := int(atomic.LoadUint64(&a.RequestCount) % uint64(a.SRegistry.Len()))
fmt.Printf("Request routing to instance %d\n", backendIndex)
a.SRegistry.GetByIndex(backendIndex).
proxy.
ServeHTTP(w, r)
}
Registrar
我使用time.Tick
來實現特定時間間隔(默認 3
秒)之間的輪詢。在每個滴答聲中,代碼都docker ps -a
使用官方 docker go SDK
運行。(我使用 -a
是因爲我需要知道哪些容器處於停機狀態,因此我從我們的服務註冊表列表中刪除了不健康的容器 IP
和端口)。如果添加了一個新容器並處於運行狀態,請檢查它是否已存在於服務註冊表中,如果不存在,則將其地址添加到服務註冊表中。
package main
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"time"
)
type Registrar struct {
Interval time.Duration
DockerCLI *client.Client
SRegistry *ServiceRegistry
}
const (
HelloServiceImageName = "hello"
ContainerRunningState = "running"
)
func (r *Registrar) Observe() {
for range time.Tick(r.Interval) {
cList, _ := r.DockerCLI.ContainerList(context.Background(), types.ContainerListOptions{
All: true,
})
if len(cList) == 0 {
r.SRegistry.RemoveAll()
continue
}
for _, c := range cList {
if c.Image != HelloServiceImageName {
continue
}
_, exist := r.SRegistry.GetByContainerID(c.ID)
if c.State == ContainerRunningState {
if !exist {
addr := fmt.Sprintf("http://localhost:%d", c.Ports[0].PublicPort)
r.SRegistry.Add(c.ID, addr)
}
} else {
if exist {
r.SRegistry.RemoveByContainerID(c.ID)
}
}
}
}
}
服務註冊
這是一個非常基本的結構切片,具有併發訪問的安全性,這要歸功於sync.RWMutex
並且如上所述,它保留了所有健康的後端地址列表。該列表由註冊商每3
秒更新一次。
package main
import (
"fmt"
"net/http/httputil"
"net/url"
"sync"
)
type backend struct {
proxy *httputil.ReverseProxy
containerID string
}
type ServiceRegistry struct {
mu sync.RWMutex
backends []backend
}
func (s *ServiceRegistry) Init() {
s.mu = sync.RWMutex{}
s.backends = []backend{}
}
func (s *ServiceRegistry) Add(containerID, addr string) {
s.mu.Lock()
defer s.mu.Unlock()
URL, _ := url.Parse(addr)
s.backends = append(s.backends, backend{
proxy: httputil.NewSingleHostReverseProxy(URL),
containerID: containerID,
})
}
func (s *ServiceRegistry) GetByContainerID(containerID string) (backend, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, b := range s.backends {
if b.containerID == containerID {
return b, true
}
}
return backend{}, false
}
func (s *ServiceRegistry) GetByIndex(index int) backend {
s.mu.RLock()
defer s.mu.RUnlock()
return s.backends[index]
}
func (s *ServiceRegistry) RemoveByContainerID(containerID string) {
s.mu.Lock()
defer s.mu.Unlock()
var backends []backend
for _, b := range s.backends {
if b.containerID == containerID {
continue
}
backends = append(backends, b)
}
s.backends = backends
}
func (s *ServiceRegistry) RemoveAll() {
s.mu.Lock()
defer s.mu.Unlock()
s.backends = []backend{}
}
func (s *ServiceRegistry) Len() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.backends)
}
func (s *ServiceRegistry) List() {
s.mu.RLock()
defer s.mu.RUnlock()
for i := range s.backends {
fmt.Println(s.backends[i].containerID)
}
}
源代碼
https://github.com/Abdulsametileri/simple-service-discovery
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/tE_5DmXaofNa1LuPoBaLHA