高效併發編程利器:掌握無鎖編程的精髓

無鎖編程是一種併發編程的技術,旨在避免使用傳統的鎖機制來保護共享數據。相比有鎖編程,無鎖編程可以提供更高的併發性能和可伸縮性。在無鎖編程中,線程或進程通過使用原子操作、CAS(Compare-and-Swap)等技術來實現對共享數據的訪問和修改,而不需要依賴互斥鎖。

無鎖編程常用於高併發場景,如網絡服務器、多線程應用程序等。它可以減少線程之間的競爭和阻塞,並且能夠充分利用多核處理器的計算能力。然而,由於無鎖編程對開發者要求較高,在設計和實現上也更加複雜,需要考慮線程安全性、內存模型等因素。

一、概述

什麼是無鎖編程

無鎖編程,即不使用鎖的情況下實現多線程之間的變量同步,也就是在沒有線程被阻塞的情況下實現變量的同步,所以也叫非阻塞同步(Non-blocking Synchronization),實現非阻塞同步的方案稱爲 “無鎖編程算法”。

爲什麼要非阻塞同步,使用 lock 實現線程同步有非常多缺點:

假設在不使用 lock 的情況下,實現變量同步,那就會避免非常多問題。儘管眼下來看,無鎖編程並不能替代 lock。

實現級別

非同步阻塞的實現分爲三個級別:wait-free/lock-free/obstruction-free

(1)wait-free

(2)lock-free

(3)obstruction-free

在不論什麼時間點,一個線程被隔離爲一個事務進行運行(其它線程 suspended),而且在有限步驟內完畢。在運行過程中,一旦發現數據被改動(採用時間戳、版本),則回滾,也叫做樂觀鎖,即樂觀併發控制 (OOC)。

事務的過程是:

lock-free 必然是 obstruction-free 的。

爲什麼要無鎖?

首先是性能考慮。通信項目一般對性能有極致的追求,這是我們使用無鎖的重要原因。當然,無鎖算法如果實現的不好,性能可能還不如使用鎖,所以我們選擇比較擅長的數據結構和算法進行 lock-free 實現,比如 Queue,對於比較複雜的數據結構和算法我們通過 lock 來控制,比如 Map(雖然我們實現了無鎖 Hash,但是大小是限定的,而 Map 是大小不限定的),對於性能數據,後續文章會給出無鎖和有鎖的對比。

次是避免鎖的使用引起的錯誤和問題。

死鎖 (dead lock):兩個以上線程互相等待

鎖護送 (lock convoy):多個同優先級的線程反覆競爭同一個鎖,搶佔鎖失敗後強制上下文切換,引起性能下降

優先級反轉 (priority inversion):低優先級線程擁有鎖時被中優先級的線程搶佔,而高優先級的線程因爲申請不到鎖被阻塞
如何無鎖?

在現代的 CPU 處理器上,很多操作已經被設計爲原子的,比如對齊讀(Aligned Read)和對齊寫(Aligned Write)等。Read-Modify-Write(RMW)操作的設計讓執行更復雜的事務操作變成了原子操作,當有多個寫入者想對相同的內存進行修改時,保證一次只執行一個操作。

RMW 操作在不同的 CPU 家族中是通過不同的方式來支持的:

x86/64 和 Itanium 架構通過 Compare-And-Swap (CAS) 方式來實現

PowerPC、MIPS 和 ARM 架構通過 Load-Link/Store-Conditional (LL/SC) 方式來實現

在 x64 下進行實踐的,用的是 CAS 操作,CAS 操作是 lock-free 技術的基礎,我們可以用下面的代碼來描述:

template <class T>
bool CAS(T* addr, T expected, T value)
{
  if (*addr == expected)
  {
     *addr = value;
     return true;
  }
  return false;
}

在 GCC 中,CAS 操作如下所示:

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

這兩個函數提供原子的比較和交換,如果 * ptr == oldval,就將 newval 寫入 * ptr,第一個函數在相等並寫入的情況下返回 true,第二個函數的內置行爲和第一個函數相同,只是它返回操作之前的值。

