使用 Nginx 對 LLM 服務進行負載均衡實踐

想象一下有很多的算法服務實例,負載不均衡會造成:

需要將請求分發到不同的節點進行處理,讓每個節點的負載在合適的水平,這就是負載均衡。

1. 簡介

nginx 是一款開源的、高性能的 Web 服務器,同時也廣泛用作 反向代理服務器、負載均衡器 和 HTTP 緩存。 它的設計目標是解決傳統服務器(如 Apache)在高併發場景下的性能瓶頸,現已成爲全球最流行的 Web 服務器之一。

特點:

2. Nginx 安裝

以 ubuntu 爲例

sudo apt install nginx

編輯配置文件 /etc/nginx/nginx.conf,向 http 添加以下內容

http {
    upstream backend {
        least_conn;  # 均衡算法
        server 127.0.0.1:8001;  # 後端服務1
        server 127.0.0.1:8002;  # 後端服務2
    }
    server {
        listen 80;
        location / {
            proxy_pass http://backend;
        }
    }
    
    log_format upstream_log '$remote_addr - $remote_user [$time_local] "$request" '
                           '$status $body_bytes_sent "$http_referer" '
                           '"$http_user_agent" "$http_x_forwarded_for" '
                           'to: $upstream_addr';

    access_log /var/log/nginx/upstream.log upstream_log;
}

完整配置如下:

# cat /etc/nginx/nginx.conf
user www-data;
worker_processes auto;
pid /run/nginx.pid;
include /etc/nginx/modules-enabled/*.conf;

events {
        worker_connections 768;
        # multi_accept on;
}

http {
    ##
    # Basic Settings
    ##

    sendfile on;
    tcp_nopush on;
    types_hash_max_size 2048;
    # server_tokens off;

    # server_names_hash_bucket_size 64;
    # server_name_in_redirect off;

    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    ##
    # Logging Settings
    ##

    log_format upstream_log '$remote_addr:$remote_port $request_uri - $remote_user [$time_local] "$request" '
                           '$status $body_bytes_sent "$http_referer" '
                           '"$http_user_agent" "$http_x_forwarded_for" '
                           'to: $upstream_addr';

    access_log /var/log/nginx/access.log upstream_log;
    error_log /var/log/nginx/error.log;

    ##
    # SSL Settings
    ##

    ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3; # Dropping SSLv3, ref: POODLE
    ssl_prefer_server_ciphers on;

    ##
    # Gzip Settings
    ##

    gzip on;
    # gzip_vary on;
    # gzip_proxied any;
    # gzip_comp_level 6;
    # gzip_buffers 16 8k;
    # gzip_http_version 1.1;
    # gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;

    ##
    # Upstream Servers
    ##

    upstream backend {
        least_conn;
        server 127.0.0.1:8001;  # 後端服務1
        server 127.0.0.1:8002;  # 後端服務2
    }

    ##
    # Server Blocks
    ##

    server {
        listen 80;
        server_name localhost;
        location / {
            proxy_pass http://backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            #proxy_http_version 1.1;          # 確保使用 HTTP/1.1 <button data-index="7">
            #proxy_set_header Connection '';
        }
    }
    add_header X-Upstream $upstream_addr always;  # 在響應頭中暴露後端地址
    ##
    # Virtual Host Configs
    ##

    include /etc/nginx/conf.d/*.conf;
    include /etc/nginx/sites-enabled/*;
}

#mail {
#       # See sample authentication script at:
#       # http://wiki.nginx.org/ImapAuthenticateWithApachePhpScript
#
#       # auth_http localhost/auth.php;
#       # pop3_capabilities "TOP" "USER";
#       # imap_capabilities "IMAP4rev1" "UIDPLUS";
#
#       server {
#               listen     localhost:110;
#               protocol   pop3;
#               proxy      on;
#       }
#
#       server {
#               listen     localhost:143;
#               protocol   imap;
#               proxy      on;
#       }
#}

編寫完配置,systemctl reload nginx 重啓,nginx -t 確認配置 ok

3. 均衡算法

以上配置中 least_conn 是均衡算法,可以有多種算法可選

upstream backend {
    server 127.0.0.1:8001 weight=3;  # 60%的請求
    server 127.0.0.1:8002 weight=2;  # 40%的請求
}
upstream backend {
    least_conn;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
}
upstream backend {
    ip_hash;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
}

一致性哈希通過哈希環和虛擬節點機制,有效解決了傳統哈希算法在動態環境下的數據遷移和負載不均問題。 其核心在於減少節點變動的影響範圍,並利用虛擬節點實現數據分佈的平衡性,是分佈式系統中實現高可用和可擴展性的關鍵技術

upstream backend {
    hash $request_uri consistent;  # 按請求URI分配
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
}

所有算法均可配合權重使用:

server 127.0.0.1:8001 weight=5 max_fails=3 fail_timeout=30s;

4. 測試負載均衡

開啓兩個 http 服務(端口 8001,8002),使用 python 腳本 server.py 啓動

# server.py
from http.server import BaseHTTPRequestHandler, HTTPServer
import socket

class DebugRequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # 打印客戶端信息
        client_ip, client_port = self.client_address
        print(f"\n--- 新請求 @{self.server.server_port} ---")
        print(f"[客戶端] {client_ip}:{client_port}")
        print(f"[方法] {self.command}")
        print(f"[路徑] {self.path}")
        print(f"[請求頭] {self.headers}")

        # 返回響應 
        self.send_response(200)
        self.send_header("Content-type", "text/plain")
        self.end_headers()
        self.wfile.write(f"響應來自: {self.server.server_port}".encode())

    def do_POST(self):
        # 處理 POST 請求 
        content_length = int(self.headers["Content-Length"])
        post_data = self.rfile.read(content_length).decode("utf-8")
        
        print(f"\n--- POST 請求 @{self.server.server_port} ---")
        print(f"[數據] {post_data}")
        self.do_GET()  # 複用 GET 響應邏輯

def run(server_class=HTTPServer, handler_class=DebugRequestHandler, port=8001):
    server_address = ("0.0.0.0", port)  # 綁定到所有接口
    httpd = server_class(server_address, handler_class)
    print(f"服務啓動於 0.0.0.0:{port}...")
    httpd.serve_forever()

if __name__ == "__main__":
    # 啓動兩個服務實例(分別在 8001 和 8002 端口)
    import threading
    threading.Thread(target=run, kwargs={"port": 8001}).start()
    threading.Thread(target=run, kwargs={"port": 8002}).start()
python3 server.py

壓力測試

apt install apache2-utils
ab -n 1000 -c 10 http://localhost/

ab 是 Apache Bench 的縮寫,它是 Apache HTTP 服務器項目中的一個性能測試工具,用於對 Web 服務器發起壓力測試。

命令分解

  1. ab:工具名稱(Apache Bench)。

  2. -n 1000:總請求數爲 1000(-n 表示 number of requests)。

  3. -c 10:併發用戶數爲 10(-c 表示 concurrency,即同時發送的請求數)。

  4. http://localhost/:目標測試地址(可以是本地服務或遠程 URL)。

模擬 10 個併發用戶 同時訪問 http://localhost/,每個用戶連續發送請求,直到總請求數達到 1000。通過該命令可以測試:

示例輸出解讀

運行命令後,輸出結果示例如下:

This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Completed 600 requests
Completed 700 requests
Completed 800 requests
Completed 900 requests
Completed 1000 requests
Finished 1000 requests


Server Software:        nginx/1.18.0
Server Hostname:        localhost
Server Port:            80

Document Path:          /
Document Length:        18 bytes

Concurrency Level:      10
Time taken for tests:   1.201 seconds
Complete requests:      1000
Failed requests:        0
Total transferred:      178000 bytes
HTML transferred:       18000 bytes
Requests per second:    832.30 [#/sec] (mean)
Time per request:       12.015 [ms] (mean)
Time per request:       1.201 [ms] (mean, across all concurrent requests)
Transfer rate:          144.68 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.0      0       1
Processing:     2   12   1.9     12      26
Waiting:        1   12   1.9     12      25
Total:          2   12   1.9     12      26

Percentage of the requests served within a certain time (ms)
  50%     12
  66%     12
  75%     13
  80%     13
  90%     14
  95%     15
  98%     16
  99%     18
 100%     26 (longest request)

關鍵指標

  1. Requests per second813.01 [#/sec]
    表示服務器每秒能處理 832 個請求(吞吐量越高,性能越好)。

  2. Time per request

    • 12.015 [ms] (mean):每個請求的平均耗時(從用戶角度看)。

    • 1.201 [ms] (mean, across all concurrent requests):服務器平均處理每個請求的時間。

  3. Connection Times
    顯示連接、處理、等待時間的分佈(單位:毫秒)。

  4. Percentage served within a certain time
    響應時間的分佈情況(如 90% 的請求在 14ms 內完成)。

服務端的日誌輸出:我們去統計下兩個服務端的被請求次數:

tail -f /var/log/nginx/access.log
127.0.0.1:48878 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8001
127.0.0.1:48882 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8002
127.0.0.1:48892 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8001
127.0.0.1:48904 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8002
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 1000 |  grep ":8001" | wc -l
506
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 1000 |  grep ":8002" | wc -l
494

或者

root@MichaelMing:~# awk '{print $NF}' /var/log/nginx/access.log | sort | uniq -c
    300 "ApacheBench/2.3"
      3 "curl/7.81.0"
  93576 -
    984 127.0.0.1:8001
   1018 127.0.0.1:8002

我們去查看兩個端口號,發現兩個端口的服務被請求的次數是基本均衡的(506/494),Nginx 起到了負載均衡的作用

5. chat 服務例子

假設 MichaelAI有兩個 api 接口 chatcompletions

from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from urllib.parse import urlparse, parse_qs
from datetime import datetime

class MultiAPIHandler(BaseHTTPRequestHandler):
    def send_json_response(self, data, status=200):
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(data).encode("utf-8"))
    
    def do_POST(self):
        # 統一入口路由分發
        parsed_path = urlparse(self.path)
        path = parsed_path.path
        
        try:
            content_length = int(self.headers["Content-Length"])
            post_data = self.rfile.read(content_length).decode("utf-8")
            
            print(f"\n--- {path} 請求 @{self.server.server_port} ---")
            print(f"[客戶端] {self.client_address[0]}:{self.client_address[1]}")
            print(f"[數據] {post_data}")
            
            if path == "/chat":
                self.handle_chat(post_data)
            elif path == "/completions":
                self.handle_completions(post_data)
            else:
                self.send_error(404, "API not found")
                
        except Exception as e:
            self.send_json_response({
                "error": str(e),
                "server": self.server.server_port,
                "timestamp": datetime.now().isoformat()
            }, 400)

    def handle_chat(self, data):
        # 模擬對話接口處理
        try:
            input_data = json.loads(data)
            message = input_data.get("message", "")
            
            response = {
                "original": message,
                "response": f"Processed by chat API: {message.upper()}",
                "server": self.server.server_port,
                "timestamp": datetime.now().isoformat()
            }
            self.send_json_response(response)
            
        except json.JSONDecodeError:
            raise ValueError("Invalid JSON format")

    def handle_completions(self, data):
        # 文本補全接口
        response = {
            "completions": [
                {"text": f"Completion 1: {data[:5]}...", "index": 0},
                {"text": f"Completion 2: {data[-5:]}...", "index": 1}
            ],
            "server": self.server.server_port,
            "timestamp": datetime.now().isoformat()
        }
        self.send_json_response(response)

def run(server_class=HTTPServer, handler_class=MultiAPIHandler, port=8001):
    server_address = ("0.0.0.0", port)
    httpd = server_class(server_address, handler_class)
    print(f"多API服務啓動於 0.0.0.0:{port}...")
    httpd.serve_forever()

if __name__ == "__main__":
    import threading
    # 啓動兩個服務實例
    threading.Thread(target=run, kwargs={"port": 8001}).start()
    threading.Thread(target=run, kwargs={"port": 8002}).start()

chat_data.json 示例: {"message": "Hello, this is a test message"}

ab -n 100 -c 10 -T "application/json" -p json_data \
http://localhost:8001/chat

服務端日誌

--- /chat 請求 @8001 ---
[客戶端] 127.0.0.1:54724
[數據] {"message""Hello, this is a test message"}

127.0.0.1 - - [02/Mar/2025 22:41:22] "POST /chat HTTP/1.0" 200 -
text.txt 示例:
你好,我是Michael阿明開發的智能助手!
ab -n 10000 -c 100 -T "text/plain" -p text.txt \
http://localhost:8002/completions

服務端日誌

--- /completions 請求 @8002 ---
[客戶端] 127.0.0.1:47246
[數據] 你好,我是Michael阿明開發的智能助手!

127.0.0.1 - - [02/Mar/2025 22:49:26] "POST /completions HTTP/1.0" 200
# 同時壓測兩個接口
echo "chat接口壓測結果:" && \
ab -n 500 -c 50 -T "application/json" -p json_data http://localhost:80/chat && \
echo "completions接口壓測結果:" && \
ab -n 500 -c 50 -T "text/plain" -p text.txt http://localhost:80/completions

completions 接口調用分佈:兩個機器基本接近

root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 20000 |  grep "completions" | grep 8001 | wc -l
747
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 20000 |  grep "completions" | grep 8002 | wc -l
755
import requests
import asyncio
import aiohttp

# 同步客戶端(基於requests)
class SyncAPIClient:
    def __init__(self, base_url):
        self.base_url = base_url

    def call_chat(self, message):
        url = f"{self.base_url}/chat"
        data = {"message": message}
        response = requests.post(url, json=data)
        return response.json()

    def call_completions(self, text):
        url = f"{self.base_url}/completions"
        response = requests.post(url, data=text)
        return response.json()

# 異步客戶端(基於aiohttp)
class AsyncAPIClient:
    def __init__(self, base_url):
        self.base_url = base_url

    async def call_chat(self, message):
        url = f"{self.base_url}/chat"
        data = {"message": message}
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data) as response:
                return await response.json()

    async def call_completions(self, text):
        url = f"{self.base_url}/completions"
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=text) as response:
                return await response.json()

# 同步調用示例
sync_client = SyncAPIClient("http://localhost")
chat_response = sync_client.call_chat("Hello, sync chat!")
print("Chat響應:", chat_response)

completion_response = sync_client.call_completions("test text")
print("Completions響應:", completion_response)

# 異步調用示例
async def main():
    async_client = AsyncAPIClient("http://localhost")

    # 併發調用兩個接口 
    chat_task = asyncio.create_task(async_client.call_chat("Hello, async chat!"))
    completion_task = asyncio.create_task(async_client.call_completions("async test text"))

    results = await asyncio.gather(chat_task, completion_task)
    print("異步Chat響應:", results[0])
    print("異步Completions響應:", results[1])

asyncio.run(main())

輸出:

root@MichaelMing:~# python3 /mnt/d/opt/client.py
Chat響應: {'original''Hello, sync chat!''response''Processed by chat API: HELLO, SYNC CHAT!''server': 8001, 'timestamp''2025-03-02T23:11:54.426626'}
Completions響應: {'completions': [{'text''Completion 1: test ...''index': 0}, {'text''Completion 2:  text...''index': 1}]'server': 8002, 'timestamp''2025-03-02T23:11:54.430562'}
異步Chat響應: {'original''Hello, async chat!''response''Processed by chat API: HELLO, ASYNC CHAT!''server': 8001, 'timestamp''2025-03-02T23:11:54.444508'}
異步Completions響應: {'completions': [{'text''Completion 1: async...''index': 0}, {'text''Completion 2:  text...''index': 1}]'server': 8002, 'timestamp''2025-03-02T23:11:54.444973'}

用代碼進行壓力測試

# 併發壓測示例(使用異步客戶端)
async def stress_test():
    client = AsyncAPIClient("http://localhost")
    tasks = []

    for _ in range(100):
        tasks.append(client.call_chat("test message"))
        tasks.append(client.call_completions("test text"))

    responses = await asyncio.gather(*tasks)
    print(f"成功處理 {len(responses)} 個請求")

asyncio.run(stress_test())

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ClT_r1PuZvdMRIBdpB6JWA