Linux 線程同步:深度剖析與實戰指南

當我們深入探索 Linux 操作系統的奧祕時,線程同步無疑是一個至關重要且充滿挑戰的領域。在複雜的 Linux 系統中,多個線程常常同時運行,它們如同在一個繁忙的數字工坊中協同工作的工匠。然而,若沒有有效的同步機制,這些線程可能會相互干擾、衝突,導致數據不一致、系統不穩定甚至崩潰。

線程同步就像是爲這個工坊制定的一套精準的規則和協調機制。它確保不同的線程在合適的時機進行操作,有序地共享資源,避免混亂和錯誤。理解 Linux 線程同步,不僅能讓我們更好地優化程序性能,提高系統的可靠性,還能深入洞察操作系統底層的工作原理。本文將深入探討 Linux 線程同步的方法和實現機制,幫助讀者更好地理解和應用線程同步技術。

一、概述

Linux 線程同步是多線程編程中的關鍵環節,確保多個線程在訪問共享資源時能夠協調一致,避免出現競態條件和數據不一致的情況。在 Linux 系統中,線程同步是非常重要的概念。線程同步是指多個線程之間爲了協調彼此的操作而採取的一種機制。在多線程編程中,由於線程是併發執行的,一些資源(如共享變量)可能會被多個線程同時訪問,導致競爭條件。線程同步可以確保在多線程環境下對共享資源的訪問是有序的,從而避免競爭條件的發生。

在操作系統中,線程同步是非常重要的概念,它確保多個線程能夠按照預期的順序執行,並且對共享資源的訪問是安全的。在多線程編程中,多個線程可能會同時訪問共享的數據或資源。如果沒有適當的同步機制,可能會出現競態條件、死鎖、飢餓等問題。爲了解決這些問題,Linux 提供了多種線程同步方法。

Linux 實現線程同步主要有如下三個手段:mutex(互斥變量)/cond(條件變量)/sem(信號量)。互斥變量是實現線程同步的一個重要手段,通過對互斥變量加解鎖機制,可以實現線程的同步。一個線程對一個互斥變量加鎖,而其他任何一個線程想再次對互斥變量加鎖時都會被阻塞,直到加鎖線程釋放互斥鎖。

條件變量是實現線程同步的另一個重要手段,可以使用條件變量等待另一個線程的結束,條件變量一般痛互斥變量一起使用。條件變量的一個重要表象就是一個線程等待另外的線程發送信號。除了互斥鎖和條件變量外,Linux 還支持其他線程同步機制,如信號量、屏障等。這些機制可以根據實際需求進行選擇,以確保線程之間的協調和同步。

在使用這些線程同步機制時,需要注意不僅要保證正確性,還要避免死鎖等問題。在編寫多線程程序時,正確使用線程同步機制是至關重要的。如果不正確地處理線程之間的同步關係,可能會導致程序出現各種問題,如數據錯誤、死鎖等。因此,在進行多線程編程時,開發人員需要充分了解線程同步的概念和機制,並在代碼中妥善處理線程之間的同步關係。

1.1 名詞解釋

CPU:本文中的 CPU 都是指邏輯 CPU。

UP:單處理器 (單 CPU)。

SMP:對稱多處理器 (多 CPU)。

線程、執行流:線程的本質是一個執行流,但執行流不僅僅有線程,還有 ISR、softirq、tasklet,都是執行流。本文中說到線程一般是泛指各種執行流,除非是在需要區分不同執行流時,線程才特指狹義的線程。

併發、並行:併發是指線程在宏觀上表現爲同時執行,微觀上可能是同時執行也可能是交錯執行,並行是指線程在宏觀上是同時執行,微觀上也是同時執行。

僞併發、真併發:僞併發就是微觀上是交錯執行的併發,真併發就是並行。UP 上只有僞併發,SMP 上既有僞併發也有真併發。

臨界區:訪問相同數據的代碼段,如果可能會在多個線程中併發執行,就叫做臨界區,臨界區可以是一個代碼段被多個線程併發執行,也可以是多個不同的代碼段被多個線程併發執行。

同步:首先線程同步的同步和同步異步的同步,不是一個意思。線程同步的同步,本文按照字面意思進行解釋,同步就是統一步調、同時執行的意思。

線程同步現象:線程併發過程中如果存在臨界區併發執行的情況,就叫做線程同步現象。

線程防同步機制:如果發生線程同步現象,由於臨界區會訪問共同的數據,程序可能就會出錯,因此我們要防止發生線程同步現象,也就是防止臨界區併發執行的情況,爲此我們採取的防範措施叫做線程防同步機制。

1.2 線程同步與防同步

爲什麼線程同步叫線程同步,不叫線程防同步,叫線程同步給人的感覺好像就是要讓線程同時執行的意思啊。但是實際上線程同步是讓臨界區不要併發執行的意思,不管你們倆誰先執行,只要錯開,誰先誰後執行都可以。所以本文後面都採用線程防同步機制、防同步機制等詞。

