如何基於 Go 語言設計一個簡潔優雅的分佈式任務系統

在當今雲計算與微服務盛行的時代,分佈式任務系統已成爲支撐大規模業務的核心基礎設施。今天就來爲大家分享下如何基於 Go 語言從零設計和實現一個架構簡潔且擴展性強的分佈式任務系統。

前置概念

本文會設計並實現一個分佈式任務系統,這裏我們要先明確兩個概念。

有了這兩個前置概念,我們再來分析下在 Go 中如何實現分佈式和如何處理異步任務。

異步任務

在 Go 中,要處理異步任務有多種方式,比如原生支持的 time.Sleeptime.Timer 或 time.Ticke,再比如一些第三方包 go-co-op/gocron/v2robfig/cron/v3 或 bamzi/jobrunner 等。本項目在調研過後決定採用 robfig/cron/v3 包(以下簡稱 cron)來處理異步任務,原因如下:

對於 cron 包的使用,可以參考我的另一篇文章「在 Go 中使用 cron 執行定時任務」,裏面有詳細說明。

分佈式

既然我們的任務系統是分佈式的,那麼必然要考慮併發安全問題。當多個副本同時讀寫系統資源時,很容易產生競態問題。在分佈式場景中,解決競態問題最常用的手段當然是分佈式鎖。

Go 中的分佈式鎖解決方案也很多,常見的有基於 etcd、Redis、ZooKeeper 等中間件來實現的,因爲 Redis 在系統中更加常用,所以本項目採用基於 Redis 實現分佈式鎖的解決方案。Go 中有兩個比較常用的第三方包 bsm/redislock 和 go-redsync/redsync 都是基於 Redis 的分佈式鎖實現。本項目在調研過後決定採用 go-redsync/redsync 包(以下簡稱 redsync),原因如下:

對於 redsync 包的使用,可以參考我的另一篇文章「在 Go 中如何使用分佈式鎖解決併發問題?」,裏面有詳細說明。

分佈式任務系統

現在我們對分佈式任務系統中的分佈式和任務都有了明確的認識,並且找到了解決方案。那麼接下來就可以設計並實現分佈式任務系統了。

功能介紹

我們要實現的分佈式任務系統叫 nightwatch,nightwatch 是守夜、值班的意思,那麼這套系統的功能也就一目瞭然了,就是用來 24 小時不停機的執行異步任務的。

nightwatch 要實現的主要功能如下:

現在我們有一個系統,用戶可以在 Web 頁面通過表單提交一個 “任務” 到關係型數據庫的任務表中。然後 nightwatch 系統會定時的掃描任務表,取出待執行的任務,並根據任務中的配置,到 Kubernetes 中拉起 Job 資源對象,真正的執行任務。此外,nightwatch 還會取出已經開始執行的任務,然後去 Kubernetes 中獲取當前任務對應的 Job 實時狀態,並回寫到數據庫中。直到 Kubernetes 中的 Job 執行完成(或失敗),nightwatch 會標記 Job 在數據庫表中的任務狀態爲完成(或失敗)。當任務狀態爲完成(或失敗),則任務任務終止,nightwatch 不再掃描出這種狀態的數據。

系統整體架構如下:

nightwatch 是系統中一個非常核心的組件,用來控制任務的執行,並同步任務狀態。

架構設計

現在我們知道了 nightwatch 的作用,那麼就可以設計其實現架構了。

nightwatch 架構設計如下:

首先,我們需要思考一個問題,分佈式鎖應該在何時使用?

在分佈式任務系統中,我們有兩種方式使用分佈式鎖來保證併發安全。一種是在執行具體的定時任務時,多個副本之間進行競爭,誰搶到鎖,誰就可以執行任務,未搶到鎖的副本可以選擇性的跳過此次執行週期。另一種是在 nigthwatch 啓動時,就開始搶鎖,多個副本之間誰搶到鎖,誰就去執行任務調度,未搶到鎖的副本則進行週期性的嘗試搶鎖操作,如果當前執行任務調度的副本被終止,那麼其他副本就有機會搶到鎖,並執行任務調度。

