深入理解工作隊列

偉林,中年碼農,從事過電信、手機、安全、芯片等行業,目前依舊從事 Linux 方向開發工作,個人愛好 Linux 相關知識分享,個人微博 CSDN pwl999,歡迎大家關注!

workqueue 是內核裏面很重要的一個機制,特別是內核驅動,一般的小型任務 (work) 都不會自己起一個線程來處理,而是扔到 workqueu 中處理。workqueue 的主要工作就是用進程上下文來處理內核中大量的小任務。

所以 workqueue 的主要設計思想:一個是並行,多個 work 不要相互阻塞;另外一個是節省資源,多個 work 儘量共享資源 (進程、調度、內存),不要造成系統過多的資源浪費。

爲了實現的設計思想,workqueue 的設計實現也更新了很多版本。最新的 workqueue 實現叫做 CMWQ(Concurrency Managed Workqueue),也就是用更加智能的算法來實現 “並行和節省”。新版本的 workque 創建函數改成 alloc_workqueue(),舊版本的函數 create_*workqueue() 逐漸會被被廢棄。

本文的代碼分析基於 linux kernel 3.18.22,最好的學習方法還是 "read the fucking source code"

1.CMWQ 的幾個基本概念

關於 workqueue 中幾個概念都是 work 相關的數據結構非常容易混淆,大概可以這樣來理解:

最終的目的還是把 work(工作) 傳遞給 worker(工人) 去執行,中間的數據結構和各種關係目的是把這件事組織的更加清晰高效。

1.1 worker_pool

每個執行 work 的線程叫做 worker,一組 worker 的集合叫做 worker_pool。CMWQ 的精髓就在 worker_pool 裏面 worker 的動態增減管理上 manage_workers()。

CMWQ 對 worker_pool 分成兩類:

1.1.1 normal worker_pool

默認 work 是在 normal worker_pool 中處理的。系統的規劃是每個 cpu 創建兩個 normal worker_pool:一個 normal 優先級 (nice=0)、一個高優先級 (nice=HIGHPRI_NICE_LEVEL),對應創建出來的 worker 的進程 nice 不一樣。

每個 worker 對應一個 worker_thread() 內核線程,一個 worker_pool 包含一個或者多個 worker,worker_pool 中 worker 的數量是根據 worker_pool 中 work 的負載來動態增減的。

我們可以通過 “ps|grep kworker” 命令來查看所有 worker 對應的內核線程,normal worker_pool 對應內核線程 (worker_thread()) 的命名規則是這樣的:

 snprintf(id_buf, sizeof(id_buf)"%d:%d%s", pool->cpu, id,
   pool->attrs->nice < 0  ? "H" : "");

 worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
           "kworker/%s", id_buf);

so 類似名字是 normal worker_pool:

shell@PRO5:/ $ ps | grep "kworker"
root      14    2     0      0     worker_thr 0000000000 S kworker/1:0H  // cpu1 高優先級worker_pool的第0個worker進程
root      17    2     0      0     worker_thr 0000000000 S kworker/2:0  // cpu2 低優先級worker_pool的第0個worker進程
root      18    2     0      0     worker_thr 0000000000 S kworker/2:0H  // cpu2 高優先級worker_pool的第0個worker進程
root      23699 2     0      0     worker_thr 0000000000 S kworker/0:1  // cpu0 低優先級worker_pool的第1個worker進程

對應的拓撲圖如下:

以下是 normal worker_pool 詳細的創建過程代碼分析:

static int __init init_workqueues(void)
{
 int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
 int i, cpu;

 // (1)給每個cpu創建對應的worker_pool
 /* initialize CPU pools */
 for_each_possible_cpu(cpu) {
  struct worker_pool *pool;

  i = 0;
  for_each_cpu_worker_pool(pool, cpu) {
   BUG_ON(init_worker_pool(pool));
   // 指定cpu
   pool->cpu = cpu;
   cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
   // 指定進程優先級nice
   pool->attrs->nice = std_nice[i++];
   pool->node = cpu_to_node(cpu);

   /* alloc pool ID */
   mutex_lock(&wq_pool_mutex);
   BUG_ON(worker_pool_assign_id(pool));
   mutex_unlock(&wq_pool_mutex);
  }
 }

 // (2)給每個worker_pool創建第一個worker
 /* create the initial worker */
 for_each_online_cpu(cpu) {
  struct worker_pool *pool;

  for_each_cpu_worker_pool(pool, cpu) {
   pool->flags &= ~POOL_DISASSOCIATED;
   BUG_ON(!create_worker(pool));
  }
 }

}
| →
static int init_worker_pool(struct worker_pool *pool)
{
 spin_lock_init(&pool->lock);
 pool->id = -1;
 pool->cpu = -1;
 pool->node = NUMA_NO_NODE;
 pool->flags |= POOL_DISASSOCIATED;
 // (1.1) worker_pool的work list,各個workqueue把work掛載到這個鏈表上,
 // 讓worker_pool對應的多個worker來執行
 INIT_LIST_HEAD(&pool->worklist);
 // (1.2) worker_pool的idle worker list,
 // worker沒有活幹時,不會馬上銷燬,先進入idle狀態備選
 INIT_LIST_HEAD(&pool->idle_list);
 // (1.3) worker_pool的busy worker list,
 // worker正在幹活,在執行work
 hash_init(pool->busy_hash);

 // (1.4) 檢查idle狀態worker是否需要destroy的timer
 init_timer_deferrable(&pool->idle_timer);
 pool->idle_timer.function = idle_worker_timeout;
 pool->idle_timer.data = (unsigned long)pool;

 // (1.5) 在worker_pool創建新的worker時,檢查是否超時的timer
 setup_timer(&pool->mayday_timer, pool_mayday_timeout,
      (unsigned long)pool);

 mutex_init(&pool->manager_arb);
 mutex_init(&pool->attach_mutex);
 INIT_LIST_HEAD(&pool->workers);

 ida_init(&pool->worker_ida);
 INIT_HLIST_NODE(&pool->hash_node);
 pool->refcnt = 1;

 /* shouldn't fail above this point */
 pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
 if (!pool->attrs)
  return -ENOMEM;
 return 0;
}
| →
static struct worker *create_worker(struct worker_pool *pool)
{
 struct worker *worker = NULL;
 int id = -1;
 char id_buf[16];

