高性能消息中間件 NSQ 解析 - nsqd 的實現介紹
nsq 中單個 nsqd 可以有多個 topic,每個 topic 可以有多個 channel。channel 接收這個 topic 所有消息的副本,從而實現多播分發,而 channel 上的每個消息被均勻的分發給它的訂閱者,從而實現負載均衡。
入口函數
首先看下 nsqd 的入口函數:
1//位於 apps/nsqd/main.go:26
2func main() {
3 prg := &program{}
4 if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
5 logFatal("%s", err)
6 }
7}
8
9func (p *program) Init(env svc.Environment) error {
10 if env.IsWindowsService() {
11 dir := filepath.Dir(os.Args[0])
12 return os.Chdir(dir)
13 }
14 return nil
15}
16
17func (p *program) Start() error {
18 opts := nsqd.NewOptions()
19
20 flagSet := nsqdFlagSet(opts)
21 flagSet.Parse(os.Args[1:])
22 ...
23}
24
25
通過第三方 svc 包進行優雅的後臺進程管理,svc.Run() -> svc.Init() -> svc.Start(),啓動 nsqd 實例。
配置項初始化
初始化配置項 (opts, cfg),加載歷史數據 (nsqd.LoadMetadata)、持久化最新數據 (nsqd.PersistMetadata),然後開啓協程,進入 nsqd.Main() 主函數。
1// 位於 apps/nsqd/main.go:64
2options.Resolve(opts, flagSet, cfg)
3 nsqd, err := nsqd.New(opts)
4 if err != nil {
5 logFatal("failed to instantiate nsqd - %s", err)
6 }
7 p.nsqd = nsqd
8
9 err = p.nsqd.LoadMetadata()
10 if err != nil {
11 logFatal("failed to load metadata - %s", err)
12 }
13 err = p.nsqd.PersistMetadata()
14 if err != nil {
15 logFatal("failed to persist metadata - %s", err)
16 }
17
18 go func() {
19 err := p.nsqd.Main()
20 if err != nil {
21 p.Stop()
22 os.Exit(1)
23 }
24 }()
25
26
27
接着就是初始化 tcpServer, httpServer, httpsServer,然後循環監控隊列信息 (n.queueScanLoop)、節點信息管理(n.lookupLoop)、統計信息(n.statsdLoop) 輸出。
1// 位於 nsqd/nsqd.go:262
2 n.tcpServer.nsqd = n
3 n.waitGroup.Wrap(func() {
4 exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
5 })
6 httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
7 n.waitGroup.Wrap(func() {
8 exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
9 })
10 if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
11 httpsServer := newHTTPServer(ctx, true, true)
12 n.waitGroup.Wrap(func() {
13 exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
14 })
15 }
16
17 n.waitGroup.Wrap(n.queueScanLoop)
18 n.waitGroup.Wrap(n.lookupLoop)
19 if n.getOpts().StatsdAddress != "" {
20 n.waitGroup.Wrap(n.statsdLoop)
21 }
22
23
處理請求
分別處理 tcp/http 請求,開啓 handler 協程進行併發處理,其中 newHTTPServer 註冊路由採用了 Decorate 裝飾器模式 (後面會進一步解析);
1// 位於 nsqd/http.go:44
2router := httprouter.New()
3 router.HandleMethodNotAllowed = true
4 router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
5 router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
6 router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
7 s := &httpServer{
8 ctx: ctx,
9 tlsEnabled: tlsEnabled,
10 tlsRequired: tlsRequired,
11 router: router,
12 }
13
14 router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
15 router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
16
17 // v1 negotiate
18 router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
19 router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
20 router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))
21
22 // only v1
23 router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
24 router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
25
26
http-Decorate 路由分發
1// 位於 internal/protocol/tcp_server.go:22
2for {
3 clientConn, err := listener.Accept()
4 if err != nil {
5 if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
6 logf(lg.WARN, "temporary Accept() failure - %s", err)
7 runtime.Gosched()
8 continue
9 }
10 // theres no direct way to detect this error because it is not exposed
11 if !strings.Contains(err.Error(), "use of closed network connection") {
12 return fmt.Errorf("listener.Accept() error - %s", err)
13 }
14 break
15 }
16 go handler.Handle(clientConn)
17 }
18
19
如上的實現爲 tcp-handler 處理的主要代碼。
tcp 解析協議
tcp 解析 V2 協議,走內部協議封裝的 prot.IOLoop(conn) 進行處理;
1// 位於 nsqd/tcp.go:34
2var prot protocol.Protocol
3 switch protocolMagic {
4 case " V2":
5 prot = &protocolV2{ctx: p.ctx}
6 default:
7 protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
8 clientConn.Close()
9 p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
10 clientConn.RemoteAddr(), protocolMagic)
11 return
12 }
13
14 err = prot.IOLoop(clientConn)
15 if err != nil {
16 p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
17 return
18 }
19
20
消息生成與消費
通過內部協議進行 p.Exec(執行命令)、p.Send(發送結果),保證每個 nsqd 節點都能正確的進行消息生成與消費,一旦上述過程有 error 都會被捕獲處理,確保分佈式投遞的可靠性。
1// 位於 nsqd/protocol_v2.go:79
2params := bytes.Split(line, separatorBytes)
3
4 p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
5
6 var response []byte
7 response, err = p.Exec(client, params)
8 if err != nil {
9 ctx := ""
10 if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
11 ctx = " - " + parentErr.Error()
12 }
13 p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)
14
15 sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
16 if sendErr != nil {
17 p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
18 break
19 }
20
21 // errors of type FatalClientErr should forceably close the connection
22 if _, ok := err.(*protocol.FatalClientErr); ok {
23 break
24 }
25 continue
26 }
27
28 if response != nil {
29 err = p.Send(client, frameTypeResponse, response)
30 if err != nil {
31 err = fmt.Errorf("failed to send response - %s", err)
32 break
33 }
34 }
35
36
nsqd 也會同時開啓 tcp 和 http 服務,兩個服務都可以提供給生產者和消費者,http 服務還提供給 nsqadmin 獲取該 nsqd 本地 topic 和 channel 信息。
小結
本文主要介紹 nsqd,總的來說 nsqd 的實現並不複雜。nsqd 是一個守護進程,負責接收(生產者 producer )、排隊(最小堆實現)、投遞(消費者 consumer )消息給客戶端。nsqd 可以獨立運行,但通常是由 nsqlookupd 實例所在集羣配置的。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Ym4Xp0IUFI5MEVV6PLCDig