一文讀懂進程併發與同步

併發 是指在某一時間段內能夠處理多個任務的能力,而 並行 是指同一時間能夠處理多個任務的能力。併發和並行看起來很像,但實際上是有區別的,如下圖(圖片來源於網絡):

concurrency-parallelism

上圖的意思是,有兩條在排隊買咖啡的隊列,併發只有一架咖啡機在處理,而並行就有兩架的咖啡機在處理。咖啡機的數量越多,並行能力就越強。

可以把上面的兩條隊列看成兩個進程,併發就是指只有單個 CPU 在處理,而並行就有兩個 CPU 在處理。爲了讓兩個進程在單核 CPU 中也能得到執行,一般的做法就是讓每個進程交替執行一段時間,比如讓每個進程固定執行 100毫秒,執行時間使用完後切換到其他進程執行。而並行就沒有這種問題,因爲有兩個 CPU,所以兩個進程可以同時執行。如下圖:

concurrency-parallelism

原子操作

上面介紹過,併發有可能會打斷當前執行的進程,然後替切換成其他進程執行。如果有兩個進程同時對一個共享變量 count 進行加一操作,由於 C 語言的 count++ 操作會被翻譯成如下指令:

mov eax, [count]
inc eax
mov [count], eax

那麼在併發的情況下,有可能出現如下問題:

concurrency-problem

假設 count 變量初始值爲 0:

雖然進程 1 和進程 2 執行了兩次 count++ 操作,但是 count 最後的值爲 1,而不是 2。

要解決這個問題就需要使用 原子操作,原子操作是指不能被打斷的操作,在單核 CPU 中,一條指令就是原子操作。比如上面的問題可以把 count++ 語句翻譯成指令 inc [count] 即可。Linux 也提供了這樣的原子操作,如對整數加一操作的 atomic_inc()

static __inline__ void atomic_inc(atomic_t *v)
{
 __asm__ __volatile__(
  LOCK "incl %0"
  :"=m" (v->counter)
  :"m" (v->counter));
}

在多核 CPU 中,一條指令也不一定是原子操作,比如 inc [count] 指令在多核 CPU 中需要進行如下過程:

  1. 從內存將 count 的數據讀取到 cpu。

  2. 累加讀取的值。

  3. 將修改的值寫回 count 內存。

Intel x86 CPU 提供了 lock 前綴來鎖住總線,可以讓指令保證不被其他 CPU 中斷,如下:

lock
inc [count]

原子操作 能夠保證操作不被其他進程干擾,但有時候一個複雜的操作需要由多條指令來實現,那麼就不能使用原子操作了,這時候可以使用  來實現。

計算機科學中的  與日常生活的  有點類似,舉個例子:比如要上公廁,首先找到一個沒有人的廁所,然後把廁所門鎖上。其他人要使用的話,必須等待當前這人使用完畢,並且把門鎖打開才能使用。在計算機中,要對某個公共資源進行操作時,必須對公共資源進行上鎖,然後才能使用。如果不上鎖,那麼就可能導致數據混亂的情況。

在 Linux 內核中,比較常用的鎖有:自旋鎖信號量讀寫鎖 等,下面介紹一下自旋鎖和信號量的實現。

自旋鎖

自旋鎖 只能在多核 CPU 系統中,其核心原理是 原子操作,原理如下圖:

spinlock

使用 自旋鎖 時,必須先對自旋鎖進行初始化(設置爲 1),上鎖過程如下:

  1. 對自旋鎖 lock 進行減一操作,判斷結果是否等於 0,如果是表示上鎖成功並返回。

  2. 如果不等於 0,表示其他進程已經上鎖,此時必須不斷比較自旋鎖 lock 的值是否等於 1(表示已經解鎖)。

  3. 如果自旋鎖 lock 等於 1,跳轉到第一步繼續進行上鎖操作。

由於 Linux 的自旋鎖使用匯編實現,所以比較苦澀難懂,這裏使用 C 語言來模擬一下:

void spin_lock(amtoic_t *lock)
{
again:
    result = --(*lock);
    if (result == 0) {
        return;
    }
    
    while (true) {
        if (*lock == 1) {
     goto again;
 }
    }
}

上面代碼將 result = --(*lock); 當成原子操作,解鎖過程只需要把 lock 設置爲 1 即可。由於自旋鎖會不斷嘗試上鎖操作,並不會對進程進行調度,所以在單核 CPU 中可能會導致 100% 的 CPU 佔用率。另外,自旋鎖只適合粒度比較小的操作,如果操作粒度比較大,就需要使用信號量這種可調度進程的鎖。

信號量

與 自旋鎖 不一樣,噹噹前進程對 信號量 進行上鎖時,如果其他進程已經對其進行上鎖,那麼當前進程會進入睡眠狀態,等待其他進程對信號量進行解鎖。過程如下圖:

semaphore

在 Linux 內核中,信號量使用 struct semaphore 表示,定義如下:

struct semaphore {
    raw_spinlock_t      lock;
    unsigned int        count;
    struct list_head    wait_list;
};

各個字段的作用如下:

信號量 上鎖通過 down() 函數實現,代碼如下:

void down(struct semaphore *sem)
{
    unsigned long flags;

    spin_lock_irqsave(&sem->lock, flags);
    if (likely(sem->count > 0))
        sem->count--;
    else
        __down(sem);
    spin_unlock_irqrestore(&sem->lock, flags);
}

上面代碼可以看出,down() 函數首先對信號量進行自旋鎖操作(爲了避免多核 CPU 競爭),然後比較計數器是否大於 0,如果是對計數器進行減一操作,並且返回,否則調用 __down() 函數進行下一步操作。__down() 函數實現如下:

static noinline void __sched __down(struct semaphore *sem)
{
    __down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}

static inline int __down_common(struct semaphore *sem,
    long state, long timeout)
{
    struct task_struct *task = current;
    struct semaphore_waiter waiter;

    // 把當前進程添加到等待隊列中
    list_add_tail(&waiter.list, &sem->wait_list);
    waiter.task = task;
    waiter.up = 0;

    for (;;) {
        ...
        __set_task_state(task, state);
        spin_unlock_irq(&sem->lock);
        timeout = schedule_timeout(timeout);
        spin_lock_irq(&sem->lock);
        if (waiter.up) // 當前進程是否獲得信號量鎖?
            return 0;
    }
    ...
}

__down() 函數最終調用 __down_common() 函數,而 __down_common() 函數的操作過程如下:

  1. 把當前進程添加到信號量的等待隊列中。

  2. 切換到其他進程運行,直到被其他進程喚醒。

  3. 如果當前進程獲得信號量鎖(由解鎖進程傳遞),那麼函數返回。

接下來看看解鎖過程,解鎖過程主要通過 up() 函數實現,代碼如下:

void up(struct semaphore *sem)
{
    unsigned long flags;
 
    raw_spin_lock_irqsave(&sem->lock, flags);
    if (likely(list_empty(&sem->wait_list))) // 如果沒有等待的進程, 直接對計數器加一操作
        sem->count++;
    else
        __up(sem); // 如果有等待進程, 那麼調用 __up() 函數進行喚醒
    raw_spin_unlock_irqrestore(&sem->lock, flags);
}

static noinline void __sched __up(struct semaphore *sem)
{
    // 獲取到等待隊列的第一個進程
    struct semaphore_waiter *waiter = list_first_entry(
        &sem->wait_list, struct semaphore_waiter, list);

    list_del(&waiter->list);       // 把進程從等待隊列中刪除
    waiter->up = 1;                // 告訴進程已經獲得信號量鎖
    wake_up_process(waiter->task); // 喚醒進程
}

解鎖過程如下:

  1. 判斷當前信號量是否有等待的進程,如果沒有等待的進程, 直接對計數器加一操作

  2. 如果有等待的進程,那麼獲取到等待隊列的第一個進程。

  3. 把進程從等待隊列中刪除。

  4. 告訴進程已經獲得信號量鎖

  5. 喚醒進程。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/LtTAOcduxV4Gfypr-xtugQ