5 倍性能提升!如何優化 Golang 定時任務調度

項目中需要使用一個簡單的定時任務調度的框架,最初直接從 GitHub 上搜了一個 star 比較多的,就是 https://github.com/robfig/cron,目前有 8000+ star。剛開始使用的時候發現問題不大,但是隨着單機需要定時調度的任務越來越多,高峯期差不多接近 500QPS,隨着業務的推廣使用,可以預期增長還會比較快,但是已經遇到 CPU 使用率偏高的問題,通過 pprof 分析,很多都是在做排序,看了下這個項目的代碼,整體執行的過程大概如下:

  1. 對所有任務進行排序,按照下次執行時間進行排序

  2. 選擇數組中第一個任務,計算下次執行時間減去當前時間得到時間 t,然後 sleep t

  3. 然後從數組第一個元素開始遍歷任務,如果此任務需要調度的時間 < now,那麼就執行此任務,執行之後重新計算這個任務的 next 執行時間

  4. 每次待執行的任務執行完畢之後,都會重新對這個數組進行排序

  5. 然後再循環從排好序的數組中找到第一個需要執行的任務去執行。

代碼如下:

for {
        // Determine the next entry to run.
        sort.Sort(byTime(c.entries))

        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // If there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
            timer = time.NewTimer(100000 * time.Hour)
        } else {
            timer = time.NewTimer(c.entries[0].Next.Sub(now))
        }

        for {
            select {
            case now = <-timer.C:
                now = now.In(c.location)
                c.logger.Info("wake""now", now)

                // Run every entry whose next time was less than now
                for _, e := range c.entries {
                    if e.Next.After(now) || e.Next.IsZero() {
                        break
                    }
                    c.startJob(e.WrappedJob)
                    e.Prev = e.Next
                    e.Next = e.Schedule.Next(now)
                    c.logger.Info("run""now", now, "entry", e.ID, "next", e.Next)
                }

            case newEntry := <-c.add:
                timer.Stop()
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                c.entries = append(c.entries, newEntry)
                c.logger.Info("added""now", now, "entry", newEntry.ID, "next", newEntry.Next)

            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue

            case <-c.stop:
                timer.Stop()
                c.logger.Info("stop")
                return

            case id := <-c.remove:
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed""entry", id)
            }

            break
        }
    }

問題就顯而易見了,執行一個任務(或幾個任務)都重新計算 next 執行時間,重新排序,最壞情況就是每次執行 1 個任務,排序一遍,那麼執行 k 個任務需要的時間的時間複雜度就是 O(k*nlogn),這無疑是非常低效的。

於是想着怎麼優化一下這個框架,不難想到每次找最先需要執行的任務就是從一堆任務中找 schedule_time 最小的那一個(設 schedule_time 是任務的執行時間),那麼比較容易想到的思路就是使用最小堆:

  1. 在初始化任務列表的時候就直接構建一個最小堆

  2. 每次執行查看 peek 元素是否需要執行

  3. 需要執行就 pop 堆頂元素,計算 next 執行時間,重新 push 入堆

  4. 不需要執行就 break 到外層循環取堆頂元素,計算 next_time-now() = need_sleep_time,然後 select 睡眠、add、remove 等操作。

我修改爲 min-heap 的方式之後,每次添加任務的時候通過堆的屬性進行 up 和 down 調整,每次添加任務時間複雜度 O(logn),執行 k 個任務時間複雜度是 O(klogn)。經過驗證線上 CPU 使用降低 4~5 倍。CPU 從 50% 左右降低至 10% 左右

優化後的代碼如下,只是其中一部分。

全部的代碼也已經在 github 上已經創建了一個 Fork 的倉庫並推送上去了,全部單測 Case 也都 PASS。感興趣可以點過去看。https://github.com/tovenja/cron

    for {
        // Determine the next entry to run.
        // Use min-heap no need sort anymore


     // 這裏不再需要排序了,因爲add的時候直接進行堆調整
     //sort.Sort(byTime(c.entries))

        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // If there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
            timer = time.NewTimer(100000 * time.Hour)
        } else {
            timer = time.NewTimer(c.entries[0].Next.Sub(now))
            //fmt.Printf(" %v, %+v\n", c.entries[0].Next.Sub(now), c.entries[0].ID)
        }

        for {
            select {
            case now = <-timer.C:
                now = now.In(c.location)
                c.logger.Info("wake""now", now)
                // Run every entry whose next time was less than now
                for {
                    e := c.entries.Peek()
                    if e.Next.After(now) || e.Next.IsZero() {
                        break
                    }
                    e = heap.Pop(&c.entries).(*Entry)
                    c.startJob(e.WrappedJob)
                    e.Prev = e.Next
                    e.Next = e.Schedule.Next(now)
                    heap.Push(&c.entries, e)
                    c.logger.Info("run""now", now, "entry", e.ID, "next", e.Next)
                }

            case newEntry := <-c.add:
                timer.Stop()
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                heap.Push(&c.entries, newEntry)
                c.logger.Info("added""now", now, "entry", newEntry.ID, "next", newEntry.Next)

            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue

            case <-c.stop:
                timer.Stop()
                c.logger.Info("stop")
                return

            case id := <-c.remove:
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed""entry", id)
            }

            break
        }
    }

轉自:

cnblogs.com/aboutblank/p/14860571.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/HEha-VnFgnXtyjf4wOQU2A