Golang context 示例及分析

爲什麼需要 contex?

在任何語言編程過程中, 都希望有一個上下文對象記錄全局需要的變量信息以及控制信息, 例如下面的例子:

func main() {
    messages := make(chan int, 10)
    done := make(chan bool)

    defer close(messages)
    // consumer
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        for _ = range ticker.C {
            select {
            case <-done:
                fmt.Println("child process interrupt...")
                return
            default:
                fmt.Printf("send message: %d\n", <-messages)
            }
        }
    }()

    // producer
    for i := 0; i < 10; i++ {
        messages <- i
    }
    time.Sleep(5 * time.Second)
    close(done)
    time.Sleep(1 * time.Second)
    fmt.Println("main process exit!")
}

此示例中主要還是需要一個控制信息, 在主協程退出之前, 發出信號關閉子協程. 因爲程序比較簡單, 還看不出 context 的必要性. 如果協程數量增加或者全局變量也逐漸遞增 (實際應用中很常見), 則是否可以使用統一的對象整合這些信息呢?

Context 接口

type Context interface {

    Deadline() (deadline time.Time, ok bool)

    Done() <-chan struct{}

    Err() error

    Value(key interface{}) interface{}
}

符合此 interface 的對象 context_instance, 通過它可以得到 deadline(父協程希望自己 -- 子協程 -- 結束的時間); 通過 Done() 函數可以得到一個 channel, 監聽它判斷是否退出; Err() 函數提供了退出原因; Value() 則可以得到 key 的鍵值.

emptyCtx 存在的意義

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
    return nil
}

Context 接口定義了上下文需要包含的功能屬性, 至於如何實現, 完全是靈活的. 但是試想, 某種情況下, 我只需要設置和獲取 key-value 的功能怎麼辦呢? 或者我只需要控制功能不需要 Value(xxx) 功能? 因爲接口是很霸道的, 定義的功能項必須都要實現, 才能轉化爲接口實例.

如此, 一個巧妙的設計產生了, 定義一個空的祖對象, 實現了空函數 (看似實現卻只是空函數) 用來欺騙編譯器和運行時. emptyCtx 的意義就在此.

valueCtx

// WithValue returns a copy of parent in which the value associated with key is val.
func WithValue(parent Context, key, val interface{}) Context {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    if key == nil {
        panic("nil key")
    }
    if !reflectlite.TypeOf(key).Comparable() {
        panic("key is not comparable")
    }
    return &valueCtx{parent, key, val}
}

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
    Context
    key, val interface{}
}

valueCtx 通過包含內部類型 Context, 得到了所有 Context 的功能屬性. 類似繼承的概念, valueCtx 繼承了父 Context 的功能. 另外我們知道, 外部類型如果實現了同樣的接口, 則會覆蓋內部類型實現 (類似重寫). 如下:

func (c *valueCtx) Value(key interface{}) interface{} {
    if c.key == key {
        return c.val
    }
    return c.Context.Value(key)
}

如此 valueCtx 相當於實現了自己的設置和獲取 key-value 的功能. 往父 Context 直到祖 Context 是另一個巧妙設置, 但是原理比較簡單, 這裏不贅述.

示例

提供一個示例, 但是沒有實際意義, 只是測試:

package main

import (
    "context"
    "fmt"
    "strconv"
    "time"
)

var context_root = context.Background()

func main() {
    context_t := context_root

    for i := 0; i < 10; i++ {
        context_t = context.WithValue(context_t, i, strconv.Itoa(i))
    }

    messages := make(chan string, 10)
    done := make(chan bool)

    defer close(messages)
    // consumer
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        for _ = range ticker.C {
            select {
            case <-done:
                fmt.Println("child process interrupt...")
                return
            default:
                fmt.Printf("send message: %s\n", <-messages)
            }
        }
    }()

    // producer
    for i := 0; i < 10; i++ {
        messages <- context_t.Value(i).(string)
    }
    time.Sleep(5 * time.Second)
    close(done)
    time.Sleep(1 * time.Second)
    fmt.Println("main process exit!")
}

cancelCtx

type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}

同上, cancelCtx 也是繼承了 Context 功能, 但是着重實現了 cancel 功能, 可以控制子協程的中斷.