 /* ID is needed to determine kthread name */
 id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);
 if (id < 0)
  goto fail;

 worker = alloc_worker(pool->node);
 if (!worker)
  goto fail;

 worker->pool = pool;
 worker->id = id;

 if (pool->cpu >= 0)
  // (2.1) 給normal worker_pool的worker構造進程名
  snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
    pool->attrs->nice < 0  ? "H" : "");
 else
  // (2.2) 給unbound worker_pool的worker構造進程名
  snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);

 // (2.3) 創建worker對應的內核進程
 worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
           "kworker/%s", id_buf);
 if (IS_ERR(worker->task))
  goto fail;

 // (2.4) 設置內核進程對應的優先級nice
 set_user_nice(worker->task, pool->attrs->nice);

 /* prevent userland from meddling with cpumask of workqueue workers */
 worker->task->flags |= PF_NO_SETAFFINITY;

 // (2.5) 將worker和worker_pool綁定
 /* successful, attach the worker to the pool */
 worker_attach_to_pool(worker, pool);

 // (2.6) 將worker初始狀態設置成idle,
 // wake_up_process以後,worker自動leave idle狀態
 /* start the newly created worker */
 spin_lock_irq(&pool->lock);
 worker->pool->nr_workers++;
 worker_enter_idle(worker);
 wake_up_process(worker->task);
 spin_unlock_irq(&pool->lock);

 return worker;

fail:
 if (id >= 0)
  ida_simple_remove(&pool->worker_ida, id);
 kfree(worker);
 return NULL;
}
|| →
static void worker_attach_to_pool(struct worker *worker,
       struct worker_pool *pool)
{
 mutex_lock(&pool->attach_mutex);

 // (2.5.1) 將worker線程和cpu綁定
 /*
  * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
  * online CPUs.  It'll be re-applied when any of the CPUs come up.
  */
 set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);

 /*
  * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains
  * stable across this function.  See the comments above the
  * flag definition for details.
  */
 if (pool->flags & POOL_DISASSOCIATED)
  worker->flags |= WORKER_UNBOUND;

 // (2.5.2) 將worker加入worker_pool鏈表
 list_add_tail(&worker->node, &pool->workers);

 mutex_unlock(&pool->attach_mutex);
}

1.1.2 unbound worker_pool

大部分的 work 都是通過 normal worker_pool 來執行的 (例如通過 schedule_work()、schedule_work_on() 壓入到系統 workqueue(system_wq)中的 work),最後都是通過 normal worker_pool 中的 worker 來執行的。這些 worker 是和某個 cpu 綁定的,work 一旦被 worker 開始執行,都是一直運行到某個 cpu 上的不會切換 cpu。

unbound worker_pool 相對應的意思,就是 worker 可以在多個 cpu 上調度的。但是他其實也是綁定的,只不過它綁定的單位不是 cpu 而是 node。所謂的 node 是對 NUMA(Non Uniform Memory Access Architecture) 系統來說的,NUMA 可能存在多個 node,每個 node 可能包含一個或者多個 cpu。

unbound worker_pool 對應內核線程 (worker_thread()) 的命名規則是這樣的:

 snprintf(id_buf, sizeof(id_buf)"u%d:%d", pool->id, id);

 worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
           "kworker/%s", id_buf);

so 類似名字是 unbound worker_pool:

shell@PRO5:/ $ ps | grep "kworker"
root      23906 2     0      0     worker_thr 0000000000 S kworker/u20:2 // unbound pool 20的第2個worker進程
root      24564 2     0      0     worker_thr 0000000000 S kworker/u20:0 // unbound pool 20的第0個worker進程
root      24622 2     0      0     worker_thr 0000000000 S kworker/u21:1 // unbound pool 21的第1個worker進程

unbound worker_pool 也分成兩類:

對應的拓撲圖如下:

對應的拓撲圖如下:

以下是 unbound worker_pool 詳細的創建過程代碼分析:

static int __init init_workqueues(void)
{

 // (1) 初始化normal和high nice對應的unbound attrs
 /* create default unbound and ordered wq attrs */
 for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
  struct workqueue_attrs *attrs;

  // (2) unbound_std_wq_attrs
  BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
  attrs->nice = std_nice[i];
  unbound_std_wq_attrs[i] = attrs;
  /*
   * An ordered wq should have only one pwq as ordering is
   * guaranteed by max_active which is enforced by pwqs.
   * Turn off NUMA so that dfl_pwq is used for all nodes.
   */
  // (3) ordered_wq_attrs,no_numa = true;
  BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
  attrs->nice = std_nice[i];
  attrs->no_numa = true;
  ordered_wq_attrs[i] = attrs;
 }


}
struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
            unsigned int flags,
            int max_active,
            struct lock_class_key *key,
            const char *lock_name, ...)
{
 size_t tbl_size = 0;
 va_list args;
 struct workqueue_struct *wq;
 struct pool_workqueue *pwq;

 /* see the comment above the definition of WQ_POWER_EFFICIENT */
 if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
  flags |= WQ_UNBOUND;

 /* allocate wq and format name */
 if (flags & WQ_UNBOUND)
  tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);

 // (1) 分配workqueue_struct數據結構
 wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
 if (!wq)
  return NULL;

 if (flags & WQ_UNBOUND) {
  wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
  if (!wq->unbound_attrs)
   goto err_free_wq;
 }

 va_start(args, lock_name);
 vsnprintf(wq->name, sizeof(wq->name), fmt, args);
 va_end(args);

 // (2) pwq最多放到worker_pool中的work數
 max_active = max_active ?: WQ_DFL_ACTIVE;
 max_active = wq_clamp_max_active(max_active, flags, wq->name);

 /* init wq */
 wq->flags = flags;
 wq->saved_max_active = max_active;
 mutex_init(&wq->mutex);
 atomic_set(&wq->nr_pwqs_to_flush, 0);
 INIT_LIST_HEAD(&wq->pwqs);
 INIT_LIST_HEAD(&wq->flusher_queue);
 INIT_LIST_HEAD(&wq->flusher_overflow);
 INIT_LIST_HEAD(&wq->maydays);

 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
 INIT_LIST_HEAD(&wq->list);

 // (3) 給workqueue分配對應的pool_workqueue
 // pool_workqueue將workqueue和worker_pool鏈接起來
 if (alloc_and_link_pwqs(wq) < 0)
  goto err_free_wq;

 // (4) 如果是WQ_MEM_RECLAIM類型的workqueue
 // 創建對應的rescuer_thread()內核進程
 /*
  * Workqueues which may be used during memory reclaim should
  * have a rescuer to guarantee forward progress.
  */
 if (flags & WQ_MEM_RECLAIM) {
  struct worker *rescuer;

  rescuer = alloc_worker(NUMA_NO_NODE);
  if (!rescuer)
   goto err_destroy;

  rescuer->rescue_wq = wq;
  rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
            wq->name);
  if (IS_ERR(rescuer->task)) {
   kfree(rescuer);
   goto err_destroy;
  }

  wq->rescuer = rescuer;
  rescuer->task->flags |= PF_NO_SETAFFINITY;
  wake_up_process(rescuer->task);
 }

 // (5) 如果是需要,創建workqueue對應的sysfs文件
 if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
  goto err_destroy;

 /*
  * wq_pool_mutex protects global freeze state and workqueues list.
  * Grab it, adjust max_active and add the new @wq to workqueues
  * list.
  */
 mutex_lock(&wq_pool_mutex);

 mutex_lock(&wq->mutex);
 for_each_pwq(pwq, wq)
  pwq_adjust_max_active(pwq);
 mutex_unlock(&wq->mutex);

 // (6) 將新的workqueue加入到全局鏈表workqueues中
 list_add(&wq->list, &workqueues);

 mutex_unlock(&wq_pool_mutex);

 return wq;

err_free_wq:
 free_workqueue_attrs(wq->unbound_attrs);
 kfree(wq);
 return NULL;
err_destroy:
 destroy_workqueue(wq);
 return NULL;
}
| →
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
 bool highpri = wq->flags & WQ_HIGHPRI;
 int cpu, ret;

 // (3.1) normal workqueue
 // pool_workqueue鏈接workqueue和worker_pool的過程
 if (!(wq->flags & WQ_UNBOUND)) {
  // 給workqueue的每個cpu分配對應的pool_workqueue,賦值給wq->cpu_pwqs
  wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
  if (!wq->cpu_pwqs)
   return -ENOMEM;

  for_each_possible_cpu(cpu) {
   struct pool_workqueue *pwq =
    per_cpu_ptr(wq->cpu_pwqs, cpu);
   struct worker_pool *cpu_pools =
    per_cpu(cpu_worker_pools, cpu);

   // 將初始化時已經創建好的normal worker_pool,賦值給pool_workqueue
   init_pwq(pwq, wq, &cpu_pools[highpri]);

   mutex_lock(&wq->mutex);
   // 將pool_workqueue和workqueue鏈接起來
   link_pwq(pwq);
   mutex_unlock(&wq->mutex);
  }
  return 0;
 } else if (wq->flags & __WQ_ORDERED) {
 // (3.2) unbound ordered_wq workqueue
 // pool_workqueue鏈接workqueue和worker_pool的過程
  ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
  /* there should only be single pwq for ordering guarantee */
  WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
         wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
       "ordering guarantee broken for workqueue %s\n", wq->name);
  return ret;
 } else {
 // (3.3) unbound unbound_std_wq workqueue
 // pool_workqueue鏈接workqueue和worker_pool的過程
  return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
 }
}
|| →
int apply_workqueue_attrs(struct workqueue_struct *wq,
     const struct workqueue_attrs *attrs)
{

 // (3.2.1) 根據的ubound的ordered_wq_attrs/unbound_std_wq_attrs
 // 創建對應的pool_workqueue和worker_pool
 // 其中worker_pool不是默認創建好的,是需要動態創建的,對應的worker內核進程也要重新創建
 // 創建好的pool_workqueue賦值給pwq_tbl[node]
 /*
  * If something goes wrong during CPU up/down, we'll fall back to
  * the default pwq covering whole @att- kernel/workqueue.c:  
- __alloc_workqueue_key() -> alloc_and_link_pwqs() -> apply_workqueue_attrs() -> alloc_unbound_pwq()/numa_pwq_tbl_install()rs->cpumask.  Always create
  * it even if we don't use it immediately.
  */
 dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
 if (!dfl_pwq)
  goto enomem_pwq;

 for_each_node(node) {
  if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
   pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
   if (!pwq_tbl[node])
    goto enomem_pwq;
  } else {
   dfl_pwq->refcnt++;
   pwq_tbl[node] = dfl_pwq;
  }
 }


 /* save the previous pwq and install the new one */
 // (3.2.2) 將臨時pwq_tbl[node]賦值給wq->numa_pwq_tbl[node]
 for_each_node(node)
  pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);

}
||| →
static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
     const struct workqueue_attrs *attrs)
{
 struct worker_pool *pool;
 struct pool_workqueue *pwq;

 lockdep_assert_held(&wq_pool_mutex);

 // (3.2.1.1) 如果對應attrs已經創建多對應的unbound_pool,則使用已有的unbound_pool
 // 否則根據attrs創建新的unbound_pool
 pool = get_unbound_pool(attrs);
 if (!pool)
  return NULL;

 pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
 if (!pwq) {
  put_unbound_pool(pool);
  return NULL;
 }

 init_pwq(pwq, wq, pool);
 return pwq;
}