後面的可擴展參數 (...) 用來指出哪些變量需要 memory barrier,因爲目前 gcc 實現的是 full barrier,所以可以略掉這個參數, 除過 CAS 操作,GCC 還提供了其他一些原子操作,可以在無鎖算法中靈活使用:

type __sync_fetch_and_add (type *ptr, type value, ...)
type __sync_fetch_and_sub (type *ptr, type value, ...)
type __sync_fetch_and_or (type *ptr, type value, ...)
type __sync_fetch_and_and (type *ptr, type value, ...)
type __sync_fetch_and_xor (type *ptr, type value, ...)
type __sync_fetch_and_nand (type *ptr, type value, ...)

type __sync_add_and_fetch (type *ptr, type value, ...)
type __sync_sub_and_fetch (type *ptr, type value, ...)
type __sync_or_and_fetch (type *ptr, type value, ...)
type __sync_and_and_fetch (type *ptr, type value, ...)
type __sync_xor_and_fetch (type *ptr, type value, ...)
type __sync_nand_and_fetch (type *ptr, type value, ...)

sync* 系列的 built-in 函數,用於提供加減和邏輯運算的原子操作。這兩組函數的區別在於第一組返回更新前的值,第二組返回更新後的值。

無鎖算法感觸最深的是複雜度的分解,比如多線程對於一個雙向鏈表的插入或刪除操作,如何能一步一步分解成一個一個串聯的原子操作,並能保證事務內存的一致性。

二、工作原理

無鎖編程具體使用和考慮到的技術方法包括:原子操作(atomic operations), 內存柵欄(memory barriers), 內存順序衝突(memory order), 指令序列一致性(sequential consistency)和順 ABA 現象等等。

在這其中最基礎最重要的是操作的原子性或說原子操作。原子操作可以理解爲在執行完畢之前不會被任何其它任務或事件中斷的一系列操作。原子操作是非阻塞編程最核心基本的部分,沒有原子操作的話,操作會因爲中斷異常等各種原因引起數據狀態的不一致從而影響到程序的正確。

對於原子操作的實現機制,在硬件層面上 CPU 處理器會默認保證基本的內存操作的原子性,CPU 保證從系統內存當中讀取或者寫入一個字節的行爲肯定是原子的,當一個處理器讀取一個字節時,其他 CPU 處理器不能訪問這個字節的內存地址。但是對於複雜的內存操作 CPU 處理器不能自動保證其原子性,比如跨總線寬度或者跨多個緩存行(Cache Line),跨頁表的訪問等。這個時候就需要用到 CPU 指令集中設計的原子操作指令,現在大部分 CPU 指令集都會支持一系列的原子操作。而在無鎖編程中經常用到的原子操作是 Read-Modify-Write (RMW)這種類型的,這其中最常用的原子操作又是 COMPARE AND SWAP(CAS),幾乎所有的 CPU 指令集都支持 CAS 的原子操作,比如 X86 平臺下中的是 CMPXCHG。

繼續說一下 CAS,CAS 操作行爲是比較某個內存地址處的內容是否和期望值一致,如果一致則將該地址處的數值替換爲一個新值。CAS 能夠操作的位數越多,使用它來實現鎖無關的數據結構就越容易(細節可以在 intel 手冊中查看)。CAS 操作具體的實現原理主要是兩種方式:總線鎖定和緩存鎖定。所謂總線鎖定,就是 CPU 執行某條指令的時候先鎖住數據總線的, 使用同一條數據總線的 CPU 就無法訪問內存了,在指令執行完成後再釋放鎖住的數據總線。鎖住數據總線的方式系統開銷很大,限制了訪問內存的效率,所以又有了基於 CPU 緩存一致性來保持操作原子性作的方法作爲補充,簡單來說就是用 CPU 的緩存一致性的機制來防止內存區域的數據被兩個以上的處理器修改(可詳見 CPU 緩存的 MESI 協議)。

在操作系統的層面,Linux 系統提供了軟件級的原子操作,包括兩大類系統調用,一類是基於對整數進行操作的 atomic_set/and/inc,一類是針對單獨的位進行操作的 set/clear/change_bit,它們大部分都是基於硬件層面的 CAS 的指令實現的。

