高性能消息中間件 NSQ 解析 - nsqd 的實現介紹

nsq 中單個 nsqd 可以有多個 topic,每個 topic 可以有多個 channel。channel 接收這個 topic 所有消息的副本,從而實現多播分發,而 channel 上的每個消息被均勻的分發給它的訂閱者,從而實現負載均衡。

入口函數

首先看下 nsqd 的入口函數:

 1//位於 apps/nsqd/main.go:26
 2func main() {
 3  prg := &program{}
 4  if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
 5    logFatal("%s", err)
 6  }
 7}
 8
 9func (p *program) Init(env svc.Environment) error {
10  if env.IsWindowsService() {
11    dir := filepath.Dir(os.Args[0])
12    return os.Chdir(dir)
13  }
14  return nil
15}
16
17func (p *program) Start() error {
18  opts := nsqd.NewOptions()
19
20  flagSet := nsqdFlagSet(opts)
21  flagSet.Parse(os.Args[1:])
22  ...
23}
24
25

通過第三方 svc 包進行優雅的後臺進程管理,svc.Run() -> svc.Init() -> svc.Start(),啓動 nsqd 實例。

配置項初始化

初始化配置項 (opts, cfg),加載歷史數據 (nsqd.LoadMetadata)、持久化最新數據 (nsqd.PersistMetadata),然後開啓協程,進入 nsqd.Main() 主函數。

 1// 位於 apps/nsqd/main.go:64
 2options.Resolve(opts, flagSet, cfg)
 3  nsqd, err := nsqd.New(opts)
 4  if err != nil {
 5    logFatal("failed to instantiate nsqd - %s", err)
 6  }
 7  p.nsqd = nsqd
 8
 9  err = p.nsqd.LoadMetadata()
10  if err != nil {
11    logFatal("failed to load metadata - %s", err)
12  }
13  err = p.nsqd.PersistMetadata()
14  if err != nil {
15    logFatal("failed to persist metadata - %s", err)
16  }
17
18  go func() {
19    err := p.nsqd.Main()
20    if err != nil {
21      p.Stop()
22      os.Exit(1)
23    }
24  }()
25
26
27

接着就是初始化 tcpServer, httpServer, httpsServer,然後循環監控隊列信息 (n.queueScanLoop)、節點信息管理(n.lookupLoop)、統計信息(n.statsdLoop) 輸出。

 1// 位於 nsqd/nsqd.go:262
 2  n.tcpServer.nsqd = n
 3  n.waitGroup.Wrap(func() {
 4    exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
 5  })
 6  httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
 7  n.waitGroup.Wrap(func() {
 8    exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
 9  })
10  if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
11    httpsServer := newHTTPServer(ctx, true, true)
12    n.waitGroup.Wrap(func() {
13      exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
14    })
15  }
16
17  n.waitGroup.Wrap(n.queueScanLoop)
18  n.waitGroup.Wrap(n.lookupLoop)
19  if n.getOpts().StatsdAddress != "" {
20    n.waitGroup.Wrap(n.statsdLoop)
21  }
22
23

處理請求

分別處理 tcp/http 請求,開啓 handler 協程進行併發處理,其中 newHTTPServer 註冊路由採用了 Decorate 裝飾器模式 (後面會進一步解析);

 1// 位於 nsqd/http.go:44
 2router := httprouter.New()
 3  router.HandleMethodNotAllowed = true
 4  router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
 5  router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
 6  router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
 7  s := &httpServer{
 8    ctx:         ctx,
 9    tlsEnabled:  tlsEnabled,
10    tlsRequired: tlsRequired,
11    router:      router,
12  }
13
14  router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
15  router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
16
17  // v1 negotiate
18  router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
19  router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
20  router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))
21
22  // only v1
23  router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
24  router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
25
26

http-Decorate 路由分發

 1// 位於 internal/protocol/tcp_server.go:22
 2for {
 3    clientConn, err := listener.Accept()
 4    if err != nil {
 5      if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
 6        logf(lg.WARN, "temporary Accept() failure - %s", err)
 7        runtime.Gosched()
 8        continue
 9      }
10      // theres no direct way to detect this error because it is not exposed
11      if !strings.Contains(err.Error(), "use of closed network connection") {
12        return fmt.Errorf("listener.Accept() error - %s", err)
13      }
14      break
15    }
16    go handler.Handle(clientConn)
17  }
18
19

如上的實現爲 tcp-handler 處理的主要代碼。

tcp 解析協議

tcp 解析 V2 協議,走內部協議封裝的 prot.IOLoop(conn) 進行處理;

 1// 位於 nsqd/tcp.go:34
 2var prot protocol.Protocol
 3  switch protocolMagic {
 4  case "  V2":
 5    prot = &protocolV2{ctx: p.ctx}
 6  default:
 7    protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
 8    clientConn.Close()
 9    p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
10      clientConn.RemoteAddr(), protocolMagic)
11    return
12  }
13
14  err = prot.IOLoop(clientConn)
15  if err != nil {
16    p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
17    return
18  }
19
20

消息生成與消費

通過內部協議進行 p.Exec(執行命令)、p.Send(發送結果),保證每個 nsqd 節點都能正確的進行消息生成與消費,一旦上述過程有 error 都會被捕獲處理,確保分佈式投遞的可靠性。

 1// 位於 nsqd/protocol_v2.go:79
 2params := bytes.Split(line, separatorBytes)
 3
 4    p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
 5
 6    var response []byte
 7    response, err = p.Exec(client, params)
 8    if err != nil {
 9      ctx := ""
10      if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
11        ctx = " - " + parentErr.Error()
12      }
13      p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)
14
15      sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
16      if sendErr != nil {
17        p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
18        break
19      }
20
21      // errors of type FatalClientErr should forceably close the connection
22      if _, ok := err.(*protocol.FatalClientErr); ok {
23        break
24      }
25      continue
26    }
27
28    if response != nil {
29      err = p.Send(client, frameTypeResponse, response)
30      if err != nil {
31        err = fmt.Errorf("failed to send response - %s", err)
32        break
33      }
34    }
35
36

nsqd 也會同時開啓 tcp 和 http 服務,兩個服務都可以提供給生產者和消費者,http 服務還提供給 nsqadmin 獲取該 nsqd 本地 topic 和 channel 信息。

小結

本文主要介紹 nsqd,總的來說 nsqd 的實現並不複雜。nsqd 是一個守護進程,負責接收(生產者 producer )、排隊(最小堆實現)、投遞(消費者 consumer )消息給客戶端。nsqd 可以獨立運行,但通常是由 nsqlookupd 實例所在集羣配置的。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Ym4Xp0IUFI5MEVV6PLCDig