Golang 百萬級併發 Server

【導讀】本文作者介紹了幾種服務端實現,並記錄瞭如何迭代、讓應用能夠服務更大流量的操作。

  1. go 語言最大的優勢在於能夠支持百萬級高併發,最關鍵的在於 go 使用了比線程更輕量級的類似與協程的 goroutine,每一個 goroutine 初始分配的棧空間只有 2k,開啓一個 goroutine 方式很簡單:
func main() {
 go func(s string) {
  fmt.Println(s)
 }("hello world!")
}

關鍵字 go 後面加上方法,第一種處理請求的方式就是每收到一個請求,開啓一個 goroutine,並且該 goroutine 處理請求,直到請求處理完成:

func main() {
 router := gin.Default()
 router.Handle("POST""/submit", submit)
 router.Run(":8080")
}

func submit(ctx *gin.Context) {
 if err := ctx.Request.ParseForm(); err != nil {
  ctx.String(http.StatusBadRequest, "%s""failure")
  return
 }
 message := ctx.PostForm("message")
 go func(msg string) {
  fmt.Println("上傳信息的處理 ", msg)
 }(message)
}

這種方式最簡單,但是隻適用於中小流量的業務,一旦業務請求的處理內容比較多、流量比較大,程序很快就撐不住了。

  1. goroutine 之間進行通信使用 channel:
const MAX_QUEUE = 256

func main() {
 channel := make(chan string, MAX_QUEUE)

 go func(msg string) {
  channel <- msg //向信道中存消息
 }("ping")

 msg := <- channel //從信道中去消息
 fmt.Println(msg)
}

第二種方式就是使用 channel,加入緩衝隊列的內容,每次收到一個請求,將一些工作放入到隊列中,每次從隊列中拿出工作進行處理:

const MAX_QUEUE = 256

var channel chan string

func main() {
 go startProcessor()
 router := gin.Default()
 router.Handle("POST""/submit", submit)
 router.Run(":8080")
}

func init() {
 channel = make(chan string, MAX_QUEUE)
}

func submit(ctx *gin.Context) {
 if err := ctx.Request.ParseForm(); err != nil {
  ctx.String(http.StatusBadRequest, "%s""failure")
  return
 }
 message := ctx.PostForm("message")
 channel <- message
 ctx.String(http.StatusOK, "%s""success")
}

func startProcessor() {
 for {
  select {
  case msg := <-channel:
   fmt.Println("上傳信息的處理 ", msg)
  }
 }
}

這種方式適用於請求訪問的速率小於等於隊列執行的任務速率的情況,如果請求訪問的速率遠遠大於隊列執行任務的速率,很快緩衝隊列就滿了,後續的請求就會阻塞。

  1. 使用 Job/Worker 模式,這種可以認爲是 go 使用 channel 實現了一個工作線程池,兩層的通道系統,一個通道用作任務隊列,一個通道用到控制處理任務時的併發量
const (
 MAX_QUEUE = 256
 MAX_WORKER = 32
 MAX_WORKER_POOL_SIZE = 5
)

var JobQueue chan string

//用來管理執行管道中的任務
type Worker struct {
 WorkerPool chan chan string
 JobChannel chan string
 quit chan bool
}

func NewWorker(workerPool chan chan string) *Worker {
 return &Worker{
  WorkerPool:workerPool,
  JobChannel:make(chan string),
  quit:    make(chan bool),
 }
}

//開啓當前的 worker,循環上傳 channel 中的 job;並同時監聽 quit 退出標識位,判斷是否關閉該 channel
func (w *Worker) Start() {
 go func() {
  for {
   w.WorkerPool <- w.JobChannel
   select {
   case job := <-w.JobChannel:
    fmt.Println(job)
   case <-w.quit://收到停止信號,退出
    return
   }
  }
 }()
}

//關閉該 channel,這裏是將 channel 標識位設置爲關閉,實際在 worker 執行中關閉
func (w *Worker) Stop() {
 go func() {
  w.quit <- true
 }()
}

type Dispatcher struct {
 WorkerPool chan chan string
 quit       chan bool
}

func NewDispatcher(maxWorkers int) *Dispatcher {
 pool := make(chan chan string, maxWorkers)
 return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) dispatcher() {
 for {
  select {
  case job := <-JobQueue:
   go func(job string) {
    jobChannel := <-d.WorkerPool
    jobChannel <- job
   }(job)
  case <-d.quit:
   return
  }
 }
}

func (d *Dispatcher) Run() {
 for i := 0; i < MAX_WORKER_POOL_SIZE; i++ {
  worker := NewWorker(d.WorkerPool)
  worker.Start()
 }
 go d.dispatcher()
}

func main() {
 dispatcher := NewDispatcher(MAX_WORKER)//創建任務分派器
 dispatcher.Run()//任務分派器開始工作

 router := gin.Default()
 router.Handle("POST""/submit", submit)
 router.Run(":8080")
}

func init() {
 JobQueue = make(chan string, MAX_QUEUE)
}

func submit(ctx *gin.Context) {
 if err := ctx.Request.ParseForm(); err != nil {
  ctx.String(http.StatusBadRequest, "%s""failure")
  return
 }
 message := ctx.PostForm("message")
 JobQueue <- message
 ctx.String(http.StatusOK, "%s""success")
}

轉自:

github.com/guishenbumie/MyBlog/wiki

Go 開發大全

參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/WrdVH530EJHYzkvJBgAMIw