【性能】性能比較:REST vs gRPC vs 異步通信

微服務之間的通信方式對微服務架構內的各種軟件質量因素有重大影響(有關微服務網絡內通信的關鍵作用的更多信息)。溝通方式會影響軟件的性能和效率等功能性需求,以及可變性、可擴展性和可維護性等非功能性需求。因此,有必要考慮不同方法的所有優缺點,以便在具體用例中合理選擇正確的溝通方式。
本文比較了以下樣式:REST、gRPC 和使用消息代理 (RabbitMQ) 的異步通信,在微服務網絡中瞭解它們對軟件的性能影響。溝通方式的一些最重要的屬性(反過來會影響整體表現)是:

數據傳輸格式

雖然使用 AMQP 協議(高級消息隊列協議)的異步通信和 gRPC 通信使用二進制協議進行數據傳輸,但 REST-API 通常以文本格式傳輸數據。與基於文本的協議相比,二進制協議的效率要高得多 [1,2]。因此,使用 gRPC 和 AMQP 進行通信會導致較低的網絡負載,而使用 REST API 時可以預期更高的網絡負載。

連接處理

REST-API 通常建立在 HTTP/1.1 協議之上,而 gRPC 依賴於 HTTP/2 協議的使用。HTTP/1.1、HTTP/2 以及 AMQP 都在傳輸層使用 TCP 來確保穩定的連接。要建立這樣的連接,需要在客戶端和服務器之間進行詳細的通信。這些性能影響同樣適用於所有溝通方式。但是,對於 AMQP 或 HTTP/2 連接,通信連接的初始建立只需要執行一次,因爲這兩種協議的請求都可以多路複用。這意味着可以將現有連接重用於使用異步或 gRPC 通信的後續請求。另一方面,使用 HTTP/1.1 的 REST-API 爲與遠程服務器的每個請求建立新連接。

Necessary communication to establish a TCP-Connection

消息序列化

通常,在通過網絡傳輸消息之前,使用 JSON 執行 REST 和異步通信以進行消息序列化。另一方面,gRPC 默認以協議緩衝區格式傳輸數據。協議緩衝區通過允許使用更高級的序列化和反序列化方法來編碼和使用消息內容 [1] 來提高通信速度。然而,選擇正確的消息序列化格式取決於工程師。關於性能,protocol buffers 有很多優勢,但是當必須調試微服務之間的通信時,依賴人類可讀的 JSON 格式可能是更好的選擇。

緩存

有效的緩存策略可以顯着減少服務器的負載和必要的計算資源。由於其架構,REST-API 是唯一允許有效緩存的通信方式。REST-API 響應可以被其他服務器和緩存代理(如 Varnish)緩存和複製。這減少了 REST 服務的負載並允許處理大量的 HTTP 流量 [1]。但是,這隻有在基礎架構上部署更多服務(緩存代理)或使用第三方集成後纔有可能。gRPC 官方文檔和 RabbitMQ 文檔都沒有介紹任何形式的緩存。

負載均衡

除了臨時存儲響應之外,還有其他技術可以提高服務速度。負載均衡器(例如 mod_proxy)可以高效透明的方式在服務之間分配 HTTP 流量 [1]。這可以實現使用 REST API 的服務的水平擴展。Kubernetes 作爲容器編排解決方案,無需任何調整即可對 HTTP/1.1 流量進行負載均衡。另一方面,對於 gRPC,需要在網絡上提供另一個服務(linkerd)[3]。異步通信無需進一步的幫助即可支持負載平衡。消息代理本身扮演負載均衡器的角色,因爲它能夠將請求分發到同一服務的多個實例。消息代理爲此目的進行了優化,並且它們的設計已經考慮到它們必須具有特別可擴展性的事實 [1]。

實驗

爲了能夠評估各個通信方法對軟件質量特性的影響,開發了四個微服務來模擬電子商務平臺的訂單場景。

微服務部署在由三個不同服務器組成的自託管 Kubernetes 集羣上。服務器通過千兆 (1000 Mbit/s) 網絡連接,位於同一數據中心,服務器之間的平均延遲爲 0.15 毫秒。每次實驗運行時,各個服務都部署在相同的服務器上。這種行爲是通過 pod 親和性來實現的。
所有微服務都是用 GO 編程語言實現的。個別服務的實際業務邏輯,例如與數據庫的通信,爲了不被選擇的通信方法之外的其他影響,故意不實現。因此,收集的結果不能代表這種類型的微服務架構,但可以使實驗中的通信方法具有可比性。相反,業務邏輯的實現是通過將程序流程延遲 100 毫秒來模擬的。因此,在通信中,總延遲爲 400 毫秒。
開源軟件 k6 用於實現負載測試。

