golang 實現簡單網關

【導讀】使用 golang 做網關開發,本文展示了一種最簡單版本的網管落地實現。

golang 實現簡單網關

網關 = 反向代理 + 負載均衡 + 各種策略,技術實現也有多種多樣,有基於 nginx 使用 lua 的實現,比如 openresty、kong;也有基於 zuul 的通用網關;還有就是 golang 的網關,比如 tyk。

這篇文章主要是講如何基於 golang 實現一個簡單的網關。

  1. 預備

1.1. 準備兩個後端 web 服務

啓動兩個後端 web 服務(代碼)

type RealServer struct {
 Addr string
}

func (r *RealServer) Run() {
 log.Println("start http server at " + r.Addr)
 mux := http.NewServeMux()
 mux.HandleFunc("/", r.EchoHandler)
 mux.HandleFunc("/base/error", r.ErrorHandler)
 mux.HandleFunc("/timeout", r.TimeoutHandler)

 server := &http.Server{
  Addr:         r.Addr,
  WriteTimeout: time.Second * 3,
  Handler:      mux,
 }

 go func() {
  log.Fatal(server.ListenAndServe())
 }()
}

func (r *RealServer) EchoHandler(w http.ResponseWriter, req *http.Request) {
 upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
 realIP := fmt.Sprintf("RemoteAddr=%s,X-Forwarded-For=%v,X-Real-Ip=%v\n", req.RemoteAddr, req.Header.Get("X-Forwarded-For"), req.Header.Get("X-Real-Ip"))
 header := fmt.Sprintf("headers =%v\n", req.Header)
 io.WriteString(w, upath)
 io.WriteString(w, realIP)
 io.WriteString(w, header)
}

func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
 w.WriteHeader(500)
 io.WriteString(w, "error handler")
}

func (r *RealServer) TimeoutHandler(w http.ResponseWriter, req *http.Request) {
 time.Sleep(6 * time.Second)
 w.WriteHeader(200)
 io.WriteString(w, "timeout handler")
}

func main() {
 rs1 := &RealServer{Addr: "127.0.0.1:2003"}
 rs1.Run()
 rs2 := &RealServer{Addr: "127.0.0.1:2004"}
 rs2.Run()

 quit := make(chan os.Signal)
 signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
 <-quit
}

1.2. 訪問工具

curl -v http://localhost:2002/base
  1. 反向代理

2.1. 單後端(target)反向代理

具體代碼

package main

import (
 "log"
 "net/http"
 "net/http/httputil"
 "net/url"
)

var (
 addr = "127.0.0.1:2002"
)

func main()  {
 rsUrl, _:=url.Parse("http://127.0.0.1:2003/base")
 reversePorxy := httputil.NewSingleHostReverseProxy(rsUrl)
 log.Println("Starting Httpserver at " + addr)
 log.Fatal(http.ListenAndServe(addr, reversePorxy))
}

直接使用基礎庫 httputil 提供的NewSingleHostReverseProxy即可,返回的reverseProxy對象實現了serveHttp方法,因此可以直接作爲 handler。

2.2. 分析反向代理代碼,並添加修改 response 內容

具體代碼

package main

import (
 "bytes"
 "fmt"
 "io/ioutil"
 "log"
 "net/http"
 "net/http/httputil"
 "net/url"
 "strings"
)

var (
 addr = "127.0.0.1:2002"
)

func main()  {
 rsUrl, _:=url.Parse("http://127.0.0.1:2003/base")
 reversePorxy := NewSingleHostReverseProxy(rsUrl)
 log.Println("Starting httpserver at " + addr)
 log.Fatal(http.ListenAndServe(addr, reversePorxy))
}

