用 Go 實現 TCP 連接的雙向拷貝

最簡單的實現

每次來一個 Server 的連接,就新開一個 Client 的連接。用一個 goroutine 從 server 拷貝到 client,再用另外一個 goroutine 從 client 拷貝到 server。任何一方斷開連接,雙向都斷開連接。

func main() {
    runtime.GOMAXPROCS(1)
    listener, err := net.Listen("tcp", "127.0.0.1:8848")
    if err != nil {
        panic(err)
    }
    for {
        conn, err := listener.Accept()
        if err != nil {
            panic(err)
        }
        go handle(conn.(*net.TCPConn))
    }
}

func handle(server *net.TCPConn) {
    defer server.Close()
    client, err := net.Dial("tcp", "127.0.0.1:8849")
    if err != nil {
        fmt.Print(err)
        return
    }
    defer client.Close()
    go func() {
        defer server.Close()
        defer client.Close()
        buf := make([]byte, 2048)
        io.CopyBuffer(server, client, buf)
    }()
    buf := make([]byte, 2048)
    io.CopyBuffer(client, server, buf)
}

一個值得注意的地方是 io.Copy 的默認 buffer 比較大,給一個小的 buffer 可以支持更多的併發連接。

這兩個 goroutine 並序在一個退出之後,另外一個也退出。這個的實現是通過關閉 server 或者 client 的 socket 來實現的。因爲 socket 被關閉了,io.CopyBuffer 就會退出。

Client 端實現連接池

一個顯而易見的問題是,每次 Server 的連接進來之後都需要臨時去建立一個新的 Client 的端的連接。這樣在代理的總耗時裏就包括了一個 tcp 連接的握手時間。如果能夠讓 Client 端實現連接池複用已有連接的話,可以縮短端到端的延遲。

var pool = make(chan net.Conn, 100)

func borrow() (net.Conn, error) {
    select {
    case conn := <- pool:
        return conn, nil
    default:
        return net.Dial("tcp", "127.0.0.1:8849")
    }
}

func release(conn net.Conn) error {
    select {
    case pool <- conn:
        // returned to pool
        return nil
    default:
        // pool is overflow
        return conn.Close()
    }
}

func handle(server *net.TCPConn) {
    defer server.Close()
    client, err := borrow()
    if err != nil {
        fmt.Print(err)
        return
    }
    defer release(client)
    go func() {
        defer server.Close()
        defer release(client)
        buf := make([]byte, 2048)
        io.CopyBuffer(server, client, buf)
    }()
    buf := make([]byte, 2048)
    io.CopyBuffer(client, server, buf)
}

這個版本的實現是顯而易見有問題的。因爲連接在歸還到池裏的時候並不能保證是還保持連接的狀態。另外一個更嚴重的問題是,因爲 client 的連接不再被關閉了,當 server 端關閉連接時,從 client 向 server 做 io.CopyBuffer 的 goroutine 就無法退出了。

所以,有以下幾個問題要解決:

通過 SetDeadline 中斷 Goroutine

一個普遍的觀點是 Goroutine 是無法被中斷的。當一個 Goroutine 在做 conn.Read 時,這個協程就被阻塞在那裏了。實際上並不是毫無辦法的,我們可以通過 conn.Close 來中斷 Goroutine。但是在連接池的情況下,又無法 Close 鏈接。另外一種做法就是通過 SetDeadline 爲一個過去的時間戳來中斷當前正在進行的阻塞讀或者阻塞寫。

var pool = make(chan net.Conn, 100)

type client struct {
    conn net.Conn
    inUse *sync.WaitGroup
}

func borrow() (clt *client, err error) {
    var conn net.Conn
    select {
    case conn = <- pool:
    default:
        conn, err = net.Dial("tcp", "127.0.0.1:18849")
    }
    if err != nil {
        return nil, err
    }
    clt = &client{
        conn: conn,
        inUse: &sync.WaitGroup{},
    }
    return
}

func release(clt *client) error {
    clt.conn.SetDeadline(time.Now().Add(-time.Second))
    clt.inUse.Done()
    clt.inUse.Wait()
    select {
    case pool <- clt.conn:
        // returned to pool
        return nil
    default:
        // pool is overflow
        return clt.conn.Close()
    }
}