我小時候一直有個疑惑,感冒藥爲啥叫感冒藥,感冒藥是讓人感冒的藥啊,不是應該叫治感冒藥纔對嗎,治療感冒的藥。後來一想,就沒有讓人感冒的藥,所以感冒藥表達的就是治療感冒的藥,沒必要加個治字。但是同時還有一種藥,叫老鼠藥,是治療老鼠的藥嗎,不是啊,是要毒死老鼠的藥,因爲沒有人會給老鼠治病。不過我們那裏也有把老鼠藥叫做害老鼠藥的,加個害字,意思更明確,不會有歧義。

因此本文用了兩個詞,同步現象、防同步機制,使得概念更加清晰明確。

說了這麼多就是爲了闡述一個非常簡單的概念,就是不能同時操作相同的數據,因爲可能會出錯,所以我們要想辦法防止,這個方法我們把它叫做線程防同步。

還有一個詞是競態條件 (race condition),很多關於線程同步的書籍文檔中都有提到,但是我一直沒有理解是啥意思。競態條件,條件,線程同步和條件也沒啥關係啊;競態,也不知道是什麼意思。再看它的英文,condition 有情況的意思,race 有賽跑、競爭的意思,是不是要表達賽跑情況、競爭現象,想說兩個線程在競爭賽跑,看誰能先訪問到公共數據。我發現沒有競態條件這個詞對我們理解線程同步問題一點影響都沒有,有了這個詞反而不明所以,所以我們就忽略這個詞。

二、線程防同步方法

在我們理解了同步現象、防同步機制這兩個詞後,下面的內容就很好理解了。同步現象是指同時訪問相同的數據,那麼如何防止呢,我不讓你同時訪問相同的數據不就可以了嘛。因此防同步機制有三大類方法,分別是從時間上防同步、從空間上防同步、事後防同步。從時間上和空間上防同步都比較好理解,事後防同步的意思是說我先不防同步,先把臨界區走一遍再說,然後回頭看剛纔有沒有發生同步現象,如果有的話,就再重新走一遍臨界區,直到沒有發生同步現象爲止。

2.1 時間上防同步

我不讓你們同時進入臨界區,這樣就不會同時操作相同的數據了,有三種方法:

⑴原子操作

對於個別特別簡單特別短的臨界區,CPU 會提供一些原子指令,在一條指令中把多個操作完成,兩個原子指令必然一個在前一個在後地執行,不可能同時執行。原子指令能防僞併發和真併發,適用於 UP 和 SMP。

⑵加鎖

對於大部分臨界區來說,代碼都比較複雜,CPU 不可能都去實現原子指令,因此可以在臨界區的入口處加鎖,同一時間只能有一個線程進入,獲得鎖的線程進入,在臨界區的出口處再釋放鎖。未獲得鎖的線程在外面等待,等待的方式有兩種,忙等待的叫做自旋鎖,休眠等待的叫做阻塞鎖。根據臨界區內的數據讀寫操作不同,鎖又可以分爲單一鎖和讀寫鎖,單一鎖不區分讀者寫者,所有用戶都互斥;讀寫鎖區分讀者和寫者,讀者之間可以並行,寫者與讀者、寫者與寫者之間是互斥的。自旋鎖有單一鎖和讀寫鎖,阻塞鎖也有單一鎖和讀寫鎖。自旋鎖只能防真併發,適用於 SMP;休眠鎖能防僞併發和真併發,適用於 UP 和 SMP。

⑶臨時禁用僞併發

對於某些由於僞併發而產生的同步問題,可以通過在臨界區的入口處禁用此類僞併發、在臨界區的出口處再恢復此類僞併發來解決。這種方式顯然只能防僞併發,適用於 UP 和 SMP 上的單 CPU。而自旋鎖只能防真併發,所以在 SMP 上經常會同時使用這兩種方法,同時防僞併發和真併發。臨時禁用僞併發有三種情況:

2.2 空間上防同步

你們可以同時,但是我不讓你們訪問相同的數據,有兩種方法:

⑴數據分割

把大家都要訪問的公共數據分割成 N 份,各訪問各的。這也有兩種情況:

① 在 SMP 上如果多個 CPU 要經常訪問一個全局數據,那麼可以把這個數據拆分成 NR_CPU 份,每個 CPU 只訪問自己對應的那份,這樣就不存在併發問題了,這個方法叫做 per-CPU 變量,只能防真併發,適用於 SMP,需要和禁用搶佔配合使用。

②TLS,每個線程都用自己的局部數據,這樣就不存在併發問題了,能防真併發和僞併發,適用於 UP 和 SMP。

⑵數據複製