1.2 worker

每個 worker 對應一個 worker_thread() 內核線程,一個 worker_pool 對應一個或者多個 worker。多個 worker 從同一個鏈表中 worker_pool->worklist 獲取 work 進行處理。

所以這其中有幾個重點:

1.2.1 worker 處理 work

處理 work 的過程主要在 worker_thread() -> process_one_work() 中處理,我們具體看看代碼的實現過程。

static int worker_thread(void *__worker)
{
 struct worker *worker = __worker;
 struct worker_pool *pool = worker->pool;

 /* tell the scheduler that this is a workqueue worker */
 worker->task->flags |= PF_WQ_WORKER;
woke_up:
 spin_lock_irq(&pool->lock);

 // (1) 是否die
 /* am I supposed to die? */
 if (unlikely(worker->flags & WORKER_DIE)) {
  spin_unlock_irq(&pool->lock);
  WARN_ON_ONCE(!list_empty(&worker->entry));
  worker->task->flags &= ~PF_WQ_WORKER;

  set_task_comm(worker->task, "kworker/dying");
  ida_simple_remove(&pool->worker_ida, worker->id);
  worker_detach_from_pool(worker, pool);
  kfree(worker);
  return 0;
 }

 // (2) 脫離idle狀態
 // 被喚醒之前worker都是idle狀態
 worker_leave_idle(worker);
recheck:
 
 // (3) 如果需要本worker繼續執行則繼續,否則進入idle狀態
 // need more worker的條件: (pool->worklist != 0) && (pool->nr_running == 0)
 // worklist上有work需要執行,並且現在沒有處於running的work
 /* no more worker necessary? */
 if (!need_more_worker(pool))
  goto sleep;

 // (4) 如果(pool->nr_idle == 0),則啓動創建更多的worker
 // 說明idle隊列中已經沒有備用worker了,先創建 一些worker備用
 /* do we need to manage? */
 if (unlikely(!may_start_working(pool)) && manage_workers(worker))
  goto recheck;

 /*
  * ->scheduled list can only be filled while a worker is
  * preparing to process a work or actually processing it.
  * Make sure nobody diddled with it while I was sleeping.
  */
 WARN_ON_ONCE(!list_empty(&worker->scheduled));

 /*
  * Finish PREP stage.  We're guaranteed to have at least one idle
  * worker or that someone else has already assumed the manager
  * role.  This is where @worker starts participating in concurrency
  * management if applicable and concurrency management is restored
  * after being rebound.  See rebind_workers() for details.
  */
 worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);

 do {
  // (5) 如果pool->worklist不爲空,從其中取出一個work進行處理
  struct work_struct *work =
   list_first_entry(&pool->worklist,
      struct work_struct, entry);

  if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
   /* optimization path, not strictly necessary */
   // (6) 執行正常的work
   process_one_work(worker, work);
   if (unlikely(!list_empty(&worker->scheduled)))
    process_scheduled_works(worker);
  } else {
   // (7) 執行系統特意scheduled給某個worker的work
   // 普通的work是放在池子的公共list中的pool->worklist
   // 只有一些特殊的work被特意派送給某個worker的worker->scheduled
   // 包括:1、執行flush_work時插入的barrier work;
   // 2、collision時從其他worker推送到本worker的work
   move_linked_works(work, &worker->scheduled, NULL);
   process_scheduled_works(worker);
  }
 // (8) worker keep_working的條件:
 // pool->worklist不爲空 && (pool->nr_running <= 1)
 } while (keep_working(pool));

 worker_set_flags(worker, WORKER_PREP);supposed
sleep:
 // (9) worker進入idle狀態
 /*
  * pool->lock is held and there's no work to process and no need to
  * manage, sleep.  Workers are woken up only while holding
  * pool->lock or from local cpu, so setting the current state
  * before releasing pool->lock is enough to prevent losing any
  * event.
  */
 worker_enter_idle(worker);
 __set_current_state(TASK_INTERRUPTIBLE);
 spin_unlock_irq(&pool->lock);
 schedule();
 goto woke_up;
}
| →
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
 struct pool_workqueue *pwq = get_work_pwq(work);
 struct worker_pool *pool = worker->pool;
 bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
 int work_color;
 struct worker *collision;
#ifdef CONFIG_LOCKDEP
 /*
  * It is permissible to free the struct work_struct from
  * inside the function that is called from it, this we need to
  * take into account for lockdep too.  To avoid bogus "held
  * lock freed" warnings as well as problems when looking into
  * work->lockdep_map, make a copy and use that here.
  */
 struct lockdep_map lockdep_map;

 lockdep_copy_map(&lockdep_map, &work->lockdep_map);