在各種開發語言中(c,c++,java)基於操作系統提供的接口也都封裝實現了對應的原子操作 api,所以開發者完全可以直接調用各個開發語言提供的接口實現無鎖程序。

除了使用原子操作保證操作的原子性,還要考慮在不用的語言和內存模型下,如何保證,操作的順序性, 編譯時和運行時的指令重排序,還有由假共享引起內存順序衝突(Memory order violation)等等細節。原子性確保指令執行期間不被打斷,要麼全部執行,要麼根本不執行,而順序性確保即使兩條或多條指令出現在獨立的執行線程中或者獨立的處理器上時,保持它們本該執行的順序。假共享是指多個 CPU 同時修改同一個緩存行的不同部分而引起其中一個 CPU 的操作無效,當出現這個內存順序衝突時,CPU 必須清空流水線。這些其實在多核編程的環境下的 locked-base 的實現中也有類似的問題所以這裏就不多展開。

ABA 問題

ABA 問題可以俗稱爲 “調包問題”,我們先看一個生活化的例子:

你拿着一個裝滿錢的手提箱在飛機場,此時過來了一個火辣性感的美女,然後她很暖昧地挑逗着你,並趁你不注意的時候,把用一個一模一樣的手提箱和你那裝滿錢的箱子調了個包,然後就離開了,你看到你的手提箱還在那,於是就提着手提箱去趕飛機了。

我們再看一個 CAS 化的例子:

若線程對同一內存地址進行了兩次讀操作,而兩次讀操作得到了相同的值,通過 "值相同" 來判定 "值沒變" 是不可靠的。因爲在這兩次讀操作的時間間隔之內,另外的線程可能已經多次修改了該值,這樣就相當於欺騙了前面的線程,使其認爲 "值沒變",實際上值已經被篡改過了。

下面是 ABA 問題發生的過程:

  1. T1 線程從共享的內存地址讀取值 A;

  2. T1 線程被搶佔,線程 T2 開始運行;

  3. T2 線程將共享的內存地址中的值由 A 改成 B,然後又改成 A;

  4. T1 線程繼續執行,讀取共享的內存地址中的值 A,認爲沒有改變,然後繼續執行

由於 T1 並不知道兩次讀取的值 A 已經被 "隱性" 的修改過,所以可能產生無法預期的結果。
當 CAS 操作循環執行時,存在多個線程交錯地對共享的內存地址進行處理,如果實現的不正確,將有可能遇到 ABA 問題。

三、無鎖隊列

無鎖隊列是 lock-free 中最基本的數據結構,一般應用場景是資源分配,比如 TimerId 的分配,WorkerId 的分配,上電內存初始塊數的申請等等。對於多線程用戶來說,無鎖隊列的入隊和出隊操作是線程安全的,不用再加鎖控制。

API

ErrCode initQueue(void** queue, U32 unitSize, U32 maxUnitNum);
ErrCode enQueue(void* queue, void* unit);
ErrCode deQueue(void* queue, void* unit);
U32 getQueueSize(void* queue);
BOOL isQueueEmpty(void* queue);

API 使用示例

我們以定時器 Id 的管理爲例,瞭解一下無鎖隊列主要 API 的使用。

初始化:主線程調用

ErrCode ret = initQueue(&queue, sizeof(U32), MAX_TMCB_NUM);
if (ret != ERR_TIMER_SUCC)
{
   ERR_LOG("lock free init_queue error: %u\n", ret);
   return;
}

for (U32 timerId = 0; timerId < MAX_TMCB_NUM; timerId++)
{
    ret = enQueue(queue, &timerId);
    if (ret != ERR_TIMER_SUCC)
    {
        ERR_LOG("lock free enqueue error: %u\n", ret);
        return;
    }
}

timerId 分配:多線程調用

U32 timerId;
ErrCode ret = deQueue(queue, &timerId);
if (ret != ERR_TIMER_SUCC)
{
    ERR_LOG("deQueue failed!");
    return INVALID_TIMER_ID;
}

timerId 回收:多線程調用