RCU,只適用於用指針訪問的動態數據。讀者複製指針,然後就可以隨意讀取數據了,所有的讀者可以共同讀一份數據。寫者複製數據,然後就可以隨意修改複製後的數據了,因爲這份數據是私有的。不過修改完數據之後要修改指針指向最新的數據,修改指針的這個操作需要是原子的。對於讀者來說,它是複製完指針之後用自己的私有指針來訪問數據的,所以它訪問的要麼是之前的數據,要麼是修改之後的數據,而不會是不一致的數據。RCU 不僅能實現讀者之間的同時訪問,還能實現讀者與一個寫者的同時訪問,可謂是併發性非常高。RCU 對於讀者端的開銷非常低、性能非常高。RCU 能防僞併發和真併發,適用於 UP 和 SMP。

2.3 事後防同步

不去積極預防併發,而是假設不存在併發,直接訪問數據。訪問完了之後再檢查剛纔是否有併發發生,如果有就再重來一遍,一直重試,直到沒有併發發生爲止。這就是內核裏面的序列鎖 seqlock,能防僞併發和真併發,適用於 UP 和 SMP。下面我們來畫張圖總結一下:

三、線程同步方法

3.1 互斥鎖

互斥鎖是 Linux 線程同步中一種簡單而有效的加鎖方法。它具有原子性和唯一性等特點,確保在同一時間只有一個線程可以訪問共享資源。在多任務操作系統中,多個線程可能都需要使用同一種資源,互斥鎖就像一把鎖,用於控制對共享資源的訪問。

互斥鎖的特點如下:

互斥鎖的操作流程如下:

常用的關於互斥鎖的函數有:

互斥鎖是休眠鎖,加鎖失敗時要把自己放入等待隊列,釋放鎖的時候要考慮喚醒等待隊列的線程。互斥鎖的定義如下 (刪除了一些配置選項):

struct mutex {
	atomic_long_t		owner;
	raw_spinlock_t		wait_lock;
	struct list_head	wait_list;
};

可以看到互斥鎖的定義有 atomic_long_t owner,這個和我們之前定義的 int lock 是相似的,只不過這裏是個加強版,0 代表沒加鎖,加鎖的時候是非 0,而不是簡單的 1,而是記錄的是加鎖的線程。然後是自旋鎖和等待隊列,自旋鎖是用來保護等待隊列的。這裏的自旋鎖爲什麼要用 raw_spinlock_t 呢,它和 spinlock_t 有什麼區別呢?在標準的內核版本下是沒有區別的,如果打了 RTLinux 補丁之後它們就不一樣了,spinlock_t 會轉化爲阻塞鎖,raw_spinlock_t 還是自旋鎖,如果需要在任何情況下都要自旋的話請使用 raw_spinlock_t。下面我們看一下它的加鎖操作:

void __sched mutex_lock(struct mutex *lock)
{
	might_sleep();

	if (!__mutex_trylock_fast(lock))
		__mutex_lock_slowpath(lock);
}
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
	unsigned long curr = (unsigned long)current;
	unsigned long zero = 0UL;

	if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))
		return true;

	return false;
}

可以看到加鎖時先嚐試直接加鎖,用的就是 CAS 原子指令 (x86 的 CAS 指令叫做 cmpxchg)。如果 owner 是 0,代表當前鎖是開着的,就把 owner 設置爲自己 (也就是當前線程,struct task_struct * 強轉爲 ulong),代表自己成爲這個鎖的主人,也就是自己加鎖成功了,然後返回 true。如果 owner 不爲 0 的話,代表有人已經持有鎖了,返回 false,後面就要走慢速路徑了,也就是把自己放入等待隊列裏休眠等待。下面看看慢速路徑的代碼是怎麼實現的:

static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
	__mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}

static int __sched
__mutex_lock(struct mutex *lock, unsigned int state, unsigned int subclass,
	     struct lockdep_map *nest_lock, unsigned long ip)
{
	return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}

static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, unsigned int state, unsigned int subclass,
		    struct lockdep_map *nest_lock, unsigned long ip,
		    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
	struct mutex_waiter waiter;
	int ret;

	raw_spin_lock(&lock->wait_lock);
	waiter.task = current;
	__mutex_add_waiter(lock, &waiter, &lock->wait_list);

	set_current_state(state);
	for (;;) {
		bool first;
		if (__mutex_trylock(lock))
			goto acquired;

		raw_spin_unlock(&lock->wait_lock);
		schedule_preempt_disabled();

		first = __mutex_waiter_is_first(lock, &waiter);
		set_current_state(state);
		if (__mutex_trylock_or_handoff(lock, first))
			break;

		raw_spin_lock(&lock->wait_lock);
	}

acquired:
	__set_current_state(TASK_RUNNING);
	__mutex_remove_waiter(lock, &waiter);
	raw_spin_unlock(&lock->wait_lock);

	return ret;
}

static void
__mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter,
		   struct list_head *list)
{
	debug_mutex_add_waiter(lock, waiter, current);

	list_add_tail(&waiter->list, list);
	if (__mutex_waiter_is_first(lock, waiter))
		__mutex_set_flag(lock, MUTEX_FLAG_WAITERS);
}

