Go 讓消費速度更快
可複用的生產消費邏輯
有時候我們從接口當中接受一批數據,想要它們在後臺運行,或者說同步執行效率更快;有時候從緩存或者隊列中消費數據,想要增加消費的速度;有時候跑一批數據,想要處理效率更高;那麼遇到這些場景,這一套生產消費模式就足以應對了,來看下代碼。
代碼
func ConsumeTask(ctx context.Context) {
LOOP:
var total int
var success int
start := time.Now()
wg := sync.WaitGroup{}
gLock := sync.Mutex{}
taskChan := make(chan Task, 50)
wg.Add(1)
go func() {
defer wg.Done()
for {
// 獲取的長度是0或者錯誤 直接break
// 生產 遇到錯誤continue
// 反序列化
total += 1
taskChan <- task
}
// 結束生產
close(taskChan)
}()
// 多個消費者
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
if task, ok := <-taskChan; ok { // 消費
// 。。。。。。
gLock.Lock()
success += 1
gLock.Unlock()
}
} else {// chan關閉了 就退出消費
break
}
}
}()
}
log.Warn(ctx, fmt.Sprintf("消費中"))
wg.Wait()
log.Warn(ctx, fmt.Sprintf("消費結束"))
if success == 0 || total == 0 {
log.Warn(ctx, fmt.Sprintf("當前無待消耗的任務, sleep 10s"))
time.Sleep(10 * time.Second)
goto LOOP
}
larkText := requestcommon.NewLarkCustomBotContentRichText("消費", time.Now().Format("2006-01-02 15:04:05"))
totalText := fmt.Sprintf("總共待消費:%d", total)
failText := fmt.Sprintf("失敗:%d", total-success)
successText := fmt.Sprintf("成功: %d", success)
takeText := fmt.Sprintf("耗時: %v", time.Since(start))
ipText := fmt.Sprintf("IP: %s", common.LocalIP())
// 增加各種指標預警
larkText.AddTextWithTag(totalText).AddTextWithTag(successText).AddTextWithTag(failText).AddTextWithTag(takeText).AddTextWithTag(ipText)
// 通過飛書hook url 發送出去
err := requestcommon.SendLarkCustomBotMsgRichText(ctx, "hook_url", *larkText)
log.Warn(ctx, fmt.Sprintf("飛書發送消費通知 err: %v", err))
goto LOOP
}
小結
這套模板大家可以拿來直接用,簡單高效,歡迎有興趣的同學一起交流哈。
堆棧 future 使很多處於迷茫階段的 coder 能從這裏找到光明,堆棧創世,功在當代,利在千秋
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/UQolMJVv00WeIdj9TL6pxw