golang 實現簡單網關
【導讀】使用 golang 做網關開發,本文展示了一種最簡單版本的網管落地實現。
golang 實現簡單網關
網關 = 反向代理 + 負載均衡 + 各種策略,技術實現也有多種多樣,有基於 nginx 使用 lua 的實現,比如 openresty、kong;也有基於 zuul 的通用網關;還有就是 golang 的網關,比如 tyk。
這篇文章主要是講如何基於 golang 實現一個簡單的網關。
- 預備
1.1. 準備兩個後端 web 服務
啓動兩個後端 web 服務(代碼)
type RealServer struct {
Addr string
}
func (r *RealServer) Run() {
log.Println("start http server at " + r.Addr)
mux := http.NewServeMux()
mux.HandleFunc("/", r.EchoHandler)
mux.HandleFunc("/base/error", r.ErrorHandler)
mux.HandleFunc("/timeout", r.TimeoutHandler)
server := &http.Server{
Addr: r.Addr,
WriteTimeout: time.Second * 3,
Handler: mux,
}
go func() {
log.Fatal(server.ListenAndServe())
}()
}
func (r *RealServer) EchoHandler(w http.ResponseWriter, req *http.Request) {
upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
realIP := fmt.Sprintf("RemoteAddr=%s,X-Forwarded-For=%v,X-Real-Ip=%v\n", req.RemoteAddr, req.Header.Get("X-Forwarded-For"), req.Header.Get("X-Real-Ip"))
header := fmt.Sprintf("headers =%v\n", req.Header)
io.WriteString(w, upath)
io.WriteString(w, realIP)
io.WriteString(w, header)
}
func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(500)
io.WriteString(w, "error handler")
}
func (r *RealServer) TimeoutHandler(w http.ResponseWriter, req *http.Request) {
time.Sleep(6 * time.Second)
w.WriteHeader(200)
io.WriteString(w, "timeout handler")
}
func main() {
rs1 := &RealServer{Addr: "127.0.0.1:2003"}
rs1.Run()
rs2 := &RealServer{Addr: "127.0.0.1:2004"}
rs2.Run()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
1.2. 訪問工具
curl -v http://localhost:2002/base
- 反向代理
2.1. 單後端(target)反向代理
具體代碼
package main
import (
"log"
"net/http"
"net/http/httputil"
"net/url"
)
var (
addr = "127.0.0.1:2002"
)
func main() {
rsUrl, _:=url.Parse("http://127.0.0.1:2003/base")
reversePorxy := httputil.NewSingleHostReverseProxy(rsUrl)
log.Println("Starting Httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, reversePorxy))
}
直接使用基礎庫 httputil 提供的NewSingleHostReverseProxy
即可,返回的reverseProxy
對象實現了serveHttp
方法,因此可以直接作爲 handler。
2.2. 分析反向代理代碼,並添加修改 response 內容
具體代碼
package main
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httputil"
"net/url"
"strings"
)
var (
addr = "127.0.0.1:2002"
)
func main() {
rsUrl, _:=url.Parse("http://127.0.0.1:2003/base")
reversePorxy := NewSingleHostReverseProxy(rsUrl)
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, reversePorxy))
}
func NewSingleHostReverseProxy(target *url.URL) *httputil.ReverseProxy {
targetQuery := target.RawQuery
director := func(req *http.Request) {
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
// explicitly disable User-Agent so it's not set to default value
req.Header.Set("User-Agent", "")
}
// add when the reverseproxy is the first rp
req.Header.Set("X-Real-Ip", strings.Split(req.RemoteAddr, ":")[0])
}
modifyFunc := func(res *http.Response) error {
if res.StatusCode != http.StatusOK {
oldPayLoad, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}
newPayLoad := []byte("hello " + string(oldPayLoad))
res.Body = ioutil.NopCloser(bytes.NewBuffer(newPayLoad))
res.ContentLength = int64(len(newPayLoad))
res.Header.Set("Content-Length",fmt.Sprint(len(newPayLoad)))
}
return nil
}
return &httputil.ReverseProxy{Director: director, ModifyResponse: modifyFunc}
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
director
中定義回調函數,入參爲*http.Request
,決定如何構造向後端的請求,比如 host 是否向後傳遞,是否進行 url 重寫,對於 header 的處理,後端 target 的選擇等,都可以在這裏完成。
director
在這裏具體做了:
-
根據後端 target,構造到後端請求的 url
-
選擇性傳遞必要的 header
-
設置代理相關的 header,比如
X-Forwarded-For
、X-Real-Ip
-
X-Forwarded-For
記錄經過的所有代理,以proxyIp01, proxyIp02, proxyIp03
的格式記錄,由於是追加,可能被篡改,當然,如果第一代理以覆蓋該頭的方式進行記錄,也是可信的 -
X-Real-Ip
用於記錄客戶端 IP,一般放在第一代理上,用於記錄客戶端的來源公網 IP,可信
modifyResponse
中定義回調函數,入參爲*http.Response
,用於修改響應的信息,比如響應的 Body,響應的 Header 等信息。
最終依舊是返回一個ReverseProxy
,然後將這個對象作爲 handler 傳入即可。
2.3. 支持多個後端服務器
參考 2.2 中的NewSingleHostReverseProxy
,只需要實現一個類似的、支持多 targets 的方法即可,具體實現見後面。
- 負載均衡
作爲一個網關服務,在上面 2.3 的基礎上,需要支持必要的負載均衡策略,比如:
-
隨機
-
輪詢
-
加權輪詢
-
一致性 hash
3.1. 負載均衡算法
3.1.1. 隨機
隨便 random 一個整數作爲索引,然後取對應的地址即可,實現比較簡單。
具體代碼
type RandomN struct {
rss []string
}
func (r *RandomN) Add(params ...string) error {
if len(params) != 1 {
return fmt.Errorf("param length should be one")
}
r.rss = append(r.rss, params[0])
return nil
}
func (r *RandomN) Next() string {
if len(r.rss) == 0 {
return ""
}
return r.rss[rand.Intn(len(r.rss))]
}
func (r *RandomN) Get(key string) (string, error) {
return r.Next(), nil
}
3.1.2. 輪詢
使用curIndex
進行累加計數,一旦超過 rss 數組的長度,則重置。
具體代碼
type RR struct {
curIndex int
rss []string
}
func (r *RR) Add(params ...string) error {
if len(params) != 1 {
return fmt.Errorf("param length should be one")
}
r.rss = append(r.rss, params[0])
return nil
}
func (r *RR) Next() string {
if len(r.rss) == 0 {
return ""
}
if r.curIndex == len(r.rss) {
r.curIndex = 0
}
node := r.rss[r.curIndex]
r.curIndex++
return node
}
func (r *RR) Get(key string) (string, error) {
return r.Next(), nil
}
3.1.3. 加權輪詢
輪詢帶權重,如果使用計數遞減的方式,如果權重是5,1,1
那麼後端 rs 依次爲a,a,a,a,a,b,c,a,a,a,a...
,其中 a 後端會瞬間壓力過大;參考 nginx 內部的加權輪詢,或者應該稱之爲平滑加權輪詢,思路是:
後端真實節點包含三個權重:
-
本身權重 weight —— 設置的權重
-
有效權重 effectiveWeight —— 根據後端節點健康狀態動態變化,當異常時,減一;當正常時,加一,最多到 weight 值
-
當前權重 curWeight —— 初始值爲 weight,計算時
curWeight += effectiveWeight
,如果curWeight
最大,則被選中,然後curWeight -= total
操作步驟:
-
計算 curWeight
-
選取最大 curWeight 的節點
-
重新計算 curWeight
具體代碼
type WeightedRR struct {
rss []*WeightedNode
}
type WeightedNode struct {
addr string
weight int
curWeight int
effectiveWeight int
}
func (r *WeightedRR) Add(params ...string) error {
if len(params) != 2 {
return fmt.Errorf("param length should be two")
}
addr := params[0]
weight, err := strconv.ParseInt(params[1], 10, 64)
if err != nil {
return err
}
node := &WeightedNode{
addr: addr,
weight: int(weight),
curWeight: int(weight),
effectiveWeight: int(weight),
}
r.rss = append(r.rss, node)
return nil
}
func (r *WeightedRR) Next() string {
// 平滑加權輪詢 --> 1 計算 total, 2 變更臨時權重 3. 選擇最大臨時權重 4。 變更臨時權重
total := 0
var best *WeightedNode
for _, node := range r.rss {
n := node
total += n.effectiveWeight
n.curWeight += n.effectiveWeight
if best == nil || n.curWeight > best.curWeight {
best = n
}
}
if best == nil {
return ""
}
best.curWeight -= total
return best.addr
}
func (r *WeightedRR) Get(key string) (string, error) {
return r.Next(), nil
}
3.1.4. 一致性 hash
一致性 hash 算法,主要是用於分佈式 cache 熱點 / 命中問題;這裏用於基於某 key 的 hash 值,路由到固定後端,但是隻能是基本滿足流量綁定,一旦後端目標節點故障,會自動平移到環上最近的那麼個節點。
實現:
-
首先存在一個環,環上的每個點都能被選擇的 hash 函數映射到
-
然後將後端真實節點 + 序號(副本數)映射到環上
-
當請求進來的時候,使用某 ** 特定組成的 key ** 代入 hash 函數計算得到一個位置
-
如果 key 是由 url 組成,那就是 ** url hash**
-
如果 key 是由 remoteIp 組成,那麼就是 ** IP hash**
-
使用二分查找,找到其在環上的下一個節點
具體代碼
type Keys []uint32
func (k Keys) Less(i, j int) bool {
return k[i] < k[j]
}
func (k Keys) Swap(i, j int) {
k[i], k[j] = k[j], k[i]
}
func (k Keys) Len() int {
return len(k)
}
type ConsistentHash struct {
mux sync.RWMutex
hash func(data []byte) uint32
replicas int
keys Keys
hashMap map[uint32]string
}
func NewConsistentHash(replicas int, fn func(data []byte) uint32) *ConsistentHash {
m := &ConsistentHash{
hash: fn,
replicas: replicas,
hashMap: make((map[uint32]string)),
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}
func (c *ConsistentHash) Add(params ...string) error {
if len(params) == 0 {
return errors.New("param len 1 at least")
}
addr := params[0]
c.mux.Lock()
defer c.mux.Unlock()
for i := 0; i < c.replicas; i++ {
hash := c.hash([]byte(strconv.Itoa(i) + addr))
c.keys = append(c.keys, hash)
c.hashMap[hash] = addr
}
sort.Sort(c.keys)
return nil
}
func (c *ConsistentHash) IsEmpty() bool {
return len(c.keys) == 0
}
func (c *ConsistentHash) Get(key string) (string, error) {
if c.IsEmpty() {
err := fmt.Errorf("nodes empty")
return "", err
}
hash := c.hash([]byte(key))
idx := sort.Search(len(c.keys), func(i int) bool {
return c.keys[i] >= hash
})
if idx == len(c.keys) {
idx = 0
}
return c.hashMap[c.keys[idx]], nil
}
3.2. 通用接口 / 工廠模式
type LoadBalanceStrategy interface {
Add(...string) error
Get(string) (string, error)
}
每一種不同的負載均衡算法,只需要實現添加以及獲取的接口即可。
type LbType int
const (
LbRandom LbType = iota
LbRoundRobin
LbWeightRoundRobin
LbConsistentHash
)
func LoadBanlanceFactory(lbType LbType) LoadBalanceStrategy {
switch lbType {
case LbRandom:
return &RandomN{}
case LbConsistentHash:
return NewConsistentHash(10, nil)
case LbRoundRobin:
return &RR{}
case LbWeightRoundRobin:
return &WeightedRR{}
default:
return &RR{}
}
}
然後使用工廠方法,根據傳入的參數,決定使用哪種負載均衡策略。
3.3. 支持負載均衡算法的反向代理實現
-
使用
LoadBanlanceFactory
工廠函數,傳入負載均衡類型,獲取負載均衡對象 -
添加後端真實節點
-
然後初始化
NewMultiTargetsReverseProxy
,在 director 回調函數中,根據負載均衡策略獲取要請求的後端真實節點 -
剩下的邏輯同
2.2
具體代碼
func NewMultiTargetsReverseProxy(lb lb_strategy.LoadBalanceStrategy) *httputil.ReverseProxy {
director := func(req *http.Request) {
remoteIP := strings.Split(req.RemoteAddr, ":")[0]
nextAddr, err := lb.Get(remoteIP)
if err != nil {
log.Fatal("get next addr fail")
}
target, err := url.Parse(nextAddr)
if err != nil {
log.Fatal(err)
}
targetQuery := target.RawQuery
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
req.Header.Set("User-Agent", "user-agent")
}
}
modifyFunc := func(resp *http.Response) error {
//請求以下命令:curl 'http://127.0.0.1:2002/error'
if resp.StatusCode != 200 {
//獲取內容
oldPayload, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
//追加內容
newPayload := []byte("StatusCode error:" + string(oldPayload))
resp.Body = ioutil.NopCloser(bytes.NewBuffer(newPayload))
resp.ContentLength = int64(len(newPayload))
resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10))
}
return nil
}
errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
//todo 如果是權重的負載則調整臨時權重
http.Error(w, "ErrorHandler error:"+err.Error(), 500)
}
return &httputil.ReverseProxy{Director: director, Transport: transport, ModifyResponse: modifyFunc, ErrorHandler: errFunc}
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
func main() {
rb := lb_strategy.LoadBanlanceFactory(lb_strategy.LbConsistentHash)
rb.Add("http://127.0.0.1:2003/base")
rb.Add("http://127.0.0.1:2004/base")
rb.Add("http://127.0.0.1:2005/base")
proxy := NewMultiTargetsReverseProxy(rb)
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
}
- 中間件
作爲網關,中間件必不可少,這類包括請求響應的模式,一般稱作洋蔥模式,每一層都是中間件,一層層進去,然後一層層出來。
中間件的實現一般有兩種,一種是使用數組,然後配合 index 計數;一種是鏈式調用。
4.1. 基於數組的中間件實現
NewSliceRouterHandler
獲取SliceRouterHandler
對象,該對象實現了Hanlder
接口,可以作爲handler
傳入 http 服務
-
ServeHTTP
方法中,調用newSliceRouterContext
初始化SliceRouterContext
,並且根據req
中的 url,按照最長 url 前綴匹配
的規則尋找groups
中滿足條件的SliceGroup
丟給SliceRouterContext
-
ServeHTTP
方法中,調用Next
方法開始整個handlers
數組的handler
調用
-
SliceRouterHandler
包含coreFunc
以及SliceRouter 對象
-
SliceRouter
包含SliceGroup
列表 -
SliceGroup
對象包含path
以及handlers
-
使用
Use
方法來添加中間件,並且去重添加到SliceRouter
中的groups
中去 -
使用
Group
方法初始化一個SliceGroup
- 貫穿整條調用鏈的是
SliceRouterContext
對象,包含:
-
SliceGroup
指針 -
ResponseWriter
-
Request
指針 -
Context
-
index
索引
-
中間件中可以調用
SliceRouterContext
中的Next
方法繼續,也可以調用Abort
方法進行終止 -
Abort
終止的方式就是設置索引 index 爲abortIndex
具體代碼
const abortIndex int8 = math.MaxInt8 / 2
type HandlerFunc func(*SliceRouterContext)
type SliceRouter struct {
groups []* SliceGroup
}
type SliceGroup struct {
*SliceRouter
path string
handlers []HandlerFunc
}
// slice router context
type SliceRouterContext struct {
*SliceGroup
RespW http.ResponseWriter
Req *http.Request
Ctx context.Context
index int8
}
func newSliceRouterContext(rw http.ResponseWriter, req *http.Request, r *SliceRouter) *SliceRouterContext {
newSliceGroup := &SliceGroup{}
matchUrlLen := 0
for _, group := range r.groups {
if strings.HasPrefix(req.RequestURI, group.path) {
pathLen := len(group.path)
if pathLen > matchUrlLen {
matchUrlLen = pathLen
*newSliceGroup = *group //淺拷貝數組指針
}
}
}
c := &SliceRouterContext{RespW: rw, Req: req, SliceGroup: newSliceGroup, Ctx: req.Context()}
c.Reset()
return c
}
// 獲取上下文值
func (ctx *SliceRouterContext) Get(key interface{}) interface{} {
return ctx.Ctx.Value(key)
}
// 設置上下文值
func (ctx *SliceRouterContext) Set(key, val interface{}) {
ctx.Ctx = context.WithValue(ctx.Ctx, key, val)
}
//
func (ctx *SliceRouterContext) Next() {
ctx.index++
for ctx.index < int8(len(ctx.groups)) {
ctx.handlers[ctx.index](ctx)
ctx.index++
}
}
// 重置 handlers 數組計數
func (ctx *SliceRouterContext) Reset() {
ctx.index = -1
}
func (ctx *SliceRouterContext) Abort() {
ctx.index = abortIndex
}
// 是否跳過了回調
func (ctx *SliceRouterContext) IsAborted() bool {
return ctx.index >= abortIndex
}
// sliceRouterHandler
type SliceRouterHandler struct {
coreFunc func(*SliceRouterContext) http.Handler
router *SliceRouter
}
func NewSliceRouterHandler(coreFunc func(*SliceRouterContext) http.Handler, router *SliceRouter) *SliceRouterHandler {
return &SliceRouterHandler{
coreFunc: coreFunc,
router: router,
}
}
func (w *SliceRouterHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
c := newSliceRouterContext(rw, req, w.router)
if w.coreFunc != nil {
c.handlers = append(c.handlers, func(c *SliceRouterContext) {
w.coreFunc(c).ServeHTTP(rw, req)
})
}
c.Reset()
c.Next()
}
// 構造 router
func NewSliceRouter() *SliceRouter {
return &SliceRouter{}
}
// 創建 Group
func (g *SliceRouter) Group(path string) *SliceGroup {
return &SliceGroup{
SliceRouter: g,
path: path,
}
}
// 構造回調方法
func (g *SliceGroup) Use(middlewares ...HandlerFunc) *SliceGroup {
g.handlers = append(g.handlers, middlewares...)
existsFlag := false
for _, oldGroup := range g.SliceRouter.groups {
if oldGroup == g {
existsFlag = true
}
}
if !existsFlag {
g.SliceRouter.groups = append(g.SliceRouter.groups, g)
}
return g
}
``` tracelog 中間件 具體代碼
```go
func TraceLogSliceMiddleware() func(c *SliceRouterContext) {
return func(c *SliceRouterContext) {
log.Println("trace_in")
c.Abort()
log.Println("trace_out")
}
}
``` 中間件使用 具體代碼
```go
var addr = "127.0.0.1:2002"
func main() {
reverseProxy := func(c *middleware.SliceRouterContext) http.Handler {
rs1 := "http://127.0.0.1:2003/base"
url1, err1 := url.Parse(rs1)
if err1 != nil {
log.Println(err1)
}
rs2 := "http://127.0.0.1:2004/base"
url2, err2 := url.Parse(rs2)
if err2 != nil {
log.Println(err2)
}
urls := []*url.URL{url1, url2}
return proxy.NewMultipleHostsReverseProxy(c, urls)
}
log.Println("Starting httpserver at " + addr)
sliceRouter := middleware.NewSliceRouter()
sliceRouter.Group("/base").Use(middleware.TraceLogSliceMiddleware(), func(c *middleware.SliceRouterContext) {
c.RespW.Write([]byte("test func"))
})
sliceRouter.Group("/").Use(middleware.TraceLogSliceMiddleware(), func(c *middleware.SliceRouterContext) {
fmt.Println("reverseProxy")
reverseProxy(c).ServeHTTP(c.RespW, c.Req)
})
routerHandler := middleware.NewSliceRouterHandler(nil, sliceRouter)
log.Fatal(http.ListenAndServe(addr, routerHandler))
}
轉自:
troy.wang/docs/golang/posts/golang-gateway/
Go 開發大全
參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ZqrlLRPGBadzFONWUeoMew