可以看到慢速路徑最終會調用函數__mutex_lock_common,這個函數本身是很複雜的,這裏進行了大量的刪減,只保留了核心邏輯。函數先加鎖 mutex 的自旋鎖,然後把自己放到等待隊列上去,然後就在無限 for 循環中休眠並等待別人釋放鎖並喚醒自己。

For 循環的入口處先嚐試加鎖,如果成功就 goto acquired,如果不成功就釋放自旋鎖,並調用 schedule_preempt_disabled,此函數就是休眠函數,它會調度其它進程來執行,自己就休眠了,直到有人喚醒自己纔會醒來繼續執行。別人釋放鎖的時候會喚醒自己,這個我們後面會分析。

當我們被喚醒之後會去嘗試加鎖,如果加鎖失敗,再次來到 for 循環的開頭處,再試一次加鎖,如果不行就再走一次休眠過程。爲什麼我們加鎖會失敗呢,因爲有可能多個線程同時被喚醒來爭奪鎖,我們不一定會搶鎖成功。搶鎖失敗就再去休眠,最後總會搶鎖成功的。

把自己加入等待隊列時會設置 flag MUTEX_FLAG_WAITERS,這個 flag 是設置在 owner 的低位上去,因爲 task_struct 指針至少是 L1_CACHE_BYTES 對齊的,所以最少有 3 位可以空出來做 flag。

下面我們再來看一下釋放鎖的流程:

void __sched mutex_unlock(struct mutex *lock)
{  if (__mutex_unlock_fast(lock))
    return;
  __mutex_unlock_slowpath(lock, _RET_IP_);
}
static __always_inline bool __mutex_unlock_fast(struct mutex *lock)
{
  unsigned long curr = (unsigned long)current;
  return atomic_long_try_cmpxchg_release(&lock->owner, &curr, 0UL);
}
解鎖的時候先嚐試快速解鎖,快速解鎖的意思是沒有其它在等待隊列裏,可以直接釋放鎖。怎麼判斷等待隊列裏沒有線程在等待呢,這就是前面設置的MUTEX_FLAG_WAITERS的作用了。如果設置了這個flag,lock->owner 和 curr就不會相等,直接釋放鎖就會失敗,就要走慢速路徑。慢速路徑的代碼如下:
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
  struct task_struct *next = NULL;
  DEFINE_WAKE_Q(wake_q);
  unsigned long owner;
  owner = atomic_long_read(&lock->owner);
  for (;;) {
    if (atomic_long_try_cmpxchg_release(&lock->owner, &owner, __owner_flags(owner))) {
      if (owner & MUTEX_FLAG_WAITERS)
        break;
    }
  }
  raw_spin_lock(&lock->wait_lock);
  if (!list_empty(&lock->wait_list)) {
    struct mutex_waiter *waiter =
      list_first_entry(&lock->wait_list,
           struct mutex_waiter, list);
    next = waiter->task;
    wake_q_add(&wake_q, next);
  }
  raw_spin_unlock(&lock->wait_lock);
  wake_up_q(&wake_q);
}

上述代碼進行了一些刪減。可以看出上述代碼會先釋放鎖,然後喚醒等待隊列裏面的第一個等待者。

3.2 條件變量

條件變量是一種線程同步機制,用於等待特定條件的發生。條件變量通常與互斥鎖一起使用,線程在等待條件變量時會釋放互斥鎖,進入等待狀態。當條件滿足時,其他線程可以通過信號量喚醒等待的線程。

條件變量需要配合互斥量一起使用,互斥量作爲參數傳入 wait 函數,函數把調用線程放到等待條件的線程列表上,然後對互斥量解鎖,這兩個是原子操作。當線程等待到條件,從 wait 函數返回之前,會再次鎖住互斥量。

在使用條件變量時,需要注意 “驚羣效應”。爲了防止這種情況,在 wait 被喚醒後,還需要在 while 中去檢查條件。例如有兩個線程同時阻塞在 wait,先後醒來,快的線程做完處理然後把條件 reset 了,並且對互斥量解鎖,此時慢的線程在 wait 裏獲得了鎖返回,還再去做處理就會出問題。

3.3 信號量

信號量是一種計數器,用於控制對共享資源的訪問數量。信號量可以用於實現線程同步和互斥,通過增加和減少信號量的值來控制線程的訪問。

信號量的主要作用是保護共享資源,確保在同一時刻只有一個線程能夠對其進行訪問。通過信號量,線程在進入關鍵代碼段之前必須獲取信號量,一旦完成了對共享資源的操作,就釋放信號量,以允許其他線程訪問該資源。

信號量與互斥量的區別在於,互斥量只允許一個線程進入臨界區,其他線程必須等待該線程釋放鎖才能進入;而信號量可以控制對一組資源的訪問,信號量的值可以大於 1,表示同時允許多個線程訪問資源。

信號量的實現原理涉及到原子操作和操作系統的底層支持,在不同的操作系統和編程語言中可能會有不同的實現方式。通常,操作系統提供了對信號量的原子操作支持,以確保在多線程環境下信號量的正確使用。