func NewSingleHostReverseProxy(target *url.URL) *httputil.ReverseProxy {
 targetQuery := target.RawQuery
 director := func(req *http.Request) {
  req.URL.Scheme = target.Scheme
  req.URL.Host = target.Host
  req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
  if targetQuery == "" || req.URL.RawQuery == "" {
   req.URL.RawQuery = targetQuery + req.URL.RawQuery
  } else {
   req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
  }
  if _, ok := req.Header["User-Agent"]; !ok {
   // explicitly disable User-Agent so it's not set to default value
   req.Header.Set("User-Agent""")
  }
  // add when the reverseproxy is the first rp
  req.Header.Set("X-Real-Ip", strings.Split(req.RemoteAddr, ":")[0])
 }

 modifyFunc := func(res *http.Response) error {
  if res.StatusCode != http.StatusOK {
      oldPayLoad, err := ioutil.ReadAll(res.Body)

      if err != nil {
       return err
      }
      newPayLoad := []byte("hello " + string(oldPayLoad))

      res.Body = ioutil.NopCloser(bytes.NewBuffer(newPayLoad))
      res.ContentLength = int64(len(newPayLoad))
      res.Header.Set("Content-Length",fmt.Sprint(len(newPayLoad)))
  }
  return nil
 }
 return &httputil.ReverseProxy{Director: director, ModifyResponse: modifyFunc}
}

func singleJoiningSlash(a, b string) string {
 aslash := strings.HasSuffix(a, "/")
 bslash := strings.HasPrefix(b, "/")
 switch {
 case aslash && bslash:
  return a + b[1:]
 case !aslash && !bslash:
  return a + "/" + b
 }
 return a + b
}

director中定義回調函數,入參爲*http.Request,決定如何構造向後端的請求,比如 host 是否向後傳遞,是否進行 url 重寫,對於 header 的處理,後端 target 的選擇等,都可以在這裏完成。

director在這裏具體做了:

  1. 根據後端 target,構造到後端請求的 url

  2. 選擇性傳遞必要的 header

  3. 設置代理相關的 header,比如X-Forwarded-ForX-Real-Ip

  4. X-Forwarded-For記錄經過的所有代理,以proxyIp01, proxyIp02, proxyIp03的格式記錄,由於是追加,可能被篡改,當然,如果第一代理以覆蓋該頭的方式進行記錄,也是可信的

  5. X-Real-Ip用於記錄客戶端 IP,一般放在第一代理上,用於記錄客戶端的來源公網 IP,可信

modifyResponse中定義回調函數,入參爲*http.Response,用於修改響應的信息,比如響應的 Body,響應的 Header 等信息。

最終依舊是返回一個ReverseProxy,然後將這個對象作爲 handler 傳入即可。

2.3. 支持多個後端服務器

參考 2.2 中的NewSingleHostReverseProxy,只需要實現一個類似的、支持多 targets 的方法即可,具體實現見後面。

  1. 負載均衡

作爲一個網關服務,在上面 2.3 的基礎上,需要支持必要的負載均衡策略,比如:

3.1. 負載均衡算法

3.1.1. 隨機

隨便 random 一個整數作爲索引,然後取對應的地址即可,實現比較簡單。

具體代碼

type RandomN struct {
 rss []string
}

func (r *RandomN) Add(params ...string) error {
 if len(params) != 1 {
  return fmt.Errorf("param length should be one")
 }

 r.rss = append(r.rss, params[0])

 return nil
}

func (r *RandomN) Next() string {
 if len(r.rss) == 0 {
  return ""
 }

 return r.rss[rand.Intn(len(r.rss))]
}

func (r *RandomN) Get(key string) (string, error) {
 return r.Next(), nil
}

3.1.2. 輪詢

使用curIndex進行累加計數,一旦超過 rss 數組的長度,則重置。

具體代碼

type RR struct {
 curIndex int
 rss []string
}

func (r *RR) Add(params ...string) error {
 if len(params) != 1 {
  return fmt.Errorf("param length should be one")
 }

 r.rss = append(r.rss, params[0])

 return nil
}

func (r *RR) Next() string {
 if len(r.rss) == 0 {
  return ""
 }

 if r.curIndex == len(r.rss) {
  r.curIndex = 0
 }

 node := r.rss[r.curIndex]

 r.curIndex++
 return node
}

func (r *RR) Get(key string) (string, error) {
 return r.Next(), nil
}

3.1.3. 加權輪詢

輪詢帶權重,如果使用計數遞減的方式,如果權重是5,1,1那麼後端 rs 依次爲a,a,a,a,a,b,c,a,a,a,a...,其中 a 後端會瞬間壓力過大;參考 nginx 內部的加權輪詢,或者應該稱之爲平滑加權輪詢,思路是:

後端真實節點包含三個權重:

操作步驟:

  1. 計算 curWeight

  2. 選取最大 curWeight 的節點

  3. 重新計算 curWeight

具體代碼