#endif
 /* ensure we're on the correct CPU */
 WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
       raw_smp_processor_id() != pool->cpu);

 // (8.1) 如果work已經在worker_pool的其他worker上執行,
 // 將work放入對應worker的scheduled隊列中延後執行
 /*
  * A single work shouldn't be executed concurrently by
  * multiple workers on a single cpu.  Check whether anyone is
  * already processing the work.  If so, defer the work to the
  * currently executing one.
  */
 collision = find_worker_executing_work(pool, work);
 if (unlikely(collision)) {
  move_linked_works(work, &collision->scheduled, NULL);
  return;
 }

 // (8.2) 將worker加入busy隊列pool->busy_hash
 /* claim and dequeue */
 debug_work_deactivate(work);
 hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
 worker->current_work = work;
 worker->current_func = work->func;
 worker->current_pwq = pwq;
 work_color = get_work_color(work);

 list_del_init(&work->entry);

 // (8.3) 如果work所在的wq是cpu密集型的WQ_CPU_INTENSIVE
 // 則當前work的執行脫離worker_pool的動態調度,成爲一個獨立的線程
 /*
  * CPU intensive works don't participate in concurrency management.
  * They're the scheduler's responsibility.  This takes @worker out
  * of concurrency management and the next code block will chain
  * execution of the pending work items.
  */
 if (unlikely(cpu_intensive))
  worker_set_flags(worker, WORKER_CPU_INTENSIVE);

 // (8.4) 在UNBOUND或者CPU_INTENSIVE work中判斷是否需要喚醒idle worker
 // 普通work不會執行這個操作
 /*
  * Wake up another worker if necessary.  The condition is always
  * false for normal per-cpu workers since nr_running would always
  * be >= 1 at this point.  This is used to chain execution of the
  * pending work items for WORKER_NOT_RUNNING workers such as the
  * UNBOUND and CPU_INTENSIVE ones.
  */
 if (need_more_worker(pool))
  wake_up_worker(pool);

 /*
  * Record the last pool and clear PENDING which should be the last
  * update to @work.  Also, do this inside @pool->lock so that
  * PENDING and queued state changes happen together while IRQ is
  * disabled.
  */
 set_work_pool_and_clear_pending(work, pool->id);

 spin_unlock_irq(&pool->lock);

 lock_map_acquire_read(&pwq->wq->lockdep_map);
 lock_map_acquire(&lockdep_map);
 trace_workqueue_execute_start(work);
 // (8.5) 執行work函數
 worker->current_func(work);
 /*
  * While we must be careful to not use "work" after this, the trace
  * point will only record its address.
  */
 trace_workqueue_execute_end(work);
 lock_map_release(&lockdep_map);
 lock_map_release(&pwq->wq->lockdep_map);

 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
  pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
         "     last function: %pf\n",
         current->comm, preempt_count(), task_pid_nr(current),
         worker->current_func);
  debug_show_held_locks(current);
  dump_stack();
 }

 /*
  * The following prevents a kworker from hogging CPU on !PREEMPT
  * kernels, where a requeueing work item waiting for something to
  * happen could deadlock with stop_machine as such work item could
  * indefinitely requeue itself while all other CPUs are trapped in
  * stop_machine. At the same time, report a quiescent RCU state so
  * the same condition doesn't freeze RCU.
  */
 cond_resched_rcu_qs();

 spin_lock_irq(&pool->lock);

 /* clear cpu intensive status */
 if (unlikely(cpu_intensive))
  worker_clr_flags(worker, WORKER_CPU_INTENSIVE);

 /* we're done with it, release */
 hash_del(&worker->hentry);
 worker->current_work = NULL;
 worker->current_func = NULL;
 worker->current_pwq = NULL;
 worker->desc_valid = false;
 pwq_dec_nr_in_flight(pwq, work_color);
}

1.2.2 worker_pool 動態管理 worker

worker_pool 怎麼來動態增減 worker,這部分的算法是 CMWQ 的核心。其思想如下:

詳細代碼可以參考上節 worker_thread() -> process_one_work() 的分析。

爲了追蹤 worker 的 running 和 suspend 狀態,用來動態調整 worker 的數量。wq 使用在進程調度中加鉤子函數的技巧:

void wq_worker_waking_up(struct task_struct *task, int cpu)
{
 struct worker *worker = kthread_data(task);

 if (!(worker->flags & WORKER_NOT_RUNNING)) {
  WARN_ON_ONCE(worker->pool->cpu != cpu);
  // 增加worker_pool中running的worker數量
  atomic_inc(&worker->pool->nr_running);
 }
}
struct task_struct *wq_worker_sleeping(struct task_struct *task, int cpu)
{
 struct worker *worker = kthread_data(task), *to_wakeup = NULL;
 struct worker_pool *pool;

 /*
  * Rescuers, which may not have all the fields set up like normal
  * workers, also reach here, let's not access anything before
  * checking NOT_RUNNING.
  */
 if (worker->flags & WORKER_NOT_RUNNING)
  return NULL;

 pool = worker->pool;

 /* this can only happen on the local cpu */
 if (WARN_ON_ONCE(cpu != raw_smp_processor_id() || pool->cpu != cpu))
  return NULL;

 /*
  * The counterpart of the following dec_and_test, implied mb,
  * worklist not empty test sequence is in insert_work().
  * Please read comment there.
  *
  * NOT_RUNNING is clear.  This means that we're bound to and
  * running on the local cpu w/ rq lock held and preemption
  * disabled, which in turn means that none else could be
  * manipulating idle_list, so dereferencing idle_list without pool
  * lock is safe.
  */
 // 減少worker_pool中running的worker數量
 // 如果worklist還有work需要處理,喚醒第一個idle worker進行處理
 if (atomic_dec_and_test(&pool->nr_running) &&
     !list_empty(&pool->worklist))
  to_wakeup = first_idle_worker(pool);
 return to_wakeup ? to_wakeup->task : NULL;
}

這裏 worker_pool 的調度思想是:如果有 work 需要處理,保持一個 running 狀態的 worker 處理,不多也不少。

但是這裏有一個問題如果 work 是 cpu 密集型的,它雖然也沒有進入 suspend 狀態,但是會長時間的佔用 cpu,讓後續的 work 阻塞太長時間。

爲了解決這個問題,CMWQ 設計了 WQ_CPU_INTENSIVE,如果一個 wq 聲明自己是 CPU_INTENSIVE,則讓當前 worker 脫離動態調度,像是進入了 suspend 狀態,那麼 CMWQ 會創建新的 worker,後續的 work 會得到執行。