下面我們先看一下信號量的定義:

struct semaphore {
	raw_spinlock_t		lock;
	unsigned int		count;
	struct list_head	wait_list;
};

#define __SEMAPHORE_INITIALIZER(name, n)				\
{									\
	.lock		= __RAW_SPIN_LOCK_UNLOCKED((name).lock),	\
	.count		= n,						\
	.wait_list	= LIST_HEAD_INIT((name).wait_list),		\
}

static inline void sema_init(struct semaphore *sem, int val)
{
	*sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
}

可以看出信號量和互斥鎖的定義很相似,都有一個自旋鎖,一個等待隊列,不同的是信號量沒有 owner,取而代之的是 count,代表着某一類資源的個數,而且自旋鎖同時保護着等待隊列和 count。信號量初始化時要指定 count 的大小。我們來看一下信號量的 down 操作 (獲取一個資源):

void down(struct semaphore *sem)
{
	unsigned long flags;

	might_sleep();
	raw_spin_lock_irqsave(&sem->lock, flags);
	if (likely(sem->count > 0))
		sem->count--;
	else
		__down(sem);
	raw_spin_unlock_irqrestore(&sem->lock, flags);
}

static noinline void __sched __down(struct semaphore *sem)
{
	__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}

static inline int __sched __down_common(struct semaphore *sem, long state,
								long timeout)
{
	struct semaphore_waiter waiter;

	list_add_tail(&waiter.list, &sem->wait_list);
	waiter.task = current;
	waiter.up = false;

	for (;;) {
		if (signal_pending_state(state, current))
			goto interrupted;
		if (unlikely(timeout <= 0))
			goto timed_out;
		__set_current_state(state);
		raw_spin_unlock_irq(&sem->lock);
		timeout = schedule_timeout(timeout);
		raw_spin_lock_irq(&sem->lock);
		if (waiter.up)
			return 0;
	}

 timed_out:
	list_del(&waiter.list);
	return -ETIME;

 interrupted:
	list_del(&waiter.list);
	return -EINTR;
}

可以看出我們會先持有自旋鎖,然後看看 count 是不是大於 0,大於 0 的話代表資源還有剩餘,我們直接減 1,代表佔用一份資源,就可以返回了。如果不大於 0 的話,代表資源沒有了,我們就進去等待隊列等待。

我們再來看看 up 操作 (歸還資源):

void up(struct semaphore *sem)
{
	unsigned long flags;

	raw_spin_lock_irqsave(&sem->lock, flags);
	if (likely(list_empty(&sem->wait_list)))
		sem->count++;
	else
		__up(sem);
	raw_spin_unlock_irqrestore(&sem->lock, flags);
}

static noinline void __sched __up(struct semaphore *sem)
{
	struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
						struct semaphore_waiter, list);
	list_del(&waiter->list);
	waiter->up = true;
	wake_up_process(waiter->task);
}

先加鎖自旋鎖,然後看看等待隊列是否爲空,如果爲空的話直接把 count 加 1 就可以了。如果不爲空的話,則代表有人在等待資源,資源就不加 1 了,直接喚醒隊首的線程來獲取。

3.4Futex

Futex 是 Linux 系統中的一種高效線程同步機制,通過減少系統調用的次數來提高效率。Futex 允許線程在用戶態進行大部分操作,只有在真正需要等待時纔會調用系統調用進入內核態。

Futex 是 Fast Userspace Mutexes 的縮寫,即快速用戶空間互斥體。其設計思想是在傳統的 Unix 系統中,很多同步操作是無競爭的,但進程仍需要陷入內核去檢查是否有競爭發生,這造成了大量的性能開銷。Futex 是一種用戶態和內核態混合的同步機制,同步的進程間通過 mmap 共享一段內存,futex 變量就位於這段共享的內存中且操作是原子的,當進程嘗試進入互斥區或者退出互斥區的時候,先去查看共享內存中的 futex 變量,如果沒有競爭發生,則只修改 futex,而不用再執行系統調用了。當通過訪問 futex 變量告訴進程有競爭發生,則還是得執行系統調用去完成相應的處理(wait 或者 wake up)。

四、線程同步的實現機制

4.1 信號量機制

在 Linux 中,信號量是一種常用的線程同步機制。信號量可以用於實現線程同步和互斥,通過增加和減少信號量的值來控制線程的訪問。

信號量的初始化可以通過 sem_init 函數實現,該函數接受三個參數:信號量指針、共享範圍(0 表示線程間使用,非 0 表示進程間使用)以及初始值。例如:sem_init(&sem, 0, value);,這裏的 sem 是要初始化的信號量,初始值爲 value。

當需要等待信號量時,可以使用 sem_wait 函數。該函數以原子操作的方式將信號量的值減 1。如果信號量的值大於 0,將信號量的值減 1,立即返回。如果信號量的值爲 0,則線程阻塞,直到該信號量值爲非 0 值。