func handle(server *net.TCPConn) {
    defer server.Close()
    clt, err := borrow()
    if err != nil {
        fmt.Print(err)
        return
    }
    clt.inUse.Add(1)
    defer release(clt)
    go func() {
        clt.inUse.Add(1)
        defer server.Close()
        defer release(clt)
        buf := make([]byte, 2048)
        io.CopyBuffer(server, clt.conn, buf)
    }()
    buf := make([]byte, 2048)
    io.CopyBuffer(clt.conn, server, buf)
}

通過 SetDeadline 實現了 goroutine 的中斷,然後通過 sync.WaitGroup 來保證這些使用方都退出了之後再歸還給連接池。否則一個連接被複用的時候,之前的使用方可能還沒有退出。

連接有效性

爲了保證在歸還給 pool 之前,連接仍然是有效的。連接在被讀寫的過程中如果發現了 error,我們就要標記這個連接是有問題的,會釋放之後直接 close 掉。但是 SetDeadline 必然會導致讀取或者寫入的時候出現一次 timeout 的錯誤,所以還需要把 timeout 排除掉。

var pool = make(chan net.Conn, 100)

type client struct {
    conn net.Conn
    inUse *sync.WaitGroup
    isValid int32
}

const maybeValid = 0
const isValid = 1
const isInvalid = 2

func (clt *client) Read(b []byte) (n int, err error) {
    n, err = clt.conn.Read(b)
    if err != nil {
        if !isTimeoutError(err) {
            atomic.StoreInt32(&clt.isValid, isInvalid)
        }
    } else {
        atomic.StoreInt32(&clt.isValid, isValid)
    }
    return
}

func (clt *client) Write(b []byte) (n int, err error) {
    n, err = clt.conn.Write(b)
    if err != nil {
        if !isTimeoutError(err) {
            atomic.StoreInt32(&clt.isValid, isInvalid)
        }
    } else {
        atomic.StoreInt32(&clt.isValid, isValid)
    }
    return
}

type timeoutErr interface {
    Timeout() bool
}

func isTimeoutError(err error) bool {
    timeoutErr, _ := err.(timeoutErr)
    if timeoutErr == nil {
        return false
    }
    return timeoutErr.Timeout()
}

func borrow() (clt *client, err error) {
    var conn net.Conn
    select {
    case conn = <- pool:
    default:
        conn, err = net.Dial("tcp", "127.0.0.1:18849")
    }
    if err != nil {
        return nil, err
    }
    clt = &client{
        conn: conn,
        inUse: &sync.WaitGroup{},
        isValid: maybeValid,
    }
    return
}

func release(clt *client) error {
    clt.conn.SetDeadline(time.Now().Add(-time.Second))
    clt.inUse.Done()
    clt.inUse.Wait()
    if clt.isValid == isValid {
        return clt.conn.Close()
    }
    select {
    case pool <- clt.conn:
        // returned to pool
        return nil
    default:
        // pool is overflow
        return clt.conn.Close()
    }
}

func handle(server *net.TCPConn) {
    defer server.Close()
    clt, err := borrow()
    if err != nil {
        fmt.Print(err)
        return
    }
    clt.inUse.Add(1)
    defer release(clt)
    go func() {
        clt.inUse.Add(1)
        defer server.Close()
        defer release(clt)
        buf := make([]byte, 2048)
        io.CopyBuffer(server, clt, buf)
    }()
    buf := make([]byte, 2048)
    io.CopyBuffer(clt, server, buf)
}

判斷 error 是否是 timeout 需要類型強轉來實現。

對於連接池裏的 conn 是否仍然是有效的,如果用後臺不斷 ping 的方式來實現成本比較高。因爲不同的協議要連接保持需要不同的 ping 的方式。一個最簡單的辦法就是下次用的時候試一下。如果連接不好用了,則改成新建一個連接,避免連續拿到無效的連接。通過這種方式把無效的連接給淘汰掉。

轉自:zhuanlan.zhihu.com/p/29657180

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/sEkPeOx-UXwCfkZk5PbdlQ