type WeightedRR struct {
 rss []*WeightedNode
}

type WeightedNode struct {
 addr string
 weight int
 curWeight int
 effectiveWeight int
}

func (r *WeightedRR) Add(params ...string) error {
 if len(params) != 2 {
  return fmt.Errorf("param length should be two")
 }

 addr := params[0]
 weight, err := strconv.ParseInt(params[1], 10, 64)

 if err != nil {
  return err
 }

 node := &WeightedNode{
  addr:            addr,
  weight:          int(weight),
  curWeight:       int(weight),
  effectiveWeight: int(weight),
 }

 r.rss = append(r.rss, node)

 return nil
}

func (r *WeightedRR) Next() string {
 // 平滑加權輪詢 --> 1 計算 total, 2 變更臨時權重 3. 選擇最大臨時權重 4。 變更臨時權重
 total := 0
 var best *WeightedNode

 for _, node := range r.rss {
  n := node

  total += n.effectiveWeight

  n.curWeight += n.effectiveWeight

  if best == nil || n.curWeight > best.curWeight {
   best = n
  }
 }

 if best == nil {
  return ""
 }

 best.curWeight -= total

 return best.addr
}

func (r *WeightedRR) Get(key string) (string, error) {
 return r.Next(), nil
}

3.1.4. 一致性 hash

一致性 hash 算法,主要是用於分佈式 cache 熱點 / 命中問題;這裏用於基於某 key 的 hash 值,路由到固定後端,但是隻能是基本滿足流量綁定,一旦後端目標節點故障,會自動平移到環上最近的那麼個節點。

實現:

具體代碼

type Keys []uint32

func (k Keys) Less(i,  j int) bool {
 return k[i] < k[j]
}

func (k Keys) Swap(i, j int)  {
 k[i], k[j] = k[j], k[i]
}

func (k Keys) Len() int {
 return len(k)
}

type ConsistentHash struct {
 mux sync.RWMutex
 hash func(data []byte) uint32
 replicas int
 keys Keys
 hashMap map[uint32]string
}

func NewConsistentHash(replicas int, fn func(data []byte) uint32) *ConsistentHash {
 m := &ConsistentHash{
  hash:     fn,
  replicas: replicas,
  hashMap:  make((map[uint32]string)),
 }

 if m.hash == nil {
  m.hash = crc32.ChecksumIEEE
 }

 return m
}

func (c *ConsistentHash) Add(params ...string) error {
 if len(params) == 0 {
  return errors.New("param len 1 at least")
 }
 addr := params[0]
 c.mux.Lock()
 defer c.mux.Unlock()

 for i := 0; i < c.replicas; i++ {
  hash := c.hash([]byte(strconv.Itoa(i) + addr))
  c.keys = append(c.keys, hash)
  c.hashMap[hash] = addr
 }

 sort.Sort(c.keys)
 return nil
}

func (c *ConsistentHash) IsEmpty() bool {
 return len(c.keys) == 0
}

func (c *ConsistentHash) Get(key string) (string, error) {
 if c.IsEmpty() {
  err := fmt.Errorf("nodes empty")
  return "", err
 }

 hash := c.hash([]byte(key))

 idx := sort.Search(len(c.keys), func(i int) bool {
  return c.keys[i] >= hash
 })

 if idx == len(c.keys) {
  idx = 0
 }
 return c.hashMap[c.keys[idx]], nil
}

3.2. 通用接口 / 工廠模式

type LoadBalanceStrategy interface {
 Add(...string) error
 Get(string) (string, error)
}

每一種不同的負載均衡算法,只需要實現添加以及獲取的接口即可。

type LbType int

const (
 LbRandom LbType = iota
 LbRoundRobin
 LbWeightRoundRobin
 LbConsistentHash
)

func LoadBanlanceFactory(lbType LbType) LoadBalanceStrategy {
 switch lbType {
 case LbRandom:
  return &RandomN{}
 case LbConsistentHash:
  return NewConsistentHash(10, nil)
 case LbRoundRobin:
  return &RR{}
 case LbWeightRoundRobin:
  return &WeightedRR{}
 default:
  return &RR{}
 }
}

然後使用工廠方法,根據傳入的參數,決定使用哪種負載均衡策略。

3.3. 支持負載均衡算法的反向代理實現

具體代碼