static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{

 bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;


 // (1) 設置當前worker的WORKER_CPU_INTENSIVE標誌
 // nr_running會被減1
 // 對worker_pool來說,當前worker相當於進入了suspend狀態
 /*
  * CPU intensive works don't participate in concurrency management.
  * They're the scheduler's responsibility.  This takes @worker out
  * of concurrency management and the next code block will chain
  * execution of the pending work items.
  */
 if (unlikely(cpu_intensive))
  worker_set_flags(worker, WORKER_CPU_INTENSIVE);

 // (2) 接上一步,判斷是否需要喚醒新的worker來處理work
 /*
  * Wake up another worker if necessary.  The condition is always
  * false for normal per-cpu workers since nr_running would always
  * be >= 1 at this point.  This is used to chain execution of the
  * pending work items for WORKER_NOT_RUNNING workers such as the
  * UNBOUND and CPU_INTENSIVE ones.
  */
 if (need_more_worker(pool))
  wake_up_worker(pool);

 // (3) 執行work
 worker->current_func(work);


 // (4) 執行完,清理當前worker的WORKER_CPU_INTENSIVE標誌
 // 當前worker重新進入running狀態
 /* clear cpu intensive status */
 if (unlikely(cpu_intensive))
  worker_clr_flags(worker, WORKER_CPU_INTENSIVE);


}


 WORKER_NOT_RUNNING = WORKER_PREP | WORKER_CPU_INTENSIVE |
      WORKER_UNBOUND | WORKER_REBOUND,


static inline void worker_set_flags(struct worker *worker, unsigned int flags)
{
 struct worker_pool *pool = worker->pool;

 WARN_ON_ONCE(worker->task != current);

 /* If transitioning into NOT_RUNNING, adjust nr_running. */
 if ((flags & WORKER_NOT_RUNNING) &&
     !(worker->flags & WORKER_NOT_RUNNING)) {
  atomic_dec(&pool->nr_running);
 }

 worker->flags |= flags;
}


static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
{
 struct worker_pool *pool = worker->pool;
 unsigned int oflags = worker->flags;

 WARN_ON_ONCE(worker->task != current);

 worker->flags &= ~flags;

 /*
  * If transitioning out of NOT_RUNNING, increment nr_running.  Note
  * that the nested NOT_RUNNING is not a noop.  NOT_RUNNING is mask
  * of multiple flags, not a single flag.
  */
 if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
  if (!(worker->flags & WORKER_NOT_RUNNING))
   atomic_inc(&pool->nr_running);
}

1.2.3 cpu hotplug 處理

從上幾節可以看到,系統會創建和 cpu 綁定的 normal worker_pool 和不綁定 cpu 的 unbound worker_pool,worker_pool 又會動態的創建 worker。

那麼在 cpu hotplug 的時候,會怎麼樣動態的處理 worker_pool 和 worker 呢?來看具體的代碼分析:

static int __init init_workqueues(void)
{

 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
 hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);

}
| →
static int workqueue_cpu_down_callback(struct notifier_block *nfb,
       unsigned long action,
       void *hcpu)
{
 int cpu = (unsigned long)hcpu;
 struct work_struct unbind_work;
 struct workqueue_struct *wq;

 switch (action & ~CPU_TASKS_FROZEN) {
 case CPU_DOWN_PREPARE:
  /* unbinding per-cpu workers should happen on the local CPU */
  INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
  // (1) cpu down_prepare
  // 把和當前cpu綁定的normal worker_pool上的worker停工
  // 隨着當前cpu被down掉,這些worker會遷移到其他cpu上
  queue_work_on(cpu, system_highpri_wq, &unbind_work);

  // (2) unbound wq對cpu變化的更新
  /* update NUMA affinity of unbound workqueues */
  mutex_lock(&wq_pool_mutex);
  list_for_each_entry(wq, &workqueues, list)
   wq_update_unbound_numa(wq, cpu, false);
  mutex_unlock(&wq_pool_mutex);

  /* wait for per-cpu unbinding to finish */
  flush_work(&unbind_work);
  destroy_work_on_stack(&unbind_work);
  break;
 }
 return NOTIFY_OK;
}
| →
static int workqueue_cpu_up_callback(struct notifier_block *nfb,
            unsigned long action,
            void *hcpu)
{
 int cpu = (unsigned long)hcpu;
 struct worker_pool *pool;
 struct workqueue_struct *wq;
 int pi;

 switch (action & ~CPU_TASKS_FROZEN) {
 case CPU_UP_PREPARE:
  for_each_cpu_worker_pool(pool, cpu) {
   if (pool->nr_workers)
    continue;
   if (!create_worker(pool))
    return NOTIFY_BAD;
  }
  break;

 case CPU_DOWN_FAILED:
 case CPU_ONLINE:
  mutex_lock(&wq_pool_mutex);
  
  // (3) cpu up
  for_each_pool(pool, pi) {
   mutex_lock(&pool->attach_mutex);

   // 如果和當前cpu綁定的normal worker_pool上,有WORKER_UNBOUND停工的worker
   // 重新綁定worker到worker_pool
   // 讓這些worker開工,並綁定到當前cpu
   if (pool->cpu == cpu)
    rebind_workers(pool);
   else if (pool->cpu < 0)
    restore_unbound_workers_cpumask(pool, cpu);

   mutex_unlock(&pool->attach_mutex);
  }

  /* update NUMA affinity of unbound workqueues */
  list_for_each_entry(wq, &workqueues, list)
   wq_update_unbound_numa(wq, cpu, true);

  mutex_unlock(&wq_pool_mutex);
  break;
 }
 return NOTIFY_OK;
}

1.3 workqueue

workqueue 就是存放一組 work 的集合,基本可以分爲兩類:一類系統創建的 workqueue,一類是用戶自己創建的 workqueue。

不論是系統還是用戶 workqueue,如果沒有指定 WQ_UNBOUND,默認都是和 normal worker_pool 綁定。

1.3.1 系統 workqueue

系統在初始化時創建了一批默認的 workqueue:system_wq、system_highpri_wq、system_long_wq、system_unbound_wq、system_freezable_wq、system_power_efficient_wq、system_freezable_power_efficient_wq。

像 system_wq,就是 schedule_work() 默認使用的。

static int __init init_workqueues(void)
{

 system_wq = alloc_workqueue("events", 0, 0);
 system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
 system_long_wq = alloc_workqueue("events_long", 0, 0);
 system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
         WQ_UNBOUND_MAX_ACTIVE);
 system_freezable_wq = alloc_workqueue("events_freezable",
           WQ_FREEZABLE, 0);
 system_power_efficient_wq = alloc_workqueue("events_power_efficient",
           WQ_POWER_EFFICIENT, 0);
 system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
           WQ_FREEZABLE | WQ_POWER_EFFICIENT,
           0);

}