這兩種方式個各自有不同的使用場景,第一種方式的優勢是能夠實現多副本之間的負載均衡,多個副本都在工作,都有可能搶到鎖並執行任務,不過這種方式不能嚴格控制執行任務的間隔時間,比較適合對間隔時間要求不嚴格的任務。第二種方式實際上只有一個副本在執行任務調度,其他副本是空載狀態,是主備設計,這種方式的好處是能夠嚴格控制任務執行的間隔時間。

nigthwatch 採用第二種方式來使用分佈式鎖保證併發安全。所以在 nigthwatch 的架構設計中,在啓動 nigthwatch 時,先將所有的定時任務註冊到任務調度器中,接着就會進行搶鎖操作,只有搶到鎖的副本才能夠執行任務調度。未搶到鎖,則使用一個循環週期性的嘗試搶鎖,直到搶鎖成功。對於搶到鎖的副本,當註冊的任務定時策略達到時,任務調度器就會執行任務。架構圖中的 task 就是我們要實現的異步任務,也是主要業務邏輯,task 組件會從數據庫表中讀取任務,然後在 Kubernetes 中啓動 Job,並同步數據庫和 Kubernetes 資源之間的狀態。

目錄結構

我們現在已經設計好了 nigthwatch 的架構,可以動手進行開發實現了。

以下是 nigthwatch 項目的目錄和文件:

$ tree nightwatch
nightwatch # 項目目錄
├── README.md # README 文件
├── assets # 項目相關的資源目錄
│   ├── docker-compose.yaml # 用於啓動項目依賴的 MariaDB 和 Redis
│   └── schema.sql # 測試數據 SQL
├── cmd # 項目啓動入口
│   └── main.go
├── go.mod
├── go.sum
├── internal # 項目內部包
│   ├── logger.go # 定製日誌
│   ├── nightwatch.go # nightwatch 的實現和啓動入口
│   └── watcher # 任務接口和實現
│       ├── all # 任務註冊入口
│       │   └── all.go
│       ├── config.go # 任務配置
│       ├── task # 任務實現,一個可以定時同步 MariaDB 和 Kubernetes 任務狀態的示例程序
│       │   ├── task.go
│       │   └── watcher.go
│       └── watcher.go # 任務接口
└── pkg # 項目公共包
    ├── db # 數據庫實例
    │   ├── mysql.go
    │   └── redis.go
    ├── meta # 元信息
    │   └── where.go # MariaDB where 查詢條件元信息封裝
    ├── model # 任務模型
    │   └── task.go
    ├── store # 數據庫操作接口
    │   ├── helper.go
    │   ├── store.go
    │   └── task.go
    └── util # 工具包
        └── reflect
            └── reflect.go

14 directories, 21 files

這裏主要的目錄和文件我都標明瞭其用途,不必完全記住,你先有個印象,大概知道整個項目的結構。

調用鏈路

爲了便於你理解代碼,我畫了一張 nigthwatch 項目的調用鏈路圖:

這個調用鏈路圖指明瞭 nigthwatch 項目中所有目錄之間的代碼調用關係。根據這張圖,可以看出這是一個非常簡潔的架構。cmd 中的入口函數main會調用internal中的nigthwatch包,nigthwatch是分佈式系統實現的關鍵所在,這裏實現了任務的註冊和調度,watcher定義了任務的接口,task就是任務的具體實現,task的業務邏輯中會依賴store層來讀寫數據庫,所以store會依賴modeldb

代碼實現

接下來就進入到真正的編碼階段了。

首先我們需要爲 nigthwatch 項目的業務設計一張任務表,建表 SQL 語句如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/assets/schema.sql