當信號量的值滿足條件時,可以使用 sem_post 函數以原子操作的方式將信號量的值加 1。當信號量的值大於 0 時,其它正在調用 sem_wait 等待信號量的線程將被喚醒,選擇機制同樣是由線程的調度策略決定的。

在使用完信號量後,可以使用 sem_destroy 函數對其進行銷燬清理。只有用 sem_init 初始化的信號量才能用 sem_destroy 銷燬。

信號量的主要作用是保護共享資源,確保在同一時刻只有一個線程能夠對其進行訪問。通過信號量,線程在進入關鍵代碼段之前必須獲取信號量,一旦完成了對共享資源的操作,就釋放信號量,以允許其他線程訪問該資源。

信號量與互斥量的區別在於,互斥量只允許一個線程進入臨界區,其他線程必須等待該線程釋放鎖才能進入;而信號量可以控制對一組資源的訪問,信號量的值可以大於 1,表示同時允許多個線程訪問資源。

信號量的實現原理涉及到原子操作和操作系統的底層支持,在不同的操作系統和編程語言中可能會有不同的實現方式。通常,操作系統提供了對信號量的原子操作支持,以確保在多線程環境下信號量的正確使用。

4.2 互斥鎖機制

互斥鎖的初始化、加鎖、解鎖和銷燬等操作可以通過系統調用實現。互斥鎖的類型包括普通鎖、嵌套鎖、檢錯鎖和適應鎖等,不同類型的鎖在試圖對一個已經被鎖定的互斥鎖加鎖時表現不同。

普通鎖是互斥鎖的默認類型。當一個線程對一個普通鎖加鎖以後,其餘請求該鎖的線程將形成一個等待隊列,並在該鎖解鎖後按照優先級獲得它,這種鎖類型保證了資源分配的公平性。一個線程如果對一個已經加鎖的普通鎖再次加鎖,將引發死鎖;對一個已經被其他線程加鎖的普通鎖解鎖,或者對一個已經解鎖的普通鎖再次解鎖,將導致不可預期的後果。

檢錯鎖在一個線程對一個已經加鎖的檢錯鎖再次加鎖時,加鎖操作返回 EDEADLK;對一個已經被其他線程加鎖的檢錯鎖解鎖或者對一個已經解鎖的檢錯鎖再次解鎖,則解鎖操作返回 EPERM。

嵌套鎖允許一個線程在釋放鎖之前多次對它加鎖而不發生死鎖;其他線程要獲得這個鎖,則當前鎖的擁有者必須執行多次解鎖操作;對一個已經被其他線程加鎖的嵌套鎖解鎖,或者對一個已經解鎖的嵌套鎖再次解鎖,則解鎖操作返回 EPERM。

默認鎖在一個線程如果對一個已經加鎖的默認鎖再次加鎖,或者雖一個已經被其他線程加鎖的默認鎖解鎖,或者對一個解鎖的默認鎖解鎖,將導致不可預期的後果;這種鎖實現的時候可能被映射成上述三種鎖之一。

互斥鎖的操作流程通常是在訪問共享資源前的臨界區域,對互斥鎖進行加鎖,使用諸如 pthread_mutex_lock 等函數。在訪問完成後釋放互斥鎖上的鎖,如使用 pthread_mutex_unlock 函數。

4.3 條件變量機制

條件變量的初始化、等待、激發和銷燬等操作可以通過系統調用實現。條件變量通常與互斥鎖一起使用,用於等待特定條件的發生。

條件變量需要配合互斥量一起使用,互斥量作爲參數傳入 wait 函數,函數把調用線程放到等待條件的線程列表上,然後對互斥量解鎖,這兩個是原子操作。當線程等待到條件,從 wait 函數返回之前,會再次鎖住互斥量。

在使用條件變量時,需要注意 “驚羣效應”。爲了防止這種情況,在 wait 被喚醒後,還需要在 while 中去檢查條件。例如有兩個線程同時阻塞在 wait,先後醒來,快的線程做完處理然後把條件 reset 了,並且對互斥量解鎖,此時慢的線程在 wait 裏獲得了鎖返回,還再去做處理就會出問題。

示例僞代碼如下:

void* Thread1(void){
    while(線程運行條件成立){
        …
        pthread_mutex_lock(qlock);
        while(條件成立)
            pthread_cond_wait(qcond,qlock);
            // 或者 pthread_cond_wait(qcond,qlock,timeout);
            reset條件變量…
        pthread_mutex_unlock(qlock);
    }
}

void* Thread2(void){
    while(線程運行條件成立){
        …
        pthread_mutex_lock(qlock);
        set了條件變量…
        // 可以發送處理信號
        pthread_cond_signal(qcond);
        // 或者 pthread_cond_broadcast(qcond);
        pthread_mutex_unlock(qlock);
    }
}

