如何實現 ChatGPT 的打字機效果

打字機效果

最近在搭建 chat gpt 代理的時候,發現自己的配置雖然能夠調通接口,返回數據,但是結果是一次性顯示出來的,不像之前的 chat gpt 的官網 demo 那樣實現了打字機效果,一個字一個字出來。

所以研究了一下 chat gpt 打字機效果的原理,後續如果要實現類似的效果可以借鑑。

純前端實現打字機效果

最開始我搜索打字機效果時,出現的結果大多數是純前端的方案。其原理也很簡單,通過 js 定時把內容輸出到屏幕。下面是 chat gpt 的答案:

前端實現打字機效果可以通過以下步驟:

  1. 將文本內容嵌入到 HTML 元素中,如 div 或 span

  2. 通過 CSS 樣式設置元素的顯示方式爲隱藏(如 display: none;)。

  3. 使用 JavaScript 獲取該元素,並逐個顯示其中的字符。

  4. 使用定時器(如 setInterval() 函數)控制每個字符的出現時間間隔,從而實現逐個逐個顯示的效果。

  5. 當所有字符都被顯示後,停止定時器以避免不必要的計算開銷。

下面是一個簡單的示例代碼:

HTML:

<div id="typewriter">Hello World!</div>

CSS:

#typewriter {
  display: none;
}

JS:

const element = document.getElementById('typewriter');

let i = 0;
const interval = setInterval(() ={
  element.style.display = 'inline';
  element.textContent = element.textContent.slice(0, i++) + '_';

  if (i > element.textContent.length) {
    clearInterval(interval);
    element.textContent = element.textContent.slice(0, -1);
  }
}, 100);

該代碼會將 idtypewriter 的元素中的文本逐個顯示,每個字符之間相隔 100 毫秒。最終顯示完畢後,會將最後一個字符的下劃線去除。

流式輸出

後面我抓包以及查看了 chat gpt 的官方文檔之後,發現事情並沒有這麼簡單。chat gpt 的打字機效果並不是後端一次性返回後,純前端的樣式。而是後端通過流式輸出不斷向前端輸出內容。

在 chat gpt 官方文檔中,有一個參數可以讓它實現流式輸出:

這是一個叫 “event_stream_format” 的協議規範。

event_stream_format(簡稱 ESF)是一種基於 HTTP/1.1 的、用於實現服務器推送事件的協議規範。它定義了一種數據格式,可以將事件作爲文本流發送給客戶端。ESF 的設計目標是提供簡單有效的實時通信方式,以及支持衆多平臺和編程語言。

ESF 數據由多行文本組成,每行用 \n(LF)分隔。其中,每個事件由以下三部分組成:

例如:

event: message 
data: Hello, world! 
id: 123

這個例子表示一個名爲 message 的事件,攜帶着消息內容 Hello, world!,並提供了一個標識符爲 123 的可選參數。

ESF 還支持以下兩種特殊事件類型:

例如:

: This is a comment

retry: 10000

event: update
data: {"status""OK"}

ESF 協議還支持 Last-Event-ID 頭部,它允許客戶端在斷線後重新連接,並從上次連接中斷處恢復。當客戶端連接時,可以通過該頭部將上次最新的事件 ID 傳遞給服務器,以便服務器根據該 ID 繼續發送事件。

ESF 是一種簡單的、輕量級的協議,適用於需要實時數據交換和多方通信的場景。由於其使用了標準的 HTTP/1.1 協議,因此可以輕易地在現有的 Web 基礎設施上實現。

抓包可以發現這個響應長這樣:

可以看到是 data: 加上一個 json,每次的流式數據在 delta 裏面。

http response 中有幾個重要的頭:

其中,keep-alive 是保持客戶端和服務端的雙向通信,這個大家應該都比較瞭解。下面解釋一下另外兩個頭.

這裏其實 openai 返回的是text/event-streamtext/event-stream 是一種流媒體協議,用於在 Web 應用程序中推送實時事件。它的內容是文本格式的,每個事件由一個或多個字段組成,以換行符(\n)分隔。這個 MIME 類型通常用於服務器到客戶端的單向通信,例如服務器推送最新的新聞、股票報價等信息給客戶端。

我這裏使用的開源項目 chatgpt-web 抓的包,請求被 nodejs 包了一層,返回了application/octet-stream (不太清楚這麼做的動機是什麼),它是一種 MIME 類型,通常用於指示某個資源的內容類型爲二進制文件,也就是未知的二進制數據流。該類型通常不會執行任何自定義處理,並且可以由客戶端根據需要進行下載或保存。

Transfer-Encoding: chunked 是一種 HTTP 報文傳輸編碼方式,用於指示報文主體被分爲多個等大小的塊(chunks)進行傳輸。每個塊包含一個十六進制數字的長度字段,後跟一個 CRLF(回車換行符),然後是實際的數據內容,最後以另一個 CRLF 結束。