父子協程示例

package main

import (
    "context"
    "fmt"
    "strconv"
    "time"
)

var context_root = context.Background()

func main() {
    context_t := context_root

    for i := 0; i < 10; i++ {
        context_t = context.WithValue(context_t, i, strconv.Itoa(i))
    }

    conext_t, cancel := context.WithCancel(context_t)

    messages := make(chan string, 10)
    //done := make(chan bool)

    defer close(messages)
    // consumer
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        for _ = range ticker.C {
            select {
            //case <-done:
            case <-conext_t.Done():
                fmt.Println("child process interrupt...")
                return
            default:
                fmt.Printf("send message: %s\n", <-messages)
            }
        }
    }()

    // producer
    for i := 0; i < 10; i++ {
        messages <- context_t.Value(i).(string)
    }
    time.Sleep(5 * time.Second)
    //close(done)
    cancel()
    time.Sleep(1 * time.Second)
    fmt.Println("main process exit!")
}

孫協程示例

package main

import (
    "context"
    "fmt"
    "strconv"
    "time"
)

var context_root = context.Background()

func main() {
    context_t := context_root

    for i := 0; i < 100; i++ {
        context_t = context.WithValue(context_t, i, strconv.Itoa(i))
    }

    conext_t, cancel := context.WithCancel(context_t)

    messages := make(chan string, 100)
    //done := make(chan bool)

    defer close(messages)
    // consumer
    go func() {
        conext_grandchild, cancel_g := context.WithCancel(context_t)
        defer cancel_g()
        go func() {
            ticker := time.NewTicker(1 * time.Second)
            for _ = range ticker.C {
                select {
                case <-conext_grandchild.Done():
                    fmt.Println("grandchild process interrupt...")
                    return
                default:
                    fmt.Printf("grandchild got message: %s\n", <-messages)
                }
            }
        }()

        ticker := time.NewTicker(1 * time.Second)
        for _ = range ticker.C {
            select {
            //case <-done:
            case <-conext_t.Done():
                fmt.Println("child process interrupt...")
                return
            default:
                fmt.Printf("child got message: %s\n", <-messages)
            }
        }
    }()

    // producer
    for i := 0; i < 100; i++ {
        messages <- context_t.Value(i).(string)
    }
    time.Sleep(5 * time.Second)
    //close(done)
    cancel()
    time.Sleep(2 * time.Second)
    fmt.Println("main process exit!")
}

此處有個問題, 理論上祖協程關閉父協程時, 應當同時關閉孫協程, 但是沒有?

timerCtx

type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}

timerCtx 繼承了 cancelCtx, 另外實現了 deadline 功能

示例

package main

import (
    "context"
    "fmt"
    "strconv"
    "time"
)

var context_root = context.Background()

func main() {
    context_t := context_root

    for i := 0; i < 100; i++ {
        context_t = context.WithValue(context_t, i, strconv.Itoa(i))
    }

    conext_t, cancel := context.WithTimeout(context_t, 5*time.Second)

    messages := make(chan string, 100)
    //done := make(chan bool)

    defer close(messages)
    defer cancel()
    // consumer
    go func() {
        conext_grandchild, cancel_g := context.WithCancel(context_t)
        defer cancel_g()
        go func() {
            ticker := time.NewTicker(1 * time.Second)
            for _ = range ticker.C {
                select {
                case <-conext_grandchild.Done():
                    fmt.Println("grandchild process interrupt...")
                    return
                default:
                    fmt.Printf("grandchild got message: %s\n", <-messages)
                }
            }
        }()

        ticker := time.NewTicker(1 * time.Second)
        for _ = range ticker.C {
            select {
            //case <-done:
            case <-conext_t.Done():
                fmt.Println("child process interrupt...")
                return
            default:
                fmt.Printf("child got message: %s\n", <-messages)
            }
        }
    }()

    // producer
    for i := 0; i < 100; i++ {
        messages <- context_t.Value(i).(string)
    }

    time.Sleep(10 * time.Second)
    fmt.Println("main process exit!")
}

轉自: zhuanlan.zhihu.com/p/443198512

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/AMchVoSG0NO9hGGydh9NFw