條件變量的用法包括創建和註銷、等待和喚醒等操作。創建和註銷有靜態和動態兩種方式,靜態方式使用 PTHREAD_COND_INITIALIZER 常量,動態方式調用 pthread_cond_init 函數。註銷一個條件變量需要調用 pthread_cond_destroy 函數,只有在沒有線程在該條件變量上等待的時候才能註銷這個條件變量,否則返回 EBUSY。

等待有兩種方式,無條件等待 pthread_cond_wait 和計時等待 pthread_cond_timedwait。這兩種等待方式都必須和一個互斥鎖配合,以防止多個線程同時請求的競爭條件。

喚醒條件有兩種形式,pthread_cond_signal 喚醒一個等待該條件的線程,存在多個等待線程時按入隊順序喚醒其中一個;而 pthread_cond_broadcast 則喚醒所有等待線程。必須在互斥鎖的保護下使用相應的條件變量,否則可能會引起死鎖。

五、線程同步的應用場景

5.1 生產者 - 消費者問題

在多線程編程中,生產者 - 消費者問題是一個經典的同步問題。生產者線程負責生產數據,消費者線程負責消費數據。當生產者生產數據時,如果數據未被消費完,生產者需要等待消費者消費完數據後才能繼續生產。同樣,當消費者消費數據時,如果沒有數據可消費,消費者需要等待生產者生產數據後才能繼續消費。

條件變量可以用於實現生產者 - 消費者問題。當生產者生產數據時,通過信號量喚醒等待的消費者線程;當消費者消費數據時,通過信號量喚醒等待的生產者線程。例如,在以下代碼中:

// 生產者線程程序
void *product() {
    int id = ++product_id;
    while (1) {
        // 用 sleep 的數量可以調節生產和消費的速度,便於觀察
        sleep(1);
        sem_wait(&empty_sem);
        pthread_mutex_lock(&mutex);
        in = in % M;
        printf("product%d in %d. like: \t", id, in);
        buff[in] = 1;
        print();
        ++in;
        pthread_mutex_unlock(&mutex);
        sem_post(&full_sem);
    }
}

// 消費者方法
void *prochase() {
    int id = ++prochase_id;
    while (1) {
        // 用 sleep 的數量可以調節生產和消費的速度,便於觀察
        sleep(1);
        sem_wait(&full_sem);
        pthread_mutex_lock(&mutex);
        out = out % M;
        printf("prochase%d in %d. like: \t", id, out);
        buff[out] = 0;
        print();
        ++out;
        pthread_mutex_unlock(&mutex);
        sem_post(&empty_sem);
    }
}

這裏使用了信號量 empty_sem 和 full_sem 以及互斥鎖 mutex 來實現生產者 - 消費者問題。生產者在生產數據之前,先檢查緩衝區是否已滿,如果已滿,則等待消費者消費數據後再進行生產。消費者在消費數據之前,先檢查緩衝區是否爲空,如果爲空,則等待生產者生產數據後再進行消費。

5.2 讀者 - 寫者問題

多個讀者可以同時讀取數據,但寫者需要獨佔訪問。互斥鎖可以用於實現讀者 - 寫者問題,當讀者讀取數據時,通過互斥鎖保護共享數據;當寫者寫入數據時,通過互斥鎖獨佔訪問共享數據。

例如,在讀者優先的情況下:

void *reader(void *in) {
    //    while(1)
    //    {
    sem_wait(&sem_read);
    readerCnt++;
    if (readerCnt == 1) {
        pthread_mutex_lock(&mutex_write);
    }
    sem_post(&sem_read);
    printf("讀線程id%d進入數據集\n", pthread_self());
    read();
    printf("讀線程id%d退出數據集\n", pthread_self());
    sem_wait(&sem_read);
    readerCnt--;
    if (readerCnt == 0) {
        pthread_mutex_unlock(&mutex_write);
    }
    sem_post(&sem_read);
    sleep(R_SLEEP);
    //    }
    pthread_exit((void *)0);
}

void *writer(void *in) {
    //    while(1)
    //    {
    pthread_mutex_lock(&mutex_write);
    printf("寫線程id%d進入數據集\n", pthread_self());
    write();
    printf("寫線程id%d退出數據集\n", pthread_self());
    pthread_mutex_unlock(&mutex_write);
    sleep(W_SLEEP);
    //    }
    pthread_exit((void *)0);
}

這裏使用了互斥鎖 mutex_write 和信號量 sem_read 來實現讀者優先的讀者 - 寫者問題。讀者在讀取數據之前,先增加讀者計數器 readerCnt,如果是第一個讀者,則獲取互斥鎖 mutex_write,以防止寫者寫入數據。讀者在讀取數據之後,減少讀者計數器 readerCnt,如果是最後一個讀者,則釋放互斥鎖 mutex_write,以允許寫者寫入數據。寫者在寫入數據之前,獲取互斥鎖 mutex_write,以獨佔訪問共享數據。寫者在寫入數據之後,釋放互斥鎖 mutex_write,以允許讀者讀取數據。在寫者優先的情況下:

void *writer(void *in) {
    //    while(1)
    //    {
    sem_wait(&sem_write);
    {
        //臨界區,希望修改 writerCnt,獨佔 writerCnt
        writerCnt++;
        if (writerCnt == 1) {
            //阻止後續的讀者加入待讀隊列
            pthread_mutex_lock(&mutex_read);
        }
        sem_post(&sem_write);
        pthread_mutex_lock(&mutex_write);
        {
            //臨界區,限制只有一個寫者修改數據
            printf("寫線程id%d進入數據集\n", pthread_self());
            write();
            printf("寫線程id%d退出數據集\n", pthread_self());
        }
        pthread_mutex_unlock(&mutex_write);
        sem_wait(&sem_write);
        {
            //臨界區,希望修改 writerCnt,獨佔 writerCnt
            writerCnt--;
            if (writerCnt == 0) {
                //阻止後續的讀者加入待讀隊列
                pthread_mutex_unlock(&mutex_read);
            }
            sem_post(&sem_write);
        }
        sleep(W_SLEEP);
    }
    pthread_exit((void *)0);
}

void *reader(void *in) {
    //    while(1)
    //    {
    //假如寫者鎖定了 mutex_read,那麼成千上萬的讀者被鎖在這裏
    pthread_mutex_lock(&mutex_read);
    //只被一個讀者佔有
    {
        //臨界區
        sem_wait(&sem_read);
        //代碼段 1
        {
            //臨界區
            readerCnt++;
            if (readerCnt == 1) {
                pthread_mutex_lock(&mutex_write);
            }
            sem_post(&sem_read);
        }
        pthread_mutex_unlock(&mutex_read);
        //釋放時,寫者將優先獲得 mutex_read
        printf("讀線程id%d進入數據集\n", pthread_self());
        read();
        printf("讀線程id%d退出數據集\n", pthread_self());
        sem_wait(&sem_read);
        //代碼段2
        {
            //臨界區
            readerCnt--;
            if (readerCnt == 0) {
                pthread_mutex_unlock(&mutex_write);
                //在最後一個併發讀者讀完這裏開始禁止寫者執行寫操作
            }
            sem_post(&sem_read);
        }
        sleep(R_SLEEP);
    }
    pthread_exit((void *)0);
}

這裏使用了兩個互斥鎖 mutex_write 和 mutex_read 以及兩個信號量 sem_read 和 sem_write 來實現寫者優先的讀者 - 寫者問題。寫者在寫入數據之前,先增加寫者計數器 writerCnt,如果是第一個寫者,則獲取互斥鎖 mutex_read,以阻止讀者讀取數據。寫者在寫入數據之後,減少寫者計數器 writerCnt,如果是最後一個寫者,則釋放互斥鎖 mutex_read,以允許讀者讀取數據。讀者在讀取數據之前,先獲取互斥鎖 mutex_read,以防止寫者寫入數據。讀者在讀取數據之後,釋放互斥鎖 mutex_read,以允許寫者寫入數據。

5.3 計數器問題

多個線程需要對同一個計數器進行遞增操作,需要同步以保證計數的準確性。原子操作可以用於實現計數器問題,通過原子操作確保計數器的遞增操作是原子性的。例如:

static int count = 0;
void *test_func(void *arg) {
    int i = 0;
    for (i = 0; i < 20000; ++i) {
        __sync_fetch_and_add(&count, 1);
    }
    return NULL;
}
int main(int argc, const char *argv[]) {
    pthread_t id[20];
    int i = 0;
    for (i = 0; i < 20; ++i) {
        pthread_create(&id[i], NULL, test_func, NULL);
    }
    for (i = 0; i < 20; ++i) {
        pthread_join(id[i], NULL);
    }
    printf("%d\n", count);
    return 0;
}

在這個例子中,使用了__sync_fetch_and_add 函數來實現原子操作,確保多個線程對計數器 count 的遞增操作是原子性的,從而保證了計數的準確性。

六、全文總結

Linux 線程同步是多線程編程中的關鍵環節,本文介紹了 Linux 線程同步的方法、實現機制和應用場景。通過合理地使用互斥鎖、條件變量、信號量和 Futex 等線程同步機制,可以有效地避免競態條件和數據不一致的情況,提高多線程程序的穩定性和可靠性。

在實際的多線程編程中,我們需要根據具體的應用場景選擇合適的線程同步機制。例如,對於簡單的資源互斥訪問,可以使用互斥鎖;對於需要等待特定條件的情況,可以使用條件變量;對於控制對一組資源的訪問,可以使用信號量。而 Futex 則可以在需要高效線程同步的場景下發揮作用,通過減少系統調用的次數來提高效率。

同時,在使用這些線程同步機制時,我們需要注意一些問題,如避免死鎖、防止 “驚羣效應” 等。正確使用線程同步機制是至關重要的,否則可能會導致程序出現各種問題,如數據錯誤、死鎖等。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/727JDBbuIL3CjnWgYQcPQA