實現

Golang 標準庫中包含的 net/http 模塊用於提供 REST 接口。使用標準庫中也包含的 encoding/json 模塊對請求進行序列化和反序列化。所有請求都使用 HTTP POST 方法。
“談話很便宜。給我看看密碼。”

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"

    "github.com/google/uuid"
    "gitlab.com/timbastin/bachelorarbeit/common"
    "gitlab.com/timbastin/bachelorarbeit/config"
)

type restServer struct {
    httpClient http.Client
}

func (server *restServer) handler(res http.ResponseWriter, req *http.Request) {
    // only allow post request.
    if req.Method != http.MethodPost {
        bytes, _ := json.Marshal(map[string]string{
            "error": "invalid request method",
        })
        http.Error(res, string(bytes), http.StatusBadRequest)
        return
    }

    reqId := uuid.NewString()

    // STEP 1 / 4
    log.Println("(REST) received new order", reqId)

    var submitOrderDTO common.SubmitOrderRequestDTO

    b, _ := ioutil.ReadAll(req.Body)

    err := json.Unmarshal(b, &submitOrderDTO)
    if err != nil {
        log.Fatalf(err.Error())
    }

    checkIfInStock(1)

    invoiceRequest, _ := http.NewRequest(http.MethodPost, 
    fmt.Sprintf("%s/invoices", config.MustGet("customerservice.rest.address").
     (string)), bytes.NewReader(b))
    // STEP 2
    r, err := server.httpClient.Do(invoiceRequest)
    // just close the response body
    r.Body.Close()
    if err != nil {
        panic(err)
    }

    shippingRequest, _ := http.NewRequest(http.MethodPost, 
    fmt.Sprintf("%s/shipping-jobs", config.MustGet("shippingservice.rest.address").
     (string)), bytes.NewReader(b))

    // STEP 3
    r, err = server.httpClient.Do(shippingRequest)
    // just close the response body
    r.Body.Close()
    if err != nil {
        panic(err)
    }

    handleProductDecrement(1)
    // STEP 5
    res.WriteHeader(201)
    res.Write(common.NewJsonResponse(map[string]string{
        "state": "success",
    }))
}

func startRestServer() {
    server := restServer{
        httpClient: http.Client{},
    }
    http.HandleFunc("/orders", server.handler)
    done := make(chan int)
    go http.ListenAndServe(config.MustGet("orderservice.rest.port").(string), nil)
    log.Println("started rest server")
    <-done
}

RabbitMQ 消息代理用於異步通信,部署在同一個 Kubernetes 集羣上。消息代理和各個微服務之間的通信使用 github.com/spreadway/amqp 庫進行。該庫是 GO 編程語言官方文檔推薦的。

package main

import (
    "encoding/json"
    "log"

    "github.com/streadway/amqp"
    "gitlab.com/timbastin/bachelorarbeit/common"
    "gitlab.com/timbastin/bachelorarbeit/config"
    "gitlab.com/timbastin/bachelorarbeit/utils"
)

func handleMsg(message amqp.Delivery, ch *amqp.Channel) {
    log.Println("(AMQP) received new order")
    var submitOrderRequest common.SubmitOrderRequestDTO
    err := json.Unmarshal(message.Body, &submitOrderRequest)
    utils.FailOnError(err, "could not unmarshal message")

    checkIfInStock(1)

    handleProductDecrement(1)
    ch.Publish(config.MustGet("amqp.billingRequestExchangeName").(string), "", 
     false, false, amqp.Publishing{
        ContentType: "application/json",
        Body:        message.Body,
    })

}

func getNewOrderChannel(conn *amqp.Connection) (*amqp.Channel, string) {
    ch, err := conn.Channel()
    utils.FailOnError(err, "could not create channel")

    ch.ExchangeDeclare(config.MustGet("amqp.newOrderExchangeName").
    (string), "fanout", false, false, false, false, nil)

    queue, err := ch.QueueDeclare(config.MustGet("orderservice.amqp.consumerName").
    (string), false, false, false, false, nil)

    utils.FailOnError(err, "could not create queue")

    ch.QueueBind(queue.Name, "", config.MustGet("amqp.newOrderExchangeName").
    (string), false, nil)
    return ch, queue.Name
}