使用 chunked 編碼方式可以使服務器在發送未知大小的數據時更加靈活,同時也可以避免一些限制整個響應主體大小的限制。當接收端收到所有塊後,會將它們組合起來,解壓縮(如果需要),並形成原始的響應主體。

總之,Transfer-Encoding: chunked 允許服務器在發送 HTTP 響應時,動態地生成報文主體,而不必事先確定其大小,從而提高了通信效率和靈活性。

服務端的實現

作爲 chat gpt 代理

如果寫一個 golang http 服務作爲 chat gpt 的代理,只需要循環掃描 chat gpt 返回的每行結果,每行作爲一個事件輸出給前端就行了。核心代碼如下:

// 設置Content-Type標頭爲text/event-stream  
w.Header().Set("Content-Type""text/event-stream")  
// 設置緩存控制標頭以禁用緩存  
w.Header().Set("Cache-Control""no-cache")  
w.Header().Set("Connection""keep-alive")  
w.Header().Set("Keep-Alive""timeout=5")  
// 循環讀取響應體並將每行作爲一個事件發送到客戶端  
scanner := bufio.NewScanner(resp.Body)  
for scanner.Scan() {  
   eventData := scanner.Text()  
   if eventData == "" {  
      continue  
   }  
   fmt.Fprintf(w, "%s\n\n", eventData)  
   flusher, ok := w.(http.Flusher)  
   if ok {  
      flusher.Flush()  
   } else {  
      log.Println("Flushing not supported")  
   }  
}

自己作爲服務端

這裏模仿 openai 的數據結構,自己作爲服務端,返回流式輸出:

  
const Text = `  
proxy_cache:通過這個模塊,Nginx 可以緩存代理服務器從後端服務器請求到的響應數據。當下一個客戶端請求相同的資源時,Nginx 可以直接從緩存中返回響應,而不必去請求後端服務器。這大大降低了代理服務器的負載,同時也能提高客戶端訪問速度。需要注意的是,使用 proxy_cache 模塊時需要謹慎配置緩存策略,避免出現緩存不一致或者過期的情況。  
  
proxy_buffering:通過這個模塊,Nginx 可以將後端服務器響應數據緩衝起來,並在完整的響應數據到達之後再將其發送給客戶端。這種方式可以減少代理服務器和客戶端之間的網絡連接數,提高併發處理能力,同時也可以防止後端服務器過早關閉連接,導致客戶端無法接收到完整的響應數據。  
  
綜上所述, proxy_cache 和 proxy_buffering 都可以通過緩存技術提高代理服務器性能和安全性,但需要注意合理的配置和使用,以避免潛在的緩存不一致或者過期等問題。同時, proxy_buffering 還可以通過緩衝響應數據來提高代理服務器的併發處理能力,從而更好地服務於客戶端。  
`  
  
type ChatCompletionChunk struct {  
   ID      string `json:"id"`  
   Object  string `json:"object"`  
   Created int64  `json:"created"`  
   Model   string `json:"model"`  
   Choices []struct {  
      Delta struct {  
         Content string `json:"content"`  
      } `json:"delta"`  
      Index        int     `json:"index"`  
      FinishReason *string `json:"finish_reason"`  
   } `json:"choices"`  
}  
  
func handleSelfRequest(w http.ResponseWriter, r *http.Request) {  
   // 設置Content-Type標頭爲text/event-stream  
   w.Header().Set("Content-Type""text/event-stream")  
   // 設置緩存控制標頭以禁用緩存  
   w.Header().Set("Cache-Control""no-cache")  
   w.Header().Set("Connection""keep-alive")  
   w.Header().Set("Keep-Alive""timeout=5")  
   w.Header().Set("Transfer-Encoding""chunked")  
   // 生成一個uuid  
   uid := uuid.NewString()  
   created := time.Now().Unix()  
  
   for i, v := range Text {  
      eventData := fmt.Sprintf("%c", v)  
      if eventData == "" {  
         continue  
      }  
      var finishReason *string  
      if i == len(Text)-1 {  
         temp := "stop"  
         finishReason = &temp  
      }  
      chunk := ChatCompletionChunk{  
         ID:      uid,  
         Object:  "chat.completion.chunk",  
         Created: created,  
         Model:   "gpt-3.5-turbo-0301",  
         Choices: []struct {  
            Delta struct {  
               Content string `json:"content"`  
            } `json:"delta"`  
            Index        int     `json:"index"`  
            FinishReason *string `json:"finish_reason"`  
         }{  
            {               Delta: struct {  
                  Content string `json:"content"`  
               }{  
                  Content: eventData,  
               },  
               Index:        0,  
               FinishReason: finishReason,  
            },  
         },  
      }  
  
      fmt.Println("輸出:" + eventData)  
      marshal, err := json.Marshal(chunk)  
      if err != nil {  
         return  
      }  
  
      fmt.Fprintf(w, "data: %v\n\n", string(marshal))  
      flusher, ok := w.(http.Flusher)  
      if ok {  
         flusher.Flush()  
      } else {  
         log.Println("Flushing not supported")  
      }  
      if i == len(Text)-1 {  
         fmt.Fprintf(w, "data: [DONE]")  
         flusher, ok := w.(http.Flusher)  
         if ok {  
            flusher.Flush()  
         } else {  
            log.Println("Flushing not supported")  
         }  
      }      time.Sleep(100 * time.Millisecond)  
   }  
}