ErrCode ret = enQueue(queue, &timerId);
if (ret != ERR_TIMER_SUCC)
{
    ERR_LOG("enQueue failed!");
}

核心實現

顯然,隊列操作的核心實現爲入隊和出隊操作。

入隊

入隊的關鍵點有下面幾點:

  1. 通過寫次數確保隊列元素數小於最大元素數

  2. 獲取 next tail 的位置

  3. 將新元素插入到隊尾

  4. 尾指針偏移

  5. 讀次數加 1

最大元素數校驗

#include <atomic>

template<typename T>
class LockFreeQueue {
private:
    struct Node {
        T data;
        Node* next;

        Node(const T& value) : data(value), next(nullptr) {}
    };

    std::atomic<Node*> head;
    std::atomic<Node*> tail;
    std::atomic<int> count;

public:
    LockFreeQueue() : head(nullptr), tail(nullptr), count(0) {}

    void push(const T& value) {
        Node* newNode = new Node(value);
        newNode->next = nullptr;

        Node* prevTail = tail.exchange(newNode, std::memory_order_acq_rel);
        if (prevTail != nullptr)
            prevTail->next = newNode;

        count.fetch_add(1, std::memory_order_release);
    }

    bool pop(T& value) {
        if (head == nullptr)
            return false;

        Node* oldHead = head.load(std::memory_order_acquire);
        Node* newHead = oldHead->next;
        
        if (newHead == nullptr)
            return false;

        value = newHead->data;
        
        head.store(newHead, std::memory_order_release);

        delete oldHead;

        count.fetch_sub(1, std::memory_order_release);

        return true;
    }

    int size() const {
        return count.load(std::memory_order_relaxed);
    }
};

在入隊操作開始,就判斷寫次數是否超過隊列元素的最大值,如果已超過,則反饋隊列已滿的錯誤碼,否則通過 CAS 操作將寫次數加 1。如果 CAS 操作失敗,說明有多個線程同時判斷了寫次數都小於隊列最大元素數,那麼只有一個線程 CAS 操作成功,其他線程則需要重新做循環操作。

獲取 next tail 的位置

do
{
    do
    {
        nextTail = queueHead->nextTail;
    } while (!__sync_bool_compare_and_swap(&queueHead->nextTail, nextTail, (nextTail + 1) % (queueHead->maxUnitNum + 1)));

    unitHead = UNIT_HEAD(queue, nextTail);
} while (unitHead->hasUsed);

當前 next tail 的位置就是即將入隊的元素的目標位置,並通過 CAS 操作更新隊列頭中 nextTail 的值。如果 CAS 操作失敗,則說明其他線程也正在進行入隊操作,並比本線程快,則需要進行重新嘗試,從而更新 next tail 的值,確保該入隊元素的位置正確。

然而事情並沒有這麼簡單,由於多線程的搶佔,導致隊列並不是按下標大小依次鏈接起來的,所以要判斷一下 next tail 的位置是否正被佔用。如果 next tail 的位置正被佔用,則需要重新競爭 next tail,直到 next tail 的位置是空閒的。

將新元素插入到隊尾

初始化新元素:

unitHead->next = LIST_END;
memcpy(UNIT_DATA(queue, nextTail), unit, queueHead->unitSize);

插入到隊尾:

do
{
    listTail = queueHead->listTail;
    oldListTail = listTail;
    unitHead = UNIT_HEAD(queue, listTail);

    if ((++tryTimes) >= 3)
    {
        while (unitHead->next != LIST_END)
        {
            listTail = unitHead->next;
            unitHead = UNIT_HEAD(queue, listTail);
        }
    }
} while (!__sync_bool_compare_and_swap(&unitHead->next, LIST_END, nextTail));

通過 CAS 操作判斷當前指針是否到達隊尾,如果到達隊尾,則將新元素連接到隊尾元素之後 (next),否則進行追趕。

在這裏,我們做了優化,當 CAS 操作連續失敗 3 次後,那麼就直接通過 next 的遞推找到隊尾,這樣比 CAS 操作的效率高很多。我們在測試多線程的隊列操作時,出現過一個線程插入到 tail 爲 400 多的時候,已有線程插入到 tail 爲 1000 多的場景。