1.3.2 workqueue 創建

詳細過程見上幾節的代碼分析:alloc_workqueue() -> __alloc_workqueue_key() -> alloc_and_link_pwqs()。

1.3.3 flush_workqueue()

這一部分的邏輯,wq->work_color、wq->flush_color 換來換去的邏輯實在看的頭暈。看不懂暫時不想看,放着以後看吧,或者有誰看懂了教我一下。:)

1.4 pool_workqueue

pool_workqueue 只是一箇中介角色。

詳細過程見上幾節的代碼分析:alloc_workqueue() -> __alloc_workqueue_key() -> alloc_and_link_pwqs()。

1.5 work

描述一份待執行的工作。

1.5.1 queue_work()

將 work 壓入到 workqueue 當中。

static void __queue_work(int cpu, struct workqueue_struct *wq,
    struct work_struct *work)
{
 struct pool_workqueue *pwq;
 struct worker_pool *last_pool;
 struct list_head *worklist;
 unsigned int work_flags;
 unsigned int req_cpu = cpu;

 /*
  * While a work item is PENDING && off queue, a task trying to
  * steal the PENDING will busy-loop waiting for it to either get
  * queued or lose PENDING.  Grabbing PENDING and queueing should
  * happen with IRQ disabled.
  */
 WARN_ON_ONCE(!irqs_disabled());

 debug_work_activate(work);

 /* if draining, only works from the same workqueue are allowed */
 if (unlikely(wq->flags & __WQ_DRAINING) &&
     WARN_ON_ONCE(!is_chained_work(wq)))
  return;
retry:
 // (1) 如果沒有指定cpu,則使用當前cpu
 if (req_cpu == WORK_CPU_UNBOUND)
  cpu = raw_smp_processor_id();

 /* pwq which will be used unless @work is executing elsewhere */
 if (!(wq->flags & WQ_UNBOUND))
  // (2) 對於normal wq,使用當前cpu對應的normal worker_pool
  pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
 else
  // (3) 對於unbound wq,使用當前cpu對應node的worker_pool
  pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

 // (4) 如果work在其他worker上正在被執行,把work壓到對應的worker上去
 // 避免work出現重入的問題
 /*
  * If @work was previously on a different pool, it might still be
  * running there, in which case the work needs to be queued on that
  * pool to guarantee non-reentrancy.
  */
 last_pool = get_work_pool(work);
 if (last_pool && last_pool != pwq->pool) {
  struct worker *worker;

  spin_lock(&last_pool->lock);

  worker = find_worker_executing_work(last_pool, work);

  if (worker && worker->current_pwq->wq == wq) {
   pwq = worker->current_pwq;
  } else {
   /* meh... not running there, queue here */
   spin_unlock(&last_pool->lock);
   spin_lock(&pwq->pool->lock);
  }
 } else {
  spin_lock(&pwq->pool->lock);
 }

 /*
  * pwq is determined and locked.  For unbound pools, we could have
  * raced with pwq release and it could already be dead.  If its
  * refcnt is zero, repeat pwq selection.  Note that pwqs never die
  * without another pwq replacing it in the numa_pwq_tbl or while
  * work items are executing on it, so the retrying is guaranteed to
  * make forward-progress.
  */
 if (unlikely(!pwq->refcnt)) {
  if (wq->flags & WQ_UNBOUND) {
   spin_unlock(&pwq->pool->lock);
   cpu_relax();
   goto retry;
  }
  /* oops */
  WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
     wq->name, cpu);
 }

 /* pwq determined, queue */
 trace_workqueue_queue_work(req_cpu, pwq, work);

 if (WARN_ON(!list_empty(&work->entry))) {
  spin_unlock(&pwq->pool->lock);
  return;
 }

 pwq->nr_in_flight[pwq->work_color]++;
 work_flags = work_color_to_flags(pwq->work_color);

 // (5) 如果還沒有達到max_active,將work掛載到pool->worklist
 if (likely(pwq->nr_active < pwq->max_active)) {
  trace_workqueue_activate_work(work);
  pwq->nr_active++;
  worklist = &pwq->pool->worklist;
 // 否則,將work掛載到臨時隊列pwq->delayed_works
 } else {
  work_flags |= WORK_STRUCT_DELAYED;
  worklist = &pwq->delayed_works;
 }

 // (6) 將work壓入worklist當中
 insert_work(pwq, work, worklist, work_flags);

 spin_unlock(&pwq->pool->lock);
}

1.5.2 flush_work()

flush 某個 work,確保 work 執行完成。

怎麼判斷異步的 work 已經執行完成?這裏面使用了一個技巧:在目標 work 的後面插入一個新的 work wq_barrier,如果 wq_barrier 執行完成,那麼目標 work 肯定已經執行完成。

/**
 * flush_work - wait for a work to finish executing the last queueing instance
 * @work: the work to flush
 *
 * Wait until @work has finished execution.  @work is guaranteed to be idle
 * on return if it hasn't been requeued since flush started.
 *
 * Return:
 * %true if flush_work() waited for the work to finish execution,
 * %false if it was already idle.
 */