func NewMultiTargetsReverseProxy(lb lb_strategy.LoadBalanceStrategy) *httputil.ReverseProxy {
 director := func(req *http.Request) {
  remoteIP := strings.Split(req.RemoteAddr, ":")[0]
  nextAddr, err := lb.Get(remoteIP)

  if err != nil {
   log.Fatal("get next addr fail")
  }

  target, err := url.Parse(nextAddr)
  if err != nil {
   log.Fatal(err)
  }
  targetQuery := target.RawQuery
  req.URL.Scheme = target.Scheme
  req.URL.Host = target.Host
  req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
  if targetQuery == "" || req.URL.RawQuery == "" {
   req.URL.RawQuery = targetQuery + req.URL.RawQuery
  } else {
   req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
  }
  if _, ok := req.Header["User-Agent"]; !ok {
   req.Header.Set("User-Agent""user-agent")
  }
 }

 modifyFunc := func(resp *http.Response) error {
  //請求以下命令:curl 'http://127.0.0.1:2002/error'
  if resp.StatusCode != 200 {
   //獲取內容
   oldPayload, err := ioutil.ReadAll(resp.Body)
   if err != nil {
    return err
   }
   //追加內容
   newPayload := []byte("StatusCode error:" + string(oldPayload))
   resp.Body = ioutil.NopCloser(bytes.NewBuffer(newPayload))
   resp.ContentLength = int64(len(newPayload))
   resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10))
  }
  return nil
 }

 errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
  //todo 如果是權重的負載則調整臨時權重
  http.Error(w, "ErrorHandler error:"+err.Error(), 500)
 }

 return &httputil.ReverseProxy{Director: director, Transport: transport, ModifyResponse: modifyFunc, ErrorHandler: errFunc}
}

func singleJoiningSlash(a, b string) string {
 aslash := strings.HasSuffix(a, "/")
 bslash := strings.HasPrefix(b, "/")
 switch {
 case aslash && bslash:
  return a + b[1:]
 case !aslash && !bslash:
  return a + "/" + b
 }
 return a + b
}

func main()  {
 rb := lb_strategy.LoadBanlanceFactory(lb_strategy.LbConsistentHash)
 rb.Add("http://127.0.0.1:2003/base")
 rb.Add("http://127.0.0.1:2004/base")
 rb.Add("http://127.0.0.1:2005/base")

 proxy := NewMultiTargetsReverseProxy(rb)

 log.Println("Starting httpserver at " + addr)
 log.Fatal(http.ListenAndServe(addr, proxy))
}
  1. 中間件

作爲網關,中間件必不可少,這類包括請求響應的模式,一般稱作洋蔥模式,每一層都是中間件,一層層進去,然後一層層出來。

中間件的實現一般有兩種,一種是使用數組,然後配合 index 計數;一種是鏈式調用。

4.1. 基於數組的中間件實現

  1. NewSliceRouterHandler 獲取SliceRouterHandler對象,該對象實現了Hanlder接口,可以作爲handler傳入 http 服務
  1. SliceRouterHandler包含coreFunc以及SliceRouter 對象

  2. SliceRouter包含SliceGroup列表

  3. SliceGroup對象包含path以及handlers

  1. 貫穿整條調用鏈的是SliceRouterContext對象,包含:
  1. 中間件中可以調用SliceRouterContext中的Next方法繼續,也可以調用Abort方法進行終止

  2. Abort終止的方式就是設置索引 index 爲abortIndex

具體代碼

const abortIndex int8 = math.MaxInt8 / 2

type HandlerFunc func(*SliceRouterContext)

type SliceRouter struct {
   groups []* SliceGroup
}

type SliceGroup struct {
   *SliceRouter
   path string
   handlers []HandlerFunc
}

// slice router context
type SliceRouterContext struct {
   *SliceGroup
   RespW http.ResponseWriter
   Req *http.Request
   Ctx context.Context
   index int8
}

func newSliceRouterContext(rw http.ResponseWriter, req *http.Request, r *SliceRouter) *SliceRouterContext  {
   newSliceGroup := &SliceGroup{}

   matchUrlLen := 0

   for _, group := range r.groups {
      if strings.HasPrefix(req.RequestURI, group.path) {
         pathLen := len(group.path)
         if pathLen > matchUrlLen {
            matchUrlLen = pathLen
            *newSliceGroup = *group //淺拷貝數組指針
         }
      }
   }

   c := &SliceRouterContext{RespW: rw, Req: req, SliceGroup: newSliceGroup, Ctx: req.Context()}
   c.Reset()
   return c
}

