一文讀懂進程併發與同步
併發
是指在某一時間段內能夠處理多個任務的能力,而 並行
是指同一時間能夠處理多個任務的能力。併發和並行看起來很像,但實際上是有區別的,如下圖(圖片來源於網絡):
concurrency-parallelism
上圖的意思是,有兩條在排隊買咖啡的隊列,併發只有一架咖啡機在處理,而並行就有兩架的咖啡機在處理。咖啡機的數量越多,並行能力就越強。
可以把上面的兩條隊列看成兩個進程,併發就是指只有單個 CPU 在處理,而並行就有兩個 CPU 在處理。爲了讓兩個進程在單核 CPU 中也能得到執行,一般的做法就是讓每個進程交替執行一段時間,比如讓每個進程固定執行 100毫秒
,執行時間使用完後切換到其他進程執行。而並行就沒有這種問題,因爲有兩個 CPU,所以兩個進程可以同時執行。如下圖:
concurrency-parallelism
原子操作
上面介紹過,併發有可能會打斷當前執行的進程,然後替切換成其他進程執行。如果有兩個進程同時對一個共享變量 count
進行加一操作,由於 C 語言的 count++
操作會被翻譯成如下指令:
mov eax, [count]
inc eax
mov [count], eax
那麼在併發的情況下,有可能出現如下問題:
concurrency-problem
假設 count 變量初始值爲 0:
-
進程 1 執行完
mov eax, [count]
後,寄存器 eax 內保存了 count 的值 0。 -
進程 2 被調度執行。進程 2 執行
count++
的所有指令,將累加後的 count 值 1 寫回到內存。 -
進程 1 再次被調度執行,計算 count 的累加值仍爲 1,寫回到內存。
雖然進程 1 和進程 2 執行了兩次 count++
操作,但是 count 最後的值爲 1,而不是 2。
要解決這個問題就需要使用 原子操作
,原子操作是指不能被打斷的操作,在單核 CPU 中,一條指令就是原子操作。比如上面的問題可以把 count++
語句翻譯成指令 inc [count]
即可。Linux 也提供了這樣的原子操作,如對整數加一操作的 atomic_inc()
:
static __inline__ void atomic_inc(atomic_t *v)
{
__asm__ __volatile__(
LOCK "incl %0"
:"=m" (v->counter)
:"m" (v->counter));
}
在多核 CPU 中,一條指令也不一定是原子操作,比如 inc [count]
指令在多核 CPU 中需要進行如下過程:
-
從內存將 count 的數據讀取到 cpu。
-
累加讀取的值。
-
將修改的值寫回 count 內存。
Intel x86 CPU
提供了 lock
前綴來鎖住總線,可以讓指令保證不被其他 CPU 中斷,如下:
lock
inc [count]
鎖
原子操作
能夠保證操作不被其他進程干擾,但有時候一個複雜的操作需要由多條指令來實現,那麼就不能使用原子操作了,這時候可以使用 鎖
來實現。
計算機科學中的 鎖
與日常生活的 鎖
有點類似,舉個例子:比如要上公廁,首先找到一個沒有人的廁所,然後把廁所門鎖上。其他人要使用的話,必須等待當前這人使用完畢,並且把門鎖打開才能使用。在計算機中,要對某個公共資源進行操作時,必須對公共資源進行上鎖,然後才能使用。如果不上鎖,那麼就可能導致數據混亂的情況。
在 Linux 內核中,比較常用的鎖有:自旋鎖
、信號量
、讀寫鎖
等,下面介紹一下自旋鎖和信號量的實現。
自旋鎖
自旋鎖
只能在多核 CPU 系統中,其核心原理是 原子操作
,原理如下圖:
spinlock
使用 自旋鎖
時,必須先對自旋鎖進行初始化(設置爲 1),上鎖過程如下:
-
對自旋鎖
lock
進行減一操作,判斷結果是否等於 0,如果是表示上鎖成功並返回。 -
如果不等於 0,表示其他進程已經上鎖,此時必須不斷比較自旋鎖
lock
的值是否等於 1(表示已經解鎖)。 -
如果自旋鎖
lock
等於 1,跳轉到第一步繼續進行上鎖操作。
由於 Linux 的自旋鎖使用匯編實現,所以比較苦澀難懂,這裏使用 C 語言來模擬一下:
void spin_lock(amtoic_t *lock)
{
again:
result = --(*lock);
if (result == 0) {
return;
}
while (true) {
if (*lock == 1) {
goto again;
}
}
}
上面代碼將 result = --(*lock);
當成原子操作,解鎖過程只需要把 lock
設置爲 1 即可。由於自旋鎖會不斷嘗試上鎖操作,並不會對進程進行調度,所以在單核 CPU 中可能會導致 100% 的 CPU 佔用率。另外,自旋鎖只適合粒度比較小的操作,如果操作粒度比較大,就需要使用信號量這種可調度進程的鎖。
信號量
與 自旋鎖
不一樣,噹噹前進程對 信號量
進行上鎖時,如果其他進程已經對其進行上鎖,那麼當前進程會進入睡眠狀態,等待其他進程對信號量進行解鎖。過程如下圖:
semaphore
在 Linux 內核中,信號量使用 struct semaphore
表示,定義如下:
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
各個字段的作用如下:
-
lock
:自旋鎖,用於對多核 CPU 平臺進行同步。 -
count
:信號量的計數器,上鎖時對其進行減一操作 (count--),如果得到的結果爲大於等於 0,表示成功上鎖,如果小於 0 表示已經被其他進程上鎖。 -
wait_list
:正在等待信號量解鎖的進程隊列。
信號量
上鎖通過 down()
函數實現,代碼如下:
void down(struct semaphore *sem)
{
unsigned long flags;
spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0))
sem->count--;
else
__down(sem);
spin_unlock_irqrestore(&sem->lock, flags);
}
上面代碼可以看出,down()
函數首先對信號量進行自旋鎖操作(爲了避免多核 CPU 競爭),然後比較計數器是否大於 0,如果是對計數器進行減一操作,並且返回,否則調用 __down()
函數進行下一步操作。__down()
函數實現如下:
static noinline void __sched __down(struct semaphore *sem)
{
__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
static inline int __down_common(struct semaphore *sem,
long state, long timeout)
{
struct task_struct *task = current;
struct semaphore_waiter waiter;
// 把當前進程添加到等待隊列中
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = task;
waiter.up = 0;
for (;;) {
...
__set_task_state(task, state);
spin_unlock_irq(&sem->lock);
timeout = schedule_timeout(timeout);
spin_lock_irq(&sem->lock);
if (waiter.up) // 當前進程是否獲得信號量鎖?
return 0;
}
...
}
__down()
函數最終調用 __down_common()
函數,而 __down_common()
函數的操作過程如下:
-
把當前進程添加到信號量的等待隊列中。
-
切換到其他進程運行,直到被其他進程喚醒。
-
如果當前進程獲得信號量鎖(由解鎖進程傳遞),那麼函數返回。
接下來看看解鎖過程,解鎖過程主要通過 up()
函數實現,代碼如下:
void up(struct semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(list_empty(&sem->wait_list))) // 如果沒有等待的進程, 直接對計數器加一操作
sem->count++;
else
__up(sem); // 如果有等待進程, 那麼調用 __up() 函數進行喚醒
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __up(struct semaphore *sem)
{
// 獲取到等待隊列的第一個進程
struct semaphore_waiter *waiter = list_first_entry(
&sem->wait_list, struct semaphore_waiter, list);
list_del(&waiter->list); // 把進程從等待隊列中刪除
waiter->up = 1; // 告訴進程已經獲得信號量鎖
wake_up_process(waiter->task); // 喚醒進程
}
解鎖過程如下:
-
判斷當前信號量是否有等待的進程,如果沒有等待的進程, 直接對計數器加一操作
-
如果有等待的進程,那麼獲取到等待隊列的第一個進程。
-
把進程從等待隊列中刪除。
-
告訴進程已經獲得信號量鎖
-
喚醒進程。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/LtTAOcduxV4Gfypr-xtugQ