bool flush_work(struct work_struct *work)
{
 struct wq_barrier barr;

 lock_map_acquire(&work->lockdep_map);
 lock_map_release(&work->lockdep_map);

 if (start_flush_work(work, &barr)) {
  // 等待barr work執行完成的信號
  wait_for_completion(&barr.done);
  destroy_work_on_stack(&barr.work);
  return true;
 } else {
  return false;
 }
}
| →
static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
{
 struct worker *worker = NULL;
 struct worker_pool *pool;
 struct pool_workqueue *pwq;

 might_sleep();

 // (1) 如果work所在worker_pool爲NULL,說明work已經執行完
 local_irq_disable();
 pool = get_work_pool(work);
 if (!pool) {
  local_irq_enable();
  return false;
 }

 spin_lock(&pool->lock);
 /* see the comment in try_to_grab_pending() with the same code */
 pwq = get_work_pwq(work);
 if (pwq) {
  // (2) 如果work所在pwq指向的worker_pool不等於上一步得到的worker_pool,說明work已經執行完
  if (unlikely(pwq->pool != pool))
   goto already_gone;
 } else {
  // (3) 如果work所在pwq爲NULL,並且也沒有在當前執行的work中,說明work已經執行完
  worker = find_worker_executing_work(pool, work);
  if (!worker)
   goto already_gone;
  pwq = worker->current_pwq;
 }

 // (4) 如果work沒有執行完,向work的後面插入barr work
 insert_wq_barrier(pwq, barr, work, worker);
 spin_unlock_irq(&pool->lock);

 /*
  * If @max_active is 1 or rescuer is in use, flushing another work
  * item on the same workqueue may lead to deadlock.  Make sure the
  * flusher is not running on the same workqueue by verifying write
  * access.
  */
 if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)
  lock_map_acquire(&pwq->wq->lockdep_map);
 else
  lock_map_acquire_read(&pwq->wq->lockdep_map);
 lock_map_release(&pwq->wq->lockdep_map);

 return true;
already_gone:
 spin_unlock_irq(&pool->lock);
 return false;
}
|| →
static void insert_wq_barrier(struct pool_workqueue *pwq,
         struct wq_barrier *barr,
         struct work_struct *target, struct worker *worker)
{
 struct list_head *head;
 unsigned int linked = 0;

 /*
  * debugobject calls are safe here even with pool->lock locked
  * as we know for sure that this will not trigger any of the
  * checks and call back into the fixup functions where we
  * might deadlock.
  */
 // (4.1) barr work的執行函數wq_barrier_func()
 INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
 __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
 init_completion(&barr->done);

 /*
  * If @target is currently being executed, schedule the
  * barrier to the worker; otherwise, put it after @target.
  */
 // (4.2) 如果work當前在worker中執行,則barr work插入scheduled隊列
 if (worker)
  head = worker->scheduled.next;
 // 否則,則barr work插入正常的worklist隊列中,插入位置在目標work後面
 // 並且置上WORK_STRUCT_LINKED標誌
 else {
  unsigned long *bits = work_data_bits(target);

  head = target->entry.next;
  /* there can already be other linked works, inherit and set */
  linked = *bits & WORK_STRUCT_LINKED;
  __set_bit(WORK_STRUCT_LINKED_BIT, bits);
 }

 debug_work_activate(&barr->work);
 insert_work(pwq, &barr->work, head,
      work_color_to_flags(WORK_NO_COLOR) | linked);
}
||| →
static void wq_barrier_func(struct work_struct *work)
{
 struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
 // (4.1.1) barr work執行完成,發出complete信號。
 complete(&barr->done);
}

2.Workqueue 對外接口函數

CMWQ 實現的 workqueue 機制,被包裝成相應的對外接口函數。

2.1 schedule_work()

把 work 壓入系統默認 wq system_wq,WORK_CPU_UNBOUND 指定 worker 爲當前 cpu 綁定的 normal worker_pool 創建的 worker。

static inline bool schedule_work(struct work_struct *work)
{
 return queue_work(system_wq, work);
}
| →
static inline bool queue_work(struct workqueue_struct *wq,
         struct work_struct *work)
{
 return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}

2.2 sschedule_work_on()

在 schedule_work() 基礎上,可以指定 work 運行的 cpu。

static inline bool schedule_work_on(int cpu, struct work_struct *work)
{
 return queue_work_on(cpu, system_wq, work);
}

2.3 schedule_delayed_work()

啓動一個 timer,在 timer 定時到了以後調用 delayed_work_timer_fn() 把 work 壓入系統默認 wq system_wq。

static inline bool schedule_delayed_work(struct delayed_work *dwork,
      unsigned long delay)
{
 return queue_delayed_work(system_wq, dwork, delay);
}
| →
static inline bool queue_delayed_work(struct workqueue_struct *wq,
          struct delayed_work *dwork,
          unsigned long delay)
{
 return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
}
|| →
bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
      struct delayed_work *dwork, unsigned long delay)
{
 struct work_struct *work = &dwork->work;
 bool ret = false;
 unsigned long flags;

 /* read the comment in __queue_work() */
 local_irq_save(flags);

 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
  __queue_delayed_work(cpu, wq, dwork, delay);
  ret = true;
 }

 local_irq_restore(flags);
 return ret;
}
||| →
static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
    struct delayed_work *dwork, unsigned long delay)
{
 struct timer_list *timer = &dwork->timer;
 struct work_struct *work = &dwork->work;

 WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||
       timer->data != (unsigned long)dwork);
 WARN_ON_ONCE(timer_pending(timer));
 WARN_ON_ONCE(!list_empty(&work->entry));

 /*
  * If @delay is 0, queue @dwork->work immediately.  This is for
  * both optimization and correctness.  The earliest @timer can
  * expire is on the closest next tick and delayed_work users depend
  * on that there's no such delay when @delay is 0.
  */
 if (!delay) {
  __queue_work(cpu, wq, &dwork->work);
  return;
 }

 timer_stats_timer_set_start_info(&dwork->timer);

 dwork->wq = wq;
 dwork->cpu = cpu;
 timer->expires = jiffies + delay;

 if (unlikely(cpu != WORK_CPU_UNBOUND))
  add_timer_on(timer, cpu);
 else
  add_timer(timer);
}
|||| →
void delayed_work_timer_fn(unsigned long __data)
{
 struct delayed_work *dwork = (struct delayed_work *)__data;

 /* should have been called from irqsafe timer with irq already off */
 __queue_work(dwork->cpu, dwork->wq, &dwork->work);
}

參考資料

  1. Documentation/workqueue.txt
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Sz8cr0eU1LDykc6IBUsfuA