CREATE TABLE IF NOTEXISTS `task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(45) NOT NULLDEFAULT'' COMMENT '任務名稱',
  `namespace` varchar(45) NOT NULLDEFAULT'' COMMENT 'k8s namespace 名稱',
  `info` TEXT NOT NULL COMMENT '任務 k8s 相關信息',
  `status` varchar(45) NOT NULLDEFAULT'' COMMENT '任務狀態',
  `user_id` bigint(20) NOT NULLDEFAULT'0' COMMENT '用戶 ID',
  `created_at` datetime NOT NULLDEFAULTCURRENT_TIMESTAMP,
  `updated_at` datetime NOT NULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_name_namespace` (`name`, `namespace`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任務表';

爲了簡化你理解項目的成本,這裏僅定義了最小需要字段。

同時我們可以插入兩條測試數據,用來後續項目功能的驗證:

INSERT INTO `task` (`id`, `name`, `namespace`, `info`, `status`, `user_id`) VALUES (1, 'demo-task-1''default''{"image":"alpine","command":["sleep"],"args":["60"]}''Normal', 1);
INSERT INTO `task` (`id`, `name`, `namespace`, `info`, `status`, `user_id`) VALUES (2, 'demo-task-2''demo''{"image":"busybox","command":["sleep"],"args":["3600"]}''Normal', 2);

拿 ID1task數據舉例,任務名是demo-task-1namespacedefault,鏡像是alpine,執行命令是sleep,命令參數是60,狀態爲Normal表示待執行。當 nigthwatch 服務掃描到這條數據時,就會在 Kubernetes 中default這個namespace下創建一個namedemo-task-1的 Job,其啓動鏡像爲alpine,啓動命令爲sleep 60,即睡眠60 秒然後退出。

現在有了數據庫表和測試數據,我們來看看 nigthwatch 代碼是如何實現的。

入口文件 cmd/main.go 實現如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/cmd/main.go

package main

import (
    "flag"
    "log/slog"
    "path/filepath"
    "time"

    genericapiserver "k8s.io/apiserver/pkg/server"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"

    "github.com/jianghushinian/blog-go-example/nightwatch/internal"
    "github.com/jianghushinian/blog-go-example/nightwatch/pkg/db"
)

func main() {
    slog.SetLogLoggerLevel(slog.LevelDebug)

    var kubecfg *string
    if home := homedir.HomeDir(); home != "" {
        kubecfg = flag.String("kubeconfig", filepath.Join(home, ".kube""config")"Optional absolute path to kubeconfig")
    } else {
        kubecfg = flag.String("kubeconfig""""Absolute path to kubeconfig")
    }

    config, err := clientcmd.BuildConfigFromFlags("", *kubecfg)
    if err != nil {
        slog.Error(err.Error())
        return
    }
    config.QPS = 50
    config.Burst = 100
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        slog.Error(err.Error())
        return
    }

    cfg := nightwatch.Config{
        MySQLOptions: &db.MySQLOptions{
            Host:                  "127.0.0.1:33306",
            Username:              "root",
            Password:              "nightwatch",
            Database:              "nightwatch",
            MaxIdleConnections:    100,
            MaxOpenConnections:    100,
            MaxConnectionLifeTime: time.Duration(10) * time.Second,
        },
        RedisOptions: &db.RedisOptions{
            Addr:         "127.0.0.1:36379",
            Username:     "",
            Password:     "nightwatch",
            Database:     0,
            MaxRetries:   3,
            MinIdleConns: 0,
            DialTimeout:  5 * time.Second,
            ReadTimeout:  3 * time.Second,
            WriteTimeout: 3 * time.Second,
            PoolSize:     10,
        },
        Clientset: clientset,
    }

    nw, err := cfg.New()
    if err != nil {
        slog.Error(err.Error())
        return
    }

    stopCh := genericapiserver.SetupSignalHandler()
    nw.Run(stopCh)
}

因爲 main.go 非常重要,是整個程序的入口,所以我就把完整代碼都貼出來了,包括 import 部分,這是爲了讓你對項目文件之間的依賴關係有一個更清晰的認知,後續講解的其他模塊我就只會貼出核心代碼。

main 函數的核心功能如下:

首先會初始化各種依賴包,初始化 Kubernetes clientset 用於後續操作 Job,初始化 MySQL 用於從中讀取任務和更新任務狀態,初始化 Redis 用於實現分佈式鎖。接着會使用這些初始化的對象創建一個配置對象 nightwatch.Config。然後使用 cfg.New() 創建一個 nightwatch 實例對象 nw。最後調用 nw.Run(stopCh) 啓動服務。這裏爲了做優雅退出,還引用了 Kubernetes genericapiserver 優雅退出機制,你可以參考我的文章「Go 程序如何實現優雅退出?來看看 K8s 是怎麼做的——上篇」、「Go 程序如何實現優雅退出?來看看 K8s 是怎麼做的——下篇」查看其實現原理。

這裏涉及到的 Kubernetes clientset、MySQL 和 Redis 相關的具體配置細節我就不詳細講解了,咱們還是將主要精力聚焦在 nigthwatch 的主脈絡上。

接下來看下 cfg.New() 代碼實現如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

// New 通過配置構造一個 nightWatch 對象
func (c *Config) New() (*nightWatch, error) {
    rdb, err := db.NewRedis(c.RedisOptions)
    if err != nil {
        slog.Error(err.Error()"Failed to create Redis client")
        returnnil, err
    }

    logger := newCronLogger()
    runner := cron.New(
        cron.WithSeconds(),
        cron.WithLogger(logger),
        cron.WithChain(cron.SkipIfStillRunning(logger), cron.Recover(logger)),
    )

    pool := goredis.NewPool(rdb)
    lockOpts := []redsync.Option{
        redsync.WithRetryDelay(50 * time.Microsecond),
        redsync.WithTries(3),
        redsync.WithExpiry(defaultExpiration),
    }
    locker := redsync.New(pool).NewMutex(lockName, lockOpts...)

    cfg, err := c.CreateWatcherConfig()
    if err != nil {
        returnnil, err
    }

    nw := &nightWatch{runner: runner, locker: locker, config: cfg}
    if err := nw.addWatchers(); err != nil {
        returnnil, err
    }

    return nw, nil
}

*Config.New 方法會通過配置信息構造一個 nightWatch 對象並返回。這裏的 runner 就是異步任務調度器,使用 cron 包實現,用來調度和執行定時任務。並且這個方法內部還實例化了一個 redsync 分佈式鎖對象 lockernightWatch 對象就是通過 runnerlocker 和 cfg 來構造的。

這裏的核心部分是 addWatchers 的邏輯,其實現如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

// 註冊所有 Watcher 實例到 nightWatch
func (nw *nightWatch) addWatchers() error {
    for n, w := range watcher.ListWatchers() {
        if err := w.Init(context.Background(), nw.config); err != nil {
            slog.Error(err.Error()"Failed to construct watcher""watcher", n)
            return err
        }

        spec := watcher.Every3Seconds
        if obj, ok := w.(watcher.ISpec); ok {
            spec = obj.Spec()
        }

        if _, err := nw.runner.AddJob(spec, w); err != nil {
            slog.Error(err.Error()"Failed to add job to the cron""watcher", n)
            return err
        }
    }

    returnnil
}

*nightWatch.addWatchers 方法用來註冊所有 Watcher 對象到調度器 runner 中。

Watcher 是一個接口,定義了異步任務應該實現的方法。Watcher 接口定義如下:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/watcher.go

type Watcher interface {
    Init(ctx context.Context, config *Config) error
    cron.Job
}

type ISpec interface {
    Spec() string
}

var (
    registryLock = new(sync.Mutex)
    registry     = make(map[string]Watcher)
)

func Register(watcher Watcher) {
    registryLock.Lock()
    defer registryLock.Unlock()

    name := reflectutil.StructName(watcher)
    if _, ok := registry[name]; ok {
        panic("duplicate watcher entry: " + name)
    }

    registry[name] = watcher
}

func ListWatchers()map[string]Watcher {
    registryLock.Lock()
    defer registryLock.Unlock()

    return registry
}

可以看到,要實現一個異步任務,需要實現 Init 方法以及 cron.Job 接口。cron.Job 接口其實只有一個方法定義如下:

type Job interface {
    Run()
}

只要滿足 Watcher 接口的任務,就可以通過 Register 函數註冊到 registry 中。ListWatchers 函數則可以返回註冊到 registry 中全部任務。而 ListWatchers 函數正是在前文講解的 *nightWatch.addWatchers 方法中調用的。

到目前爲止,任務如何被註冊到 nightWatch.runner 的過程我們就串起來了。接下來需要關注的兩個點是,調度器 runner 是何時啓動的,以及是何時調用 Register 函數註冊任務的。

我們先來看調度器 runner 是何時啓動的:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

// Run 執行異步任務,此方法會阻塞直到關閉 stopCh
func (nw *nightWatch) Run(stopCh <-chanstruct{}) {
    ctx := wait.ContextForChannel(stopCh)

    // 循環加鎖,直到加鎖成功,再去啓動任務
    ticker := time.NewTicker(defaultExpiration + (5 * time.Second))
    defer ticker.Stop()
    for {
        err := nw.locker.LockContext(ctx)
        if err == nil {
            slog.Debug("Successfully acquired lock""lockName", lockName)
            break
        }
        slog.Debug("Failed to acquire lock""lockName", lockName, "err", err)
        <-ticker.C
    }

    // 看門狗,實現鎖自動續約
    ticker = time.NewTicker(extendExpiration)
    defer ticker.Stop()
    gofunc() {
        for {
            select {
            case <-ticker.C:
                if ok, err := nw.locker.ExtendContext(ctx); !ok || err != nil {
                    slog.Debug("Failed to extend lock""err", err, "status", ok)
                }
            case <-ctx.Done():
                slog.Debug("Exiting lock watchdog")
                return
            }
        }
    }()

    // 啓動定時任務
    nw.runner.Start()
    slog.Info("Successfully started nightwatch server")

    // 阻塞等待退出信號
    <-stopCh

    nw.stop()
}

在 *nightWatch.Run 方法中,首先會啓動一個無限循環,定時執行嘗試搶鎖操作,直到搶鎖成功。這與前文中講解的 nightwatch 架構設計是一致的。搶到鎖後,就可以執行 nw.runner.Start() 啓動調度器,執行定時任務了。

此外,在 nightwatch 架構圖中沒有體現的一點是,這裏爲分佈式鎖實現了看門狗機制,用來自動續約。關於 redsync 分佈式鎖的自動續約,在我的文章「在 Go 中如何使用分佈式鎖解決併發問題?」中有詳細講解。

而這個 Run 方法,就是在 main 函數中通過 nw.Run(stopCh) 調用的。

我們還剩下一個最後要看的核心邏輯是 task 在何時會調用 Register 註冊到 registry 變量中。

還記得前文中講解的 Watcher 接口麼,*taskWatcher 實現了這個接口:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/task/watcher.go

var _ watcher.Watcher = (*taskWatcher)(nil)

type taskWatcher struct {
    store     store.IStore
    clientset kubernetes.Interface

    wg sync.WaitGroup
}

func (w *taskWatcher) Init(ctx context.Context, config *watcher.Config) error {
    w.store = config.Store
    w.clientset = config.Clientset
    returnnil
}

func (w *taskWatcher) Spec() string {
    return"@every 30s"
}

func init() {
    watcher.Register(&taskWatcher{})
}

taskWatcher 就是 task 任務的具體對象,它實現了 watcher.Watcher 接口。可以發現,Register 函數是在 init 函數中調用的,即 task 包被導入時實現自動註冊。

task 包會在 nightwatch/internal/watcher/all/all.go 文件被導入:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/all/all.go

package all

import (
    // 觸發所有 Watcher 的 init 函數進行註冊
    _ "github.com/jianghushinian/blog-go-example/nightwatch/internal/watcher/task"
)

這裏以匿名包的方式導入 task 包。如果我們還有其他的任務實現,則同樣可以參考 task 包的註冊方式,在這裏以匿名包形式導入,這也是 all 包名的由來,可以註冊全部的任務。

然後會在 nightwatch 中再次以匿名包的方式導入 all 包:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/nightwatch.go

package nightwatch

import (
    ...
    // 觸發 init 函數
    _ "github.com/jianghushinian/blog-go-example/nightwatch/internal/watcher/all"
)

我們可以總結出任務的註冊流程是,nightwatch 包導入 all 包,all 包會導入 task 包,task 包的 init 函數執行就會完成註冊。所以在入口文件 main.go 導入 nightwatch 包的時候,就會觸發任務的註冊。在調用 nw.Run(stopCh) 啓動服務時,所有的任務已經註冊完成了。

taskWatcher 對象的核心邏輯當然就是 Run 方法了:

https://github.com/jianghushinian/blog-go-example/blob/main/nightwatch/internal/watcher/task/watcher.go

// Run 運行 task watcher 任務
func (w *taskWatcher) Run() {
    w.wg.Add(2)

    slog.Debug("Current sync period is start")

    // NOTE: 將 Normal 狀態任務在 Kubernetes 中啓動
    gofunc() {
        defer w.wg.Done()
        ctx := context.Background()

        _, tasks, err := w.store.Tasks().List(ctx, meta.WithFilter(map[string]any{
            "status": model.TaskStatusNormal,
        }))
        if err != nil {
            slog.Error(err.Error()"Failed to list tasks")
            return
        }

        var wg sync.WaitGroup
        wg.Add(len(tasks))
        for _, task := range tasks {
            gofunc(task *model.Task) {
                defer wg.Done()
                job, err := w.clientset.BatchV1().Jobs(task.Namespace).Create(ctx, toJob(task), metav1.CreateOptions{})
                if err != nil {
                    slog.Error(err.Error()"Failed to create job")
                    return
                }

                task.Status = model.TaskStatusPending
                if err := w.store.Tasks().Update(ctx, task); err != nil {
                    slog.Error(err.Error()"Failed to update task status")
                    return
                }
                slog.Info("Successfully created job""namespace", job.Namespace, "name", job.Name)
            }(task)
        }
        wg.Wait()
    }()

    // NOTE: 同步中間狀態的任務在 Kubernetes 中的狀態到表中
    gofunc() {
        defer w.wg.Done()
        ctx := context.Background()

        _, tasks, err := w.store.Tasks().List(ctx, meta.WithFilterNot(map[string]any{
            // 排除這幾個狀態
            "status": []model.TaskStatus{model.TaskStatusNormal, model.TaskStatusSucceeded, model.TaskStatusFailed},
        }))
        if err != nil {
            slog.Error(err.Error()"Failed to list tasks")
            return
        }

        var wg sync.WaitGroup
        wg.Add(len(tasks))
        for _, task := range tasks {
            gofunc(task *model.Task) {
                defer wg.Done()
                job, err := w.clientset.BatchV1().Jobs(task.Namespace).Get(ctx, task.Name, metav1.GetOptions{})
                if err != nil {
                    slog.Error(err.Error()"Failed to get task")
                    return
                }

                task.Status = toTaskStatus(job)
                if err := w.store.Tasks().Update(ctx, task); err != nil {
                    slog.Error(err.Error()"Failed to update task status")
                    return
                }
                slog.Info("Successfully sync job status to task""namespace", job.Namespace, "name", job.Name, "status", task.Status)
            }(task)
        }
        wg.Wait()
    }()

    w.wg.Wait()
    slog.Debug("Current sync period is complete")
}

Run 方法就是用來實現每個 watcher 對象的業務邏輯。比如這裏就實現了 task 任務的業務邏輯,它包含兩個功能,在 Run 方法的上半部分代碼中啓動了第一個 goroutine 用來實現將 Normal 狀態任務在 Kubernetes 中啓動,下半部分代碼中啓動了第二個 goroutine 用來實現同步已運行的任務在 Kubernetes 中的 Job 狀態到數據庫表中。

至此,nightwatch 項目就講解完成了。我們一起實現了一個架構簡潔且擴展性強的分佈式任務系統。關於 nightwatch 項目中更多的代碼細節你可以跳轉到我的 GitHub 倉庫中查看。

聯繫我

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Yr2d5NeBRvBNmthH0V8X4Q