func startAmqpServer() {
    conn := common.NewAmqpConnection(config.MustGet("amqp.host").(string))
    defer conn.Close()

    orderChannel, queueName := getNewOrderChannel(conn)

    msgs, err := orderChannel.Consume(
        queueName,
        config.MustGet("orderservice.amqp.consumerName").(string),
        true,
        false,
        false,
        false,
        nil,
    )

    utils.FailOnError(err, "could not consume")

    forever := make(chan bool)
    log.Println("started amqp server:", queueName)
    go func() {
        for d := range msgs {
            go handleMsg(d, orderChannel)
        }
    }()
    <-forever
}

gRPC 客戶端和服務器使用 gRPC 文檔推薦的 google.golang.org/grpc 庫。數據的序列化是使用協議緩衝區完成的。

package main

import (
    "log"
    "net"

    "context"

    "gitlab.com/timbastin/bachelorarbeit/common"
    "gitlab.com/timbastin/bachelorarbeit/config"
    "gitlab.com/timbastin/bachelorarbeit/pb"
    "gitlab.com/timbastin/bachelorarbeit/utils"
    "google.golang.org/grpc"
)

type OrderServiceServer struct {
    CustomerService pb.CustomerServiceClient
    ShippingService pb.ShippingServiceClient
    pb.UnimplementedOrderServiceServer
}

func (s *OrderServiceServer) SubmitOrder(ctx context.Context, 
    request *pb.SubmitOrderRequest) (*pb.SuccessReply, error) {
    log.Println("(GRPC) received new order")
    if s.CustomerService == nil {
        s.CustomerService, _ = common.NewCustomerServiceClient()
    }
    if s.ShippingService == nil {
        s.ShippingService, _ = common.NewShippingServiceClient()
    }

    checkIfInStock(1)

    // call the product service on each iteration to decrement the product.
    _, err := s.CustomerService.CreateAndProcessBilling(ctx, &pb.BillingRequest{
        BillingInformation: request.BillingInformation,
        Products:           request.Products,
    })

    utils.FailOnError(err, "could not process billing")

    // trigger the shipping job.
    _, err = s.ShippingService.CreateShippingJob(ctx, &pb.ShippingJob{
        BillingInformation: request.BillingInformation,
        Products:           request.Products,
    })

    utils.FailOnError(err, "could not create shipping job")

    handleProductDecrement(1)

    return &pb.SuccessReply{Success: true}, nil
}

func startGrpcServer() {
    listen, err := net.Listen("tcp", config.MustGet("orderservice.grpc.port").(string))
    if err != nil {
        log.Fatalf("could not listen: %v", err)
    }

    grpcServer := grpc.NewServer()

    orderService := OrderServiceServer{}
    // inject the clients into the server
    pb.RegisterOrderServiceServer(grpcServer, &orderService)

    // start the server
    log.Println("started grpc server")
    if err := grpcServer.Serve(listen); err != nil {
        log.Fatalf("could not start grpc server: %v", err)
    }
}

收集數據

檢查成功和失敗的訂單處理的數量,以確認它們所經過的時間。如果直到確認的持續時間超過 900 毫秒,則訂單流程被解釋爲失敗。選擇此持續時間是因爲在實驗中可能會出現無限長的等待時間,尤其是在使用異步通信時。每次試驗都會報告失敗和成功訂單的數量。
每種架構總共進行了 12 次不同的測量,每種情況下同時請求的數量不同,傳輸的數據量也不同。首先,在低負載下測試每種通信方式,然後在中等負載下,最後在高負載下測試。低負載模擬 10 個,中等負載模擬 100 個,高負載模擬 300 個同時向系統發出的請求。在這六次測試運行之後,要傳輸的數據量會增加,以瞭解各個接口的序列化方法的效率。數據量的增加是通過訂購多個產品來實現的。

結果

gRPC API 架構是實驗中研究的性能最佳的通信方法。在低負載下,它可以接受的訂單數量是使用 REST 接口的系統的 3.41 倍。此外,平均響應時間比 REST-API 低 9.71 毫秒,比 AMQP-API 低 9.37 毫秒。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/OuXSfRhMWl4RwuZY7JvY9A