不知如何實現一個定時器?看這一篇就夠了

定時器作用

定時器在各種場景都需要用到,比如遊戲的 Buff 實現,Redis 中的過期任務,Linux 中的定時任務等等。顧名思義,定時器的主要用途是執行定時任務。

定時器數據結構選取

定時器數據結構要求:

以下爲各數據結構時間複雜度表現

有序鏈表:插入O(n),刪除O(1),過期expire執行O(1)

最小堆:插入O(logn),刪除O(logn),過期expire執行O(1)

紅黑樹:插入O(logn),刪除O(logn),過期expire執行O(logn)

哈希表 + 鏈表(時間輪):插入O(1),刪除O(1),過期expire平均執行O(1)(最壞爲O(n)

不同開源框架定時器實現方式不一,如,libuv採用最小堆來實現,nginx採用紅黑樹實現,linux內核和skynet採用時間輪算法實現等等。

定時器接口封裝

作爲定時器,需要封裝以下 4 類接口給用戶使用:

其中執行到期任務有兩種工作方式:

  1. 輪詢: 每隔一個時間片去查找哪些任務到期

  2. 睡眠 / 喚醒:不停查找 deadline 最近任務,到期執行,否則 sleep;sleep 期間,任務有改變,線程會被喚醒

接下來將介紹分別用跳錶、紅黑樹、時間輪來實現定時器。

跳錶實現定時器

跳錶簡介

跳錶是一種動態的數據結構,採用空間換時間的思想,在有序鏈表基礎上加入多級索引,通過索引進行二分快速查找,支持快速刪除、插入和查找操作(平均時間複雜度爲O(logN),最壞爲O(N)),效率可與平衡樹媲美,實現比其簡單。

下面通過一張圖來簡單說明跳錶操作。跳錶的最底層即爲基本的有序鏈表,存儲所有的數據,可理解爲數據層;往上則爲索引層,理想狀態下,上一層爲下一層節點數的一半。比如,要查找下圖的數據爲11的節點,從begin''出發,向右走,如果下一個節點大於11則往下走,直到找到目標節點。可見,跳錶要比原始鏈表少比較一些節點,但前提是需要花更多空間存儲索引節點。

image-20210323182236910

跳錶實現定時器

學會吸取開源框架中優秀數據結構和代碼思想,直接採用redis中跳錶結構的實現,取出所需部分,用於實現定時器。如下:

跳錶數據結構

跳錶節點與跳錶結構

/*skiplist.h*/
#define ZSKIPLIST_MAXLEVEL 32
#define ZSKIPPLIST 0.25

typedef struct zskiplistNode zskiplistNode;
typedef void (*handler_pt) (zskiplistNode * node);
// 跳錶節點
struct zskiplistNode {
  unsigned long score;  /*用於排序的值*/
  handler_pt handler;  /*處理函數*/
  struct zskiplistLevel {
    struct zskiplistNode **forward;
  }level[];
};
// 跳錶結構
typedef struct zskiplist {
  struct zskiplistNode * header;
  int length;
  int level;  /*跳錶層數*/
}zskiplist;

跳錶接口申明

具體接口實現細節請移步redis源碼。

/*skiplist.h*/
/*創建跳錶,初始化*/
zskiplist *zslCreate(void);
/*刪除跳,表釋放資源*/
void zslFree(zskiplist *zsl);
/*插入節點*/
zskiplistNode *zslInsert(zskiplist *zsl, unsigned long score, handler_pt func);
/*刪除頭節點*/
void zsklDeleteHead(zskiplist *zsl);
/*刪除任意節點*/
void zslDelete(zskiplist *zsl, zskplistNode *zn);
/*打印,調試*/
void zslPrint(zskiplist *zsl);

定時器接口實現

主要介紹四個接口實現:初始化定時器,添加定時任務,刪除 / 取消定時任務,處理定時任務

// test_user.c  封裝給用戶使用的接口
static uint32_t
current_time() {
 uint32_t t;
    struct timespec ti;
    clock_getttime(CLOCK_MONOTONIC, &ti);
    t = (uint32_t)ti.tv_sec * 1000;
    t += ti.tv_sec / 1000000;
}
zskiplist *init_timer() {
    // 初始化定時器
    return zslCreate();
}
zskiplistNode *add_timer(zskiplist *zsl, uint32_t msec, handler_pt func) {
    // 添加定時任務
    msec += current_time();
    return zslInsert(zsl, msec, func);
}
void cancel_timer(zskiplist *zsl, zskiplistNode *zn) {
    // 刪除/取消定時任務
    zslDelete(zsl, zn);
}
void expire_timer(zskiplist *zsl){
    // 處理定時任務
    zskiplistNode *x;
    uint32_t now = current_time();
    for (;;) {
        x = zslMin(zsl);  // 最近節點
        if (!x) break;
        if (x->score > now)  break;  // 時間未到
        x->handler(x);  // 執行相關定時任務
        zslDeleteHead(zsl);  // 執行完刪除
    }
}

紅黑樹實現定時器

紅黑樹

紅黑樹是一種自平衡的二叉查找樹,即,插入和刪除操作如果破壞樹的平衡時,需要重新調整達到平衡狀態。因此,是一種比較難的數據結構。

紅黑樹五條性質

弄懂紅黑樹如何調整樹的平衡,保證滿足這 5 條性質,是比較麻煩,需要耐心的去推導一遍,此處不展開。

紅黑樹實現定時器

AVL 樹平衡要求太高,維護平衡操作過多,較複雜;紅黑樹只需維護一個黑高度,效率較高

紅黑樹查找,刪除,添加時間複雜度爲:O(log(n))

吸取開源框架中優秀數據結構和代碼思想,選用nginx中的紅黑樹結構

紅黑樹數據結構

紅黑樹節點與紅黑樹

// rbtree.h  紅黑樹數據結構以及相關接口,具體接口實現同上
#ifndef _NGX_RBTREE_H_INCLUDE_
#define _NGX_RBTREE_H_INCLUDE_

typedef unsigned int ngx_rbtree_key_t;
typedef unsigned int ngx_uint_t;
typedef int ngx_rbtree_key_int_t;

// 紅黑樹節點
typedef struct ngx_rbtree_node_s  ngx_rbtree_node_t;
struct ngx_rbtree_node_s {
    ngx_rbtree_key_t key;
    ngx_rbtree_node_t *left;
    ngx_rbtree_node_t *right;
    ngx_rbtree_node_t *parent;
    u_char    color;  // 節點顏色
    u_char    data;  // 節點數據
};
// 插入函數指針
typedef void (*ngx_rbtree_insert_pt) (ngx_rbtree_node_t *root,
     ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
// 紅黑樹
typedef struct ngx_rbtree_s ngx_rbtree_t;
struct ngx_rbtree_s {
    ngx_rbtree_node_t  *root;
    ngx_rbtree_node_t  *sentinel;
    ngx_rbtree_insert_pt insert;
};

紅黑樹接口聲明

// 紅黑樹初始化
#define ngx_rbtree_init(tree, s, i)       \
 ngx_rbtree_sentinel_init(s);      \
 (tree)->root = s;        \
 (tree)->sentinel = s;       \
 (tree)->insert = i;        
// 插入操作
void ngx_rbtree_insert(ngx_rbtree_t *tree, ngx_rbtree_node_t *node);
// 刪除操作
void ngx_rbtree_delete(ngx_rbtree_t *tree, ngx_rbtree_node_t *node);
// 插入value
void ngx_rbtree_insert_value(ngx_rbtree_node_t *root, ngx_rbtree_node_t *node,
                            ngx_rbtree_node_t *sentinel);
// 插入timer
void ngx_rbtree_insert_timer_value(ngx_rbtree_node_t *root,
                                  ngx_rbtree_node_t *node,
                                  ngx_rbtree_node_t *sentinel);
// 獲取下一個節點
ngx_rbtree_node_t *ngx_rbtree_next(ngx_rbtree_t *tree, ngx_rbtree_node_t *node);
#define ngx_rbt_red(node)    ((node)->color = 1)
#define ngx_rbt_black(node)    ((node)->color = 0)
#define ngx_rbt_is_red(node)   ((node)->color)
#define ngx_rbt_is_black(node)   (!ngx_rbt_is_red(node))
#define ngx_rbt_copy_color(n1, n2)  (n1->color = n2->color)
#define ngx_rbtree_sentinel_init(node)  ngx_rbt_black(node)
// 找到最小值,一直往左走即可
static inline ngx_rbtree_node_t *
ngx_rbtree_min(ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
{
    while (node->left != sentinel){
        node = node->left;
    }
    return node;
}

定時器接口實現

// test_user.c  封裝給用戶使用的接口
ngx_rbtree_t     timer;
static ngx_rbtree_node_t   sentinel;
typedef struct timer_entry_s timer_entry_t;
typedef void (*timer_handler_pt)(timer_entry_t *ev);

struct timer_entry_s {
   ngx_rbtree_node_t timer;
    timer_handler_pt  handler;
};
// 初始化
int init_timer() {
    ngx_rbtree_init(&timer, &sentinel, ngx_rbtree_insert_timer_value);
    return 0;
}
// 添加定時任務
void add_timer(timer_entry_t *te, uint32_t msec) {
    msec += current_time();
    te->timer.key = msec;
    ngx_rbtree_insert(&timer, &te->timer);
}
// 取消定時
void cancel_timer(timer_entry_t *te) {
    ngx_rbtree_delete(&timer, &te->timer);
}
// 執行到期任務
void expire_timer() {
    timer_entry_t *te;
    ngx_rbtree_node_t *sentinel, *root, *node;
    sentinel = timer.sentinel;
    uint32_t now = current_time();
    for(;;){
        root = timer.root;
        if (root == sentinel) break;
        if (node->key > now) break;
        te = (timer_entry_t *) ((char *) node - offsetof(timer_entry_t, timer));
        te->handler(te);
        ngx_rbtree_delete(&timer, &te->timer);
        free(te);
    }
}

以上,爲紅黑樹和跳錶實現的定時器,多線程環境下加鎖粒度比較大,高併發場景下效率不高,而時間輪適合高併發場景,如下。

時間輪實現定時器

時間輪

可以用於高效的執行大量定時任務,如下爲分層時間輪示意圖:

timewheel

時間輪可參考時鐘進行理解,秒針 (Seconds wheel) 轉一圈,則分針 (Minutes wheel) 走一格,分針 (Minutes wheel) 轉一圈,則時針 (Hours wheel) 走一格。隨着,時間的流逝,任務不斷從上層流下下一層,最終到達秒針輪上,當秒針走到時執行。

如上所示,時間輪大小爲8格,秒針1s轉動一格,其每一格所指向的鏈表保存着待執行任務。比如,如果當前指針指向1,要添加一個3s後執行的任務,由於1+3=4,即在第4格的鏈表中添加一個任務節點即可。如果要添加一個10s後執行的任務,10+1=11,超過了秒針輪範圍,因此需要對 8 取模11 % 8 = 3,即,會把這個任務放到分針輪上3對應的鏈表上,之後再從分針輪把任務丟到秒針輪上進行處理。也即,** 秒針輪 (Seconds wheel)** 即保存着最近將要執行的任務,隨着時間的流逝,任務會慢慢的從上層流到秒針輪中進行執行。

優點:加鎖粒度較小,只需要加一個格子即可,一個格子對應一串鏈表;適合高併發場景

缺點:不好刪除

如何解決時間輪定時任務刪除?

  1. 通過引用計數來解決

  2. 交由業務層處理,將刪除標記設爲true , 在函數回調中根據這個標記判斷是否需要處理

這裏介紹兩種定時器實現方案,一種是簡單實現方案,另一種是skynet較爲複雜的實現。

時間輪實現定時器

簡單時間輪實現方案

功能場景:由心跳包進行超時連接檢測,10s 未收到則斷開連接

一般做法:map<fd, *connect>每秒輪詢這個結構,檢測所有連接是否超時,收到心跳包,記錄時間戳

缺點:效率很差,每次需要檢測所有連接,時間複雜度爲O(n)

優化:分治大法,只需檢測快過期的連接, 採用 hash 數組 + 鏈表形式,數組大小設置成 16 :[0] + [1] + [2] + ... + [15] ,相同過期時間的放入一個數組,因此,每次只需檢測最近過期的數組即可,不需要遍歷所有。

數據結構定義

以下爲定時器節點,增加引用計數ref, 只有當ref爲 0 時刪除連接。

class CTimerNode {
public:
    CTimerNode(int fd) : id(fd), ref(0) {}
    void Offline() {this->ref = 0};
    bool tryKill() {
        if (this->ref == 0) return true;
        DecRef();
        if (this->ref == 0){
            return true;
        }
        return false;
    }
    void IncRef() {this->ref++;}
protected:
    void DecRef() {this->ref--;}
private:
    int ref;
    int id;
}
// 時間輪數組大小16, (x對16取餘)==(x&1111) 落到0-15之間,即落到對應的數組
const int TW_SIZE = 16;
const in EXPIRE = 10; // 過期間隔
const int TW_MASK = TW_SIZE - 1;  // 掩碼, 用於對16取餘
static size_t iReadTick = 0;  // 滴答時鐘
typedef list<CTimerNode*> TimeList; // 數組每一個槽位對應一個list
typedef TimeList::iterator TimeListIter;
typedef vector<TimeList> TimeWheel; // 時間輪
定時器接口
// 添加定時
void AddTimeOut(TimerWheel &tw, CTimerNode *p) {
    if (p) {
        p->IncRef();
        // 找到iRealTick對應數組的idx(槽位)
        TimeList &le = tw[(iRealTick+EXPIRE) & TW_MASK];
        le.push_back(p);  // 把時間節點加入list中
    }
}
// 延時調用
void AddTimeOutDelay(TimeWheel &tw, CTimerNode *p, size_t delay) {
    if (p) {
        p->IncRef();
        TimeList &le = tw[(iRealTick + EXPIRE + delay) & TW_MASK];
        le.push_back(p);
    }
}
// 時間輪移動
void TimerShift(TimeWheel &tw) {
    size_t tick = iRealTick;
    iRealTick++;
    TimeList &le = tw[tick & TW_MASK];
    TimeListIter iter = le.begin();
    for (; iter != le.end(); iter++) {
        CTimerNode *p = *iter;
        if (&& p->trySkill()){
            delete p;
        }
    }
    le.clear();
}

Skynet 定時器實現方案

skynet 中定時器數據結構

採用時間輪方式,hash 表 + 鏈表實現,

struct timer_node {  //時間節點
 struct timer_node *next;
    uint32_t expire; //到期滴答數
};
struct link_list {  // 鏈表
  struct timer_node head;
  struct timer_node *tail;
};
struct timer {
 struct link_list near[256];  // 即將到來的定時器
    struct link_list t[4][64]; // 相對較遙遠的定時器
    struct spinlock lock;
    uint32_t time;  // 記錄當前滴答數
    uint64_t starttime;
    uint64_t current;
    uint64_t current_point;
};

其中time32位無符號整數, 記錄時間片對應數組near[256] ,表示即將到來的定時任務, t[4][64],表示較爲遙遠的定時任務。

定時器執行流程

skynet_time_wheel

| t[3] | t[2] | t[1] | t[0] | near | | --- | --- | --- | --- | --- | | 26-32 位 | 20-26 位 | 14-20 位 | 8-14 位 | 0-8 位 | | [2^(8+6x3),2^(8+6x4)-1] | [2^(8+6x2),2^(8+6x3)-1] | [2^(8+6),2^(8+6x2)-1] | [2^8,2^(8+6) -1] | [0,2^8-1] |

以下爲具體實現,僅貼出主要接口,具體細節請參考skynet源代碼。

定時器初始化
// skynet_start.c
// skynet 啓動入口
void
skynet_start(struct skynet_config * config) {
    ...
    skynet_timer_init();
    ...
}
// skynet_timer.c
void
skynet_timer_init(void) {
    // 創建全局timer結構 TI
    TI  = timer_create_timer();
    uint32_t current = 0;
    systime(&TI->starttime, ¤t);
    TI->current = current;
    TI->current_point = gettime();
}
添加定時器

通過skynet_server.c中的cmd_timeout調用skynet_timeout添加新的定時器

// TI爲全局的定時器指針
static struct timer * TI = NULL; 
int skynet_timeout(uint32_t handle, int time, int session) {
    ...
    struct timer_event event;
    event.handle = handle;  // callback
    eveng.session = session;
    // 添加新的定時器節點
    timer_add(TI, &event, sizeof(event)time);
    return session;
}
// 添加新的定時器節點
static void timer_add(struct timer *T, void 8arg, size_t sz, int time) {
    // 給timer_node指針分配空間,還需要分配timer_node + timer_event大小的空間,
    // 之後通過node + 1可獲得timer_event數據
    struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
    memcpy(node+1,arg,sz);
    SPIN_LOCK(T);
    node->expire=time+T->time;
    add_node(T, node);
    SPIN_UNLOCK(T);
}

// 添加到定時器鏈表裏,如果定時器的到期滴答數跟當前比較近(<2^8),表示即將觸發定時器添加到T->near數組裏
// 否則根據差值大小添加到對應的T->T[i]中
static void add_node(struct timer *T, struct timer_node *node) {
    ...
}
驅動方式

skynet啓動時,會創建一個線程專門跑定時器,每幀 (0.0025s) 調用skynet_updatetime()

// skynet_start.c
static void * 
thread_timer(void *p) {
    struct monitor * m = p;
    skynet_initthread(THREAD_TIMER);
    for (;;) {
        skynet_updatetime();  // 調用timer_update
        skynet_socket_updatetime();
        CHECK_ABORT
        wakeup(m,m->count-1);
        usleep(2500);  // 2500微秒 = 0.0025s
        if (SIG) {
            signal_hup();
            SIG = 0;
        }
    }
    ...
}

每個定時器設置一個到期滴答數,與當前系統的滴答數 (啓動時爲 0,1 滴答 1 滴答往後跳,1 滴答 ==0.01s) 比較得到差值 interval;

如果 interval 比較小 (0 <= interval <= 2^8-1),表示定時器即將到來,保存在第一部分前 2^8 個定時器鏈表中;否則找到屬於第二部分對用的層級中。

// skynet_timer.c
void 
skynet_updatetime(void) {
    ...
    uint32_t diff = (uint32_t)(cp - TI->current_point); 
    TI->current_point = cp;
    TI->current += diff;
    // diff單位爲0.01s
    for (i = 0; i < diff; i++){
        timer_update(TI);        
    }
}
static void
timer_update(struct timer *T) {
    SPIN_LOCK(T);
    timer_execute(T); // 檢查T->near是否位空,有就處理到期定時器
    timer_shift(T);  // 時間片time++,移動高24位的鏈表
    timer_execute(T);
    SPIN_UNLOCK(T);
}
// 每幀從T->near中觸發到期得定時器
static inline void
timer_execute(struct timer *T) {
  ...
}
// 遍歷處理定時器鏈表中所有的定時器
static inline void
dispatch_list(struct timer_node *current) {
    ...
}
// 將高24位對應的4個6位的數組中的各個元素的鏈表往低位移
static void
timer_shift(struct timer *T) {
    ...
}
// 將level層的idx位置的定時器鏈表從當前位置刪除,並重新add_node
static void move_list(struct timer *T, int level, int idx) {

}

最小堆實現定時器

最小堆實現例子:boost.asio採用二叉樹,go採用四叉樹, libuv

具體實現略。

總結

本文主要介紹定時器作用,實現定時器數據結構選取,並詳細介紹了跳錶,紅黑樹,時間輪實現定時器的思路和方法。

參考


跳錶介紹

https://baijiahao.baidu.com/s?id=1633338040568845450&wfr=spider&for=pc

Skynet GitHub

https://github.com/cloudwu/skynet

skynet 源碼剖析

https://zhongyiqun.gitbooks.io/skynet/content/18-skynetding-shi-qi-yuan-li.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/f0caii_V3OODcehGY0qL9A