核心是每次寫進一行數據data: xx \n\n,最終以data: [DONE]結尾。

前端的實現

前端代碼參考 https://github.com/Chanzhaoyu/chatgpt-web 的實現。

這裏核心是使用了 axios 的 onDownloadProgress 鉤子,當 stream 有輸出時,獲取 chunk 內容,更新到前端顯示。

await fetchChatAPIProcess<Chat.ConversationResponse>({  
  prompt: message,  
  options,  
  signal: controller.signal,  
  onDownloadProgress: ({ event }) ={  
    const xhr = event.target  
    const { responseText } = xhr  
    // Always process the final line  
    const lastIndex = responseText.lastIndexOf('\n')  
    let chunk = responseText  
    if (lastIndex !== -1)  
      chunk = responseText.substring(lastIndex)  
    try {  
      const data = JSON.parse(chunk)  
      updateChat(  
        +uuid,  
        dataSources.value.length - 1,  
        {  
          dateTime: new Date().toLocaleString(),  
          text: lastText + data.text ?? '',  
          inversion: false,  
          error: false,  
          loading: false,  
          conversationOptions: { conversationId: data.conversationId, parentMessageId: data.id },  
          requestOptions: { prompt: message, options: { ...options } },  
        },  
      )  
  
      if (openLongReply && data.detail.choices[0].finish_reason === 'length') {  
        options.parentMessageId = data.id  
        lastText = data.text  
        message = ''  
        return fetchChatAPIOnce()  
      }  
  
      scrollToBottom()  
    }  
    catch (error) {  
    //  
    }  
  },  
})

在底層的請求代碼中,設置對應的 header 和參數,監聽 data 內容,回調 onProgress 函數。

const responseP = new Promise((resolve, reject) ={  
  const url = this._apiReverseProxyUrl;  
  const headers = {  
    ...this._headers,  
    Authorization: `Bearer ${this._accessToken}`,  
    Accept: "text/event-stream",  
    "Content-Type""application/json"  
  };  
  if (this._debug) {  
    console.log("POST", url, { body, headers });  
  }  
  fetchSSE(  
    url,  
    {  
      method: "POST",  
      headers,  
      body: JSON.stringify(body),  
      signal: abortSignal,  
      onMessage: (data) ={  
        var _a, _b, _c;  
        if (data === "[DONE]") {  
          return resolve(result);  
        }  
        try {  
          const convoResponseEvent = JSON.parse(data);  
          if (convoResponseEvent.conversation_id) {  
            result.conversationId = convoResponseEvent.conversation_id;  
          }  
          if ((_a = convoResponseEvent.message) == null ? void 0 : _a.id) {  
            result.id = convoResponseEvent.message.id;  
          }  
          const message = convoResponseEvent.message;  
          if (message) {  
            let text2 = (_c = (_b = message == null ? void 0 : message.content) == null ? void 0 : _b.parts) == null ? void 0 : _c[0];  
            if (text2) {  
              result.text = text2;  
              if (onProgress) {  
                onProgress(result);  
              }  
            }  
          }  
        } catch (err) {  
        }  
      }  
    },  
    this._fetch  
  ).catch((err) ={  
    const errMessageL = err.toString().toLowerCase();  
    if (result.text && (errMessageL === "error: typeerror: terminated" || errMessageL === "typeerror: terminated")) {  
      return resolve(result);  
    } else {  
      return reject(err);  
    }  
  });  
});

nginx 配置

在搭建過程中,我還遇到另一個坑。因爲自己中間有一層 nginx 代理,而**「nginx 默認開啓了緩存,所以導致流式輸出到 nginx 這個地方被緩存了」**,最終前端拿到的數據是緩存後一次性輸出的。同時 gzip 也可能有影響。

這裏可以通過 nginx 配置,把 gzip 和緩存都關掉。

gzip off;

location / {
		proxy_set_header   Host             $host;
		proxy_set_header   X-Real-IP        $remote_addr;
		proxy_set_header   X-Forwarded-For  $proxy_add_x_forwarded_for;
		proxy_cache off;
		proxy_cache_bypass $http_pragma;
		proxy_cache_revalidate on;
		proxy_http_version 1.1;
		proxy_buffering off;
		proxy_pass http://xxx.com:1234;
}

proxy_cacheproxy_buffering 是 Nginx 的兩個重要的代理模塊。它們可以顯著提高代理服務器的性能和安全性。

實測只配置proxy_cache沒有用,配置了proxy_buffering後流式輸出才生效。

關於作者

我是 Yasin,一個愛寫博客的技術人

微信公衆號:編了個程 (blgcheng)

個人網站:https://yasinshaw.com

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ux3927AE-1TN-YYYBDpRtg