// 獲取上下文值
func (ctx *SliceRouterContext) Get(key interface{}) interface{} {
   return ctx.Ctx.Value(key)
}

// 設置上下文值
func (ctx *SliceRouterContext) Set(key, val interface{}) {
   ctx.Ctx = context.WithValue(ctx.Ctx, key, val)
}

//
func (ctx *SliceRouterContext) Next()  {
   ctx.index++

   for ctx.index < int8(len(ctx.groups)) {
      ctx.handlers[ctx.index](ctx)
      ctx.index++
   }
}

// 重置 handlers 數組計數
func (ctx *SliceRouterContext) Reset()  {
   ctx.index = -1
}

func (ctx *SliceRouterContext) Abort() {
   ctx.index = abortIndex
}

// 是否跳過了回調
func (ctx *SliceRouterContext) IsAborted() bool {
   return ctx.index >= abortIndex
}

// sliceRouterHandler
type SliceRouterHandler struct {
   coreFunc func(*SliceRouterContext) http.Handler
   router   *SliceRouter
}

func NewSliceRouterHandler(coreFunc func(*SliceRouterContext) http.Handler, router *SliceRouter) *SliceRouterHandler {
   return &SliceRouterHandler{
      coreFunc: coreFunc,
      router:   router,
   }
}

func (w *SliceRouterHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
   c := newSliceRouterContext(rw, req, w.router)
   if w.coreFunc != nil {
      c.handlers = append(c.handlers, func(c *SliceRouterContext) {
         w.coreFunc(c).ServeHTTP(rw, req)
      })
   }
   c.Reset()
   c.Next()
}

// 構造 router
func NewSliceRouter() *SliceRouter {
   return &SliceRouter{}
}

// 創建 Group
func (g *SliceRouter) Group(path string) *SliceGroup {
   return &SliceGroup{
      SliceRouter: g,
      path:        path,
   }
}

// 構造回調方法
func (g *SliceGroup) Use(middlewares ...HandlerFunc) *SliceGroup {
   g.handlers = append(g.handlers, middlewares...)
   existsFlag := false
   for _, oldGroup := range g.SliceRouter.groups {
      if oldGroup == g {
         existsFlag = true
      }
   }
   if !existsFlag {
      g.SliceRouter.groups = append(g.SliceRouter.groups, g)
   }
   return g
}
``` tracelog 中間件 具體代碼

```go
func TraceLogSliceMiddleware() func(c *SliceRouterContext) {
   return func(c *SliceRouterContext) {
      log.Println("trace_in")
      c.Abort()
      log.Println("trace_out")
   }
}
``` 中間件使用 具體代碼

```go
var addr = "127.0.0.1:2002"

func main() {
   reverseProxy := func(c *middleware.SliceRouterContext) http.Handler {
      rs1 := "http://127.0.0.1:2003/base"
      url1, err1 := url.Parse(rs1)
      if err1 != nil {
         log.Println(err1)
      }

      rs2 := "http://127.0.0.1:2004/base"
      url2, err2 := url.Parse(rs2)
      if err2 != nil {
         log.Println(err2)
      }

      urls := []*url.URL{url1, url2}
      return proxy.NewMultipleHostsReverseProxy(c, urls)
   }

   log.Println("Starting httpserver at " + addr)

   sliceRouter := middleware.NewSliceRouter()

   sliceRouter.Group("/base").Use(middleware.TraceLogSliceMiddleware(), func(c *middleware.SliceRouterContext) {
      c.RespW.Write([]byte("test func"))

   })

   sliceRouter.Group("/").Use(middleware.TraceLogSliceMiddleware(), func(c *middleware.SliceRouterContext) {
      fmt.Println("reverseProxy")
      reverseProxy(c).ServeHTTP(c.RespW, c.Req)
   })

   routerHandler := middleware.NewSliceRouterHandler(nil, sliceRouter)
   log.Fatal(http.ListenAndServe(addr, routerHandler))
}

轉自:

troy.wang/docs/golang/posts/golang-gateway/

Go 開發大全

參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ZqrlLRPGBadzFONWUeoMew