尾指針偏移

do
{
    __sync_val_compare_and_swap(&queueHead->listTail, oldListTail, nextTail);
    oldListTail = queueHead->listTail;
    unitHead = UNIT_HEAD(queue, oldListTail);
    nextTail = unitHead->next;
} while (nextTail != LIST_END);

在多線程場景下,隊尾指針是動態變化的,當前尾可能不是新尾了,所以通過 CAS 操作更新隊尾。當 CAS 操作失敗時,說明隊尾已經被其他線程更新,此時不能將 nextTail 賦值給隊尾。

讀次數加 1

__sync_fetch_and_add(&queueHead->readCount, 1);

寫次數加 1 是爲了保證隊列元素的數不能超過最大元素數,而讀次數加 1 是爲了確保不能從空隊列出隊。

出隊

出隊的關鍵點有下面幾點:

  1. 通過讀次數確保不能從空隊列出隊

  2. 頭指針偏移

  3. dummy 頭指針

  4. 寫次數減 1

空隊列校驗

do
{
    readCount = queueHead->readCount;
    if (readCount == 0) return ERR_QUEUE_HAS_EMPTY;
} while (!__sync_bool_compare_and_swap(&queueHead->readCount, readCount, readCount - 1));

讀次數爲 0,說明隊列爲空,否則通過 CAS 操作將讀次數減 1。如果 CAS 操作失敗,說明其他線程已更新讀次數成功,必須重試,直到成功。

頭指針偏移

U32 readCount;
do
{
    listHead = queueHead->listHead;
    unitHead = UNIT_HEAD(queue, listHead);
} while (!__sync_bool_compare_and_swap(&queueHead->listHead, listHead, unitHead->next));

如果 CAS 操作失敗,說明隊頭指針已經在其他線程的操作下進行了偏移,所以要重試,直到成功。

dummy 頭指針

我們可以看出,頭元素爲 head->next,這就是說隊列的第一個元素都是基於 head->next 而不是 head,這樣設計是有原因的。

考慮一個邊界條件:在隊列只有一個元素條件下,如果 head 和 tail 指針指向同一個結點,這樣入隊操作和出隊操作本身就需要互斥了。通過引入一個 dummy 頭指針來解決這個問題,如下圖所示:

寫次數減 1

__sync_fetch_and_sub(&queueHead->writeCount, 1);

出隊操作結束前,要將寫次數減 1,以便入隊操作能成功。

四、無鎖編程技術

事實證明,當你試圖滿足無鎖編程的無阻塞條件時,會出現一系列技術:原子操作、內存屏障、避免 ABA 問題,等等。從這裏開始,事情很快變得棘手了。

原子的 Read-Modify-Write 操作

所謂原子操作是指,通過一種看起來不可分割的方式來操作內存:線程無法看到原子操作的中間過程。在現代的處理器上,有很多操作本身就是的原子的。例如,對簡單類型的對齊的讀和寫通常就是原子的。

Read-Modify-Write(RMW)操作更進一步,它允許你按照原子的方式,操作更復雜的事務。當一個無鎖的算法必須支持多個寫入者時,原子操作會尤其有用,因爲多個線程試圖在同一個地址上進行 RMW 時,它們會按 “一次一個” 的方式排隊執行這些操作。我已經在我的博客中涉及了 RMW 操作了,如實現 輕量級互斥量、遞歸互斥量 和 輕量級日誌系統。

RMW 操作的例子包括:Win32 上 的 _InterlockedIncrement,iOS 上的 OSAtomicAdd32 以及 C++11 中的 std::atomic::fetch_add。需要注意的是,C++11 的原子標準不保證其在每個平臺上的實現都是無鎖的,因此最好要清楚你的平臺和工具鏈的能力。你可以調用 std::atomic<>::is_lock_free 來確認一下。

不同的 CPU 系列支持 RMW 的方式也是不同的。例如,PowerPC 和 ARM 提供 load-link/store-conditional 指令,這實際上是允許你實現你自定義的底層 RMW 操作。常用的 RMW 操作就已經足夠了。

如上面流程圖所述,即使在單處理器系統上,原子的 RMW 操作也是無鎖編程的必要部分。沒有原子性的話,一個線程的事務會被中途打斷,這可能會導致一個錯誤的狀態。

Compare-And-Swap 循環

或許,最常討論的 RMW 操作是 compare-and-swap (CAS)。在 Win32 上,CAS 是通過如 _InterlockedCompareExchange 等一系列指令來提供的。通常,程序員會在一個事務中使用 CAS 循環。這個模式通常包括:複製一個共享的變量至本地變量,做一些特定的工作(改動),再試圖使用 CAS 發佈這些改動。

void LockFreeQueue::push(Node* newHead)
{
    for (;;)
    {
        // Copy a shared variable (m_Head) to a local.
        Node* oldHead = m_Head;

        // Do some speculative work, not yet visible to other threads.
        newHead->next = oldHead;

        // Next, attempt to publish our changes to the shared variable.
        // If the shared variable hasn't changed, the CAS succeeds and we return.
        // Otherwise, repeat.
        if (_InterlockedCompareExchange(&m_Head, newHead, oldHead) == oldHead)
            return;
    }
}

這樣的循環仍然有資格作爲無鎖的,因爲如果一個線程檢測失敗,意味着有其它線程成功—儘管某些架構提供一個 較弱的 CAS 變種 。無論何時實現一個 CAS 循環,都必須十分小心地避免 ABA 問題。

順序一致性

順序一致性(Sequential consistency)意味着,所有線程就內存操作的順序達成一致。這個順序是和操作在程序源代碼中的順序是一致的。

一種實現順序一致性的簡單(但顯然不切實際)的方法是禁用編譯器優化,並強制所有線程在單個處理器上運行。即使線程在任意時間被搶佔和調度,處理器也永遠不會看到自己的內存影響。

某些編程語言甚至可以爲在多處理器環境中運行的優化代碼提供順序一致性。在 C ++ 11 中,可以將所有共享變量聲明爲具有默認內存排序約束的 C ++ 11 原子類型。

std::atomic X(0), Y(0);
int r1, r2;
 
void thread1()
{
    X.store(1);
    r1 = Y.load();
}
 
void thread2()
{
    Y.store(1);
    r2 = X.load();
}

因爲 C ++ 11 原子類型保證順序一致性,所以結果 r1 = r2 = 0 是不可能的。爲此,編譯器會在後臺輸出其他指令——通常是 內存圍欄 和 / 或 RMW 操作。與程序員直接處理內存排序的指令相比,那些額外的指令可能會使實現效率降低。

內存保序

正如前面流程圖所建議的那樣,任何時候做多核(或者任何對稱多處理器)的無鎖編程,如果你的環境不能保證順序一致性,你都必須考慮如何來防止 內存重新排序。

在當今的架構中,增強內存保序性的工具通常分爲三類,它們既防止 編譯器重新排序 又防止 處理器重新排序:

獲取語義可防止按照程序順序對其進行操作的內存重新排序,而釋放語義則可防止對其進行操作前的內存重新排序。這些語義尤其適用於存在生產者 / 消費者關係的情況,其中一個線程發佈一些信息,而另一個線程讀取它。

不同的處理器有不同的內存模型

在內存重新排序方面,不同的 CPU 系列具有不同的習慣。每個 CPU 供應商都記錄了這些規則,並嚴格遵循了硬件。例如,PowerPC 和 ARM 處理器可以相對於指令本身更改內存存儲的順序,但是通常,英特爾和 AMD 的 x86 / 64 系列處理器不會。

有人傾向於將這些特定於平臺的細節抽象出來,尤其是 C ++ 11 爲我們提供了編寫可移植的無鎖代碼的標準方法。但是目前,我認爲大多數無鎖程序員至少對平臺差異有所瞭解。如果需要記住一個關鍵的區別,那就是在 x86 / 64 指令級別上,每次從內存中加載都會獲取語義,並且每次存儲到內存都將提供釋放語義–至少對於非 SSE 指令和非寫組合內存 。因此,過去通常會編寫可在 x86 / 64 上運行但 在其他處理器上無法運行 的無鎖代碼。

如果你對硬件如何處理內存重新排序的細節感興趣,我建議看看《Is Pararllel Programming Hard》的附錄 C。無論如何,請記住,由於編譯器對指令的重新排序,也會發生內存重新排序。

五、無鎖編程實例

原子操作是無鎖編程的基石,原子操作是不可分隔的操作,一般通過 CAS(Compare andSwap) 操作實現,CAS 操作需要輸入兩個數值,一箇舊值(期望操作前的值)和一個新值,在操作期間先比較下舊值有沒有發生變化,如果沒有發生變化,才交換成新值,發生了變化則不交換。

C++11 的線程庫爲我們提供了一系列原子類型,同時提供了相對應的原子操作,原子類型的基本使用方法如下:

#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>

using namespace std;
atomic<int> i = 0;

void iplusplus() {
    int c = 10000000;  //循環次數
    while (c--) {
        i++;
    }
}
int main()
{
    chrono::steady_clock::time_point start_time = chrono::steady_clock::now();//開始時間
    thread thread1(iplusplus);
    thread thread2(iplusplus);
    thread1.join();  // 等待線程1運行完畢
    thread2.join();  // 等待線程2運行完畢
    cout << "i = " << i << endl;
    chrono::steady_clock::time_point stop_time = chrono::steady_clock::now();//結束時間
    chrono::duration<double> time_span = chrono::duration_cast<chrono::microseconds>(stop_time - start_time);
    std::cout << "共耗時:" << time_span.count() << " ms" << endl; // 耗時
    system("pause");
    return 0;
}

代碼的第 8 行定義了一個原子類型(int)變量 i,在第 13 行多線程修改 i 的時候即可免去加鎖和解鎖的步驟,同時又能保證變量 i 的線程安全性。代碼運行結果如下:

可以看到 i 的值是符合預期的,代碼運行總耗時 1.12731ms,僅爲有鎖編程的耗時 3.37328ms 的 1/3,由此可以看出無鎖編程由於避免了加鎖而相對於有鎖編程提高了一定的性能。

無鎖編程優缺點

無鎖編程的優點:

  1. 訪問共享變量只需要一條語句(CAS 原子操作),可以節省每次對共享變量進行操作都進行的加鎖解鎖動作,節省了系統開銷。

  2. 併發競爭的時候使用自旋嘗試的方法,避免了用鎖的情況下線程因阻塞而頻繁的切換。

無鎖編程的缺點:

  1. 只能對簡單的原子類型進行修改,不支持對大型的數據對象進行修改,或者說只能保證一條代碼的原子性,不能保證代碼塊的原子性。

  2. 在併發量比較高的情況下,如果許多線程更新變量不成功,一直自旋反覆嘗試,會給 CPU 帶來很大的負擔。

無鎖編程的擴展應用

那麼這裏就牽涉到一個問題了,在高併發場景下,如何高效地控制多個線程對一個很大的對象的訪問呢?

使用無鎖編程:無鎖編程只能對簡單的原子類型進行修改,不支持對大型的數據對象進行修改。

使用鎖:修改一個很大的對象需要很長的一段代碼和處理過程,如果寫線程鎖住這整個修改過程,那麼會導致所有的讀線程被阻塞很長一段時間。

當然如果這是一個高一致性的場景,也只能使用鎖了。但是如果這並不是一個對一致性要求非常高的場景呢?比如可以允許修改內容在 10s 內生效而不是立即生效,那麼就可以結合無鎖編程使用一種高效的方法:

用指針來訪問這個共享對象,寫線程修改對象時,不直接在對象上進行修改,而是新建一個對象進行賦值,賦值完成後,再通過 CAS 將指針指向這個新的對象,然後銷燬舊的對象。通過這種方法,可以將臨界區從一大段代碼縮減爲一句代碼,極大的減少了對讀線程的影響。

修改指針這一步只有一條代碼語句,雖然也可以用加鎖的方式來保證線程安全,但是恰好無鎖編程也是隻能對一條語句進行操作,所以無鎖編程非常符合這種方式,往往一想到無鎖編程就想到這種方法。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/nijfKgosrpmXwsfkdyrhBQ