ConcurrentHashMap 原理分析

作者:tony 的博客小屋

鏈接:cnblogs.com/ITtangtang/p/3948786.html

線程不安全的 HashMap

因爲多線程環境下,使用 Hashmap 進行 put 操作會引起死循環,導致 CPU 利用率接近 100%,所以在併發情況下不能使用 HashMap。

效率低下的 HashTable 容器

HashTable 容器使用 synchronized 來保證線程安全,但在線程競爭激烈的情況下 HashTable 的效率非常低下。因爲當一個線程訪問 HashTable 的同步方法時,其他線程訪問 HashTable 的同步方法時,可能會進入阻塞或輪詢狀態。如線程 1 使用 put 進行添加元素,線程 2 不但不能使用 put 方法添加元素,並且也不能使用 get 方法來獲取元素,所以競爭越激烈效率越低。

鎖分段技術

HashTable 容器在競爭激烈的併發環境下表現出效率低下的原因,是因爲所有訪問 HashTable 的線程都必須競爭同一把鎖,那假如容器裏有多把鎖,每一把鎖用於鎖容器其中一部分數據,那麼當多線程訪問容器裏不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效的提高併發訪問效率,這就是 ConcurrentHashMap 所使用的鎖分段技術,首先將數據分成一段一段的存儲,然後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。有些方法需要跨段,比如 size()和 containsValue(),它們可能需要鎖定整個表而而不僅僅是某個段,這需要按順序鎖定所有段,操作完畢後,又按順序釋放所有段的鎖。這裏 “按順序” 是很重要的,否則極有可能出現死鎖,在 ConcurrentHashMap 內部,段數組是 final 的,並且其成員變量實際上也是 final 的,但是,僅僅是將數組聲明爲 final 的並不保證數組成員也是 final 的,這需要實現上的保證。這可以確保不會出現死鎖,因爲獲得鎖的順序是固定的。

ConcurrentHashMap 是由 Segment 數組結構和 HashEntry 數組結構組成。Segment 是一種可重入鎖 ReentrantLock,在 ConcurrentHashMap 裏扮演鎖的角色,HashEntry 則用於存儲鍵值對數據。一個 ConcurrentHashMap 裏包含一個 Segment 數組,Segment 的結構和 HashMap 類似,是一種數組和鏈表結構, 一個 Segment 裏包含一個 HashEntry 數組,每個 HashEntry 是一個鏈表結構的元素, 每個 Segment 守護者一個 HashEntry 數組裏的元素, 當對 HashEntry 數組的數據進行修改時,必須首先獲得它對應的 Segment 鎖。

當有一個大數組時需要在多個線程共享時就可以考慮是否把它給分層多個節點了,避免大鎖。並可以考慮通過 hash 算法進行一些模塊定位。

其實不止用於線程,當設計數據表的事務時(事務某種意義上也是同步機制的體現),可以把一個表看成一個需要同步的數組,如果操作的表數據太多時就可以考慮事務分離了(這也是爲什麼要避免大表的出現),比如把數據進行字段拆分,水平分表等.

ConcurrentHashMap(1.7 及之前) 中主要實體類就是三個:ConcurrentHashMap(整個 Hash 表),Segment(桶),HashEntry(節點),對應上面的圖可以看出之間的關係

/**
* The segments, each of which is a specialized hash table
*/  
final Segment<K,V>[] segments;

不變 (Immutable) 和易變(Volatile)

ConcurrentHashMap 完全允許多個讀操作併發進行,讀操作並不需要加鎖。如果使用傳統的技術,如 HashMap 中的實現,如果允許可以在 hash 鏈的中間添加或刪除元素,讀操作不加鎖將得到不一致的數據。ConcurrentHashMap 實現技術是保證 HashEntry 幾乎是不可變的。HashEntry 代表每個 hash 鏈中的一個節點,其結構如下所示:

static final class HashEntry<K,V> {
     final K key;
     final int hash;
     volatile V value;
     final HashEntry<K,V> next;
 }

可以看到除了 value 不是 final 的,其它值都是 final 的,這意味着不能從 hash 鏈的中間或尾部添加或刪除節點,因爲這需要修改 next 引用值,所有的節點的修改只能從頭部開始。對於 put 操作,可以一律添加到 Hash 鏈的頭部。但是對於 remove 操作,可能需要從中間刪除一個節點,這就需要將要刪除節點的前面所有節點整個複製一遍,最後一個節點指向要刪除結點的下一個結點。這在講解刪除操作時還會詳述。爲了確保讀操作能夠看到最新的值,將 value 設置成 volatile,這避免了加鎖。

其它

爲了加快定位段以及段中 hash 槽的速度,每個段 hash 槽的的個數都是 2^n,這使得通過位運算就可以定位段和段中 hash 槽的位置。當併發級別爲默認值 16 時,也就是段的個數,hash 值的高 4 位決定分配在哪個段中。但是我們也不要忘記《算法導論》給我們的教訓:hash 槽的的個數不應該是 2^n,這可能導致 hash 槽分配不均,這需要對 hash 值重新再 hash 一次。(這段似乎有點多餘了 )

定位操作:

final Segment<K,V> segmentFor(int hash) {
     return segments[(hash >>> segmentShift) & segmentMask];
 }

既然 ConcurrentHashMap 使用分段鎖 Segment 來保護不同段的數據,那麼在插入和獲取元素的時候,必須先通過哈希算法定位到 Segment。可以看到 ConcurrentHashMap 會首先使用 Wang/Jenkins hash 的變種算法對元素的 hashCode 進行一次再哈希。

再哈希,其目的是爲了減少哈希衝突,使元素能夠均勻的分佈在不同的 Segment 上,從而提高容器的存取效率。假如哈希的質量差到極點,那麼所有的元素都在一個 Segment 中,不僅存取元素緩慢,分段鎖也會失去意義。我做了一個測試,不通過再哈希而直接執行哈希計算。

System.out.println(Integer.parseInt("0001111", 2) & 15);

System.out.println(Integer.parseInt("0011111", 2) & 15);

System.out.println(Integer.parseInt("0111111", 2) & 15);

System.out.println(Integer.parseInt("1111111", 2) & 15);

計算後輸出的哈希值全是 15,通過這個例子可以發現如果不進行再哈希,哈希衝突會非常嚴重,因爲只要低位一樣,無論高位是什麼數,其哈希值總是一樣。我們再把上面的二進制數據進行再哈希後結果如下,爲了方便閱讀,不足 32 位的高位補了 0,每隔四位用豎線分割下。

0100|0111|0110|0111|1101|1010|0100|1110

1111|0111|0100|0011|0000|0001|1011|1000

0111|0111|0110|1001|0100|0110|0011|1110

1000|0011|0000|0000|1100|1000|0001|1010

可以發現每一位的數據都散列開了,通過這種再哈希能讓數字的每一位都能參加到哈希運算當中,從而減少哈希衝突。ConcurrentHashMap 通過以下哈希算法定位 segment。

默認情況下 segmentShift 爲 28,segmentMask 爲 15,再哈希後的數最大是 32 位二進制數據,向右無符號移動 28 位,意思是讓高 4 位參與到 hash 運算中, (hash>>> segmentShift) & segmentMask 的運算結果分別是 4,15,7 和 8,可以看到 hash 值沒有發生衝突。

final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

數據結構

所有的成員都是 final 的,其中 segmentMask 和 segmentShift 主要是爲了定位段,參見上面的 segmentFor 方法。

關於 Hash 表的基礎數據結構,這裏不想做過多的探討。Hash 表的一個很重要方面就是如何解決 hash 衝突,ConcurrentHashMap 和 HashMap 使用相同的方式,都是將 hash 值相同的節點放在一個 hash 鏈中。與 HashMap 不同的是,ConcurrentHashMap 使用多個子 Hash 表,也就是段 (Segment)。

每個 Segment 相當於一個子 Hash 表,它的數據成員如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
         /**
          * The number of elements in this segment's region.
          */
         transient volatileint count;
         /**
          * Number of updates that alter the size of the table. This is
          * used during bulk-read methods to make sure they see a
          * consistent snapshot: If modCounts change during a traversal
          * of segments computing size or checking containsValue, then
          * we might have an inconsistent view of state so (usually)
          * must retry.
          */
         transient int modCount;
         /**
          * The table is rehashed when its size exceeds this threshold.
          * (The value of this field is always <tt>(int)(capacity *
          * loadFactor)</tt>.)
          */
         transient int threshold;
         /**
          * The per-segment table.
          */
         transient volatile HashEntry<K,V>[] table;
         /**
          * The load factor for the hash table. Even though this value
          * is same for all segments, it is replicated to avoid needing
          * links to outer object.
          * @serial 
          */
         final float loadFactor;
 }

count 用來統計該段數據的個數,它是 volatile,它用來協調修改和讀取操作,以保證讀取操作能夠讀取到幾乎最新的修改。協調方式是這樣的,每次修改操作做了結構上的改變,如增加 / 刪除節點 (修改節點的值不算結構上的改變),都要寫 count 值,每次讀取操作開始都要讀取 count 的值。這利用了 Java 5 中對 volatile 語義的增強,對同一個 volatile 變量的寫和讀存在 happens-before 關係。modCount 統計段結構改變的次數,主要是爲了檢測對多個段進行遍歷過程中某個段是否發生改變,在講述跨段操作時會還會詳述。threashold 用來表示需要進行 rehash 的界限值。table 數組存儲段中節點,每個數組元素是個 hash 鏈,用 HashEntry 表示。table 也是 volatile,這使得能夠讀取到最新的 table 值而不需要同步。loadFactor 表示負載因子。

刪除操作 remove(key)

public V remove(Object key) {
   hash = hash(key.hashCode());
   return segmentFor(hash).remove(key, hash, null);
}

整個操作是先定位到段,然後委託給段的 remove 操作。當多個刪除操作併發進行時,只要它們所在的段不相同,它們就可以同時進行。

下面是 Segment 的 remove 方法實現:

V remove(Object key, int hash, Object value) {
     lock();
     try {
         int c = count - 1;
         HashEntry<K,V>[] tab = table;
         int index = hash & (tab.length - 1);
         HashEntry<K,V> first = tab[index];
         HashEntry<K,V> e = first;
         while (e != null && (e.hash != hash || !key.equals(e.key)))
             e = e.next;
         V oldValue = null;
         if (e != null) {
             V v = e.value;
             if (value == null || value.equals(v)) {
                 oldValue = v;

                 // All entries following removed node can stay
                 // in list, but all preceding ones need to be
                 // cloned.
                 ++modCount;
                 HashEntry<K,V> newFirst = e.next;
                 *for (HashEntry<K,V> p = first; p != e; p = p.next)
                     *newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                   newFirst, p.value);
                 tab[index] = newFirst;
                 count = c; // write-volatile
             }
         }
         return oldValue;
     } finally {
         unlock();
     }
 }

整個操作是在持有段鎖的情況下執行的,空白行之前的行主要是定位到要刪除的節點 e。接下來,如果不存在這個節點就直接返回 null,否則就要將 e 前面的結點複製一遍,尾結點指向 e 的下一個結點。e 後面的結點不需要複製,它們可以重用。

中間那個 for 循環是做什麼用的呢?(* 號標記)從代碼來看,就是將定位之後的所有 entry 克隆並拼回前面去,但有必要嗎?每次刪除一個元素就要將那之前的元素克隆一遍?這點其實是由 entry 的不變性來決定的,仔細觀察 entry 定義,發現除了 value,其他所有屬性都是用 final 來修飾的,這意味着在第一次設置了 next 域之後便不能再改變它,取而代之的是將它之前的節點全都克隆一次。至於 entry 爲什麼要設置爲不變性,這跟不變性的訪問不需要同步從而節省時間有關

下面是個示意圖

刪除元素之前:

刪除元素 3 之後:

第二個圖其實有點問題,複製的結點中應該是值爲 2 的結點在前面,值爲 1 的結點在後面,也就是剛好和原來結點順序相反,還好這不影響我們的討論。

整個 remove 實現並不複雜,但是需要注意如下幾點。第一,當要刪除的結點存在時,刪除的最後一步操作要將 count 的值減一。這必須是最後一步操作,否則讀取操作可能看不到之前對段所做的結構性修改。第二,remove 執行的開始就將 table 賦給一個局部變量 tab,這是因爲 table 是 volatile 變量,讀寫 volatile 變量的開銷很大。編譯器也不能對 volatile 變量的讀寫做任何優化,直接多次訪問非 volatile 實例變量沒有多大影響,編譯器會做相應優化。

get 操作

ConcurrentHashMap 的 get 操作是直接委託給 Segment 的 get 方法,直接看 Segment 的 get 方法:

V get(Object key, int hash) {
     if (count != 0) { // read-volatile 當前桶的數據個數是否爲0
         HashEntry<K,V> e = getFirst(hash); 得到頭節點
         while (e != null) {
             if (e.hash == hash && key.equals(e.key)) {
                 V v = e.value;
                 if (v != null)
                     return v;
                 return readValueUnderLock(e); // recheck
             }
             e = e.next;
         }
     }
     returnnull;
 }

get 操作不需要鎖。

除非讀到的值是空的纔會加鎖重讀,我們知道 HashTable 容器的 get 方法是需要加鎖的,那麼 ConcurrentHashMap 的 get 操作是如何做到不加鎖的呢?原因是它的 get 方法裏將要使用的共享變量都定義成 volatile

第一步是訪問 count 變量,這是一個 volatile 變量,由於所有的修改操作在進行結構修改時都會在最後一步寫 count 變量,通過這種機制保證 get 操作能夠得到幾乎最新的結構更新。對於非結構更新,也就是結點值的改變,由於 HashEntry 的 value 變量是 volatile 的,也能保證讀取到最新的值。

接下來就是根據 hash 和 key 對 hash 鏈進行遍歷找到要獲取的結點,如果沒有找到,直接訪回 null。對 hash 鏈進行遍歷不需要加鎖的原因在於鏈指針 next 是 final 的。但是頭指針卻不是 final 的,這是通過 getFirst(hash) 方法返回,也就是存在 table 數組中的值。這使得 getFirst(hash) 可能返回過時的頭結點,例如,當執行 get 方法時,剛執行完 getFirst(hash) 之後,另一個線程執行了刪除操作並更新頭結點,這就導致 get 方法中返回的頭結點不是最新的。這是可以允許,通過對 count 變量的協調機制,get 能讀取到幾乎最新的數據,雖然可能不是最新的。要得到最新的數據,只有採用完全的同步。

最後,如果找到了所求的結點,判斷它的值如果非空就直接返回,否則在有鎖的狀態下再讀一次。這似乎有些費解,理論上結點的值不可能爲空,這是因爲 put 的時候就進行了判斷,如果爲空就要拋 NullPointerException。空值的唯一源頭就是 HashEntry 中的默認值,因爲 HashEntry 中的 value 不是 final 的,非同步讀取有可能讀取到空值。仔細看下 put 操作的語句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在這條語句中,HashEntry 構造函數中對 value 的賦值以及對 tab[index] 的賦值可能被重新排序,這就可能導致結點的值爲空。這裏當 v 爲空時,可能是一個線程正在改變節點,而之前的 get 操作都未進行鎖定,根據 bernstein 條件,讀後寫或寫後讀都會引起數據的不一致,所以這裏要對這個 e 重新上鎖再讀一遍,以保證得到的是正確值。

V readValueUnderLock(HashEntry<K,V> e) {
     lock();
     try {
         return e.value;
     } finally {
         unlock();
     }
 }

如用於統計當前 Segement 大小的 count 字段和用於存儲值的 HashEntry 的 value。定義成 volatile 的變量,能夠在線程之間保持可見性,能夠被多線程同時讀,並且保證不會讀到過期的值,但是隻能被單線程寫(有一種情況可以被多線程寫,就是寫入的值不依賴於原值),在 get 操作裏只需要讀不需要寫共享變量 count 和 value,所以可以不用加鎖。之所以不會讀到過期的值,是根據 java 內存模型的 happen before 原則,對 volatile 字段的寫入操作先於讀操作,即使兩個線程同時修改和獲取 volatile 變量,get 操作也能拿到最新的值,這是用 volatile 替換鎖的經典應用場景

put 操作

同樣地 put 操作也是委託給段的 put 方法。下面是段的 put 方法:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
     lock();
     try {
         int c = count;
         if (c++ > threshold) // ensure capacity
             rehash();
         HashEntry<K,V>[] tab = table;
         int index = hash & (tab.length - 1);
         HashEntry<K,V> first = tab[index];
         HashEntry<K,V> e = first;
         while (e != null && (e.hash != hash || !key.equals(e.key)))
             e = e.next;
         V oldValue;
         if (e != null) {
             oldValue = e.value;
             if (!onlyIfAbsent)
                 e.value = value;
         }
         else {
             oldValue = null;
             ++modCount;
             tab[index] = new HashEntry<K,V>(key, hash, first, value);
             count = c; // write-volatile
         }
         return oldValue;
     } finally {
         unlock();
     }
 }

該方法也是在持有段鎖 (鎖定整個 segment) 的情況下執行的,這當然是爲了併發的安全,修改數據是不能併發進行的,必須得有個判斷是否超限的語句以確保容量不足時能夠 rehash。接着是找是否存在同樣一個 key 的結點,如果存在就直接替換這個結點的值。否則創建一個新的結點並添加到 hash 鏈的頭部,這時一定要修改 modCount 和 count 的值,同樣修改 count 的值一定要放在最後一步。put 方法調用了 rehash 方法,reash 方法實現得也很精巧,主要利用了 table 的大小爲 2^n,這裏就不介紹了。而比較難懂的是這句 int index = hash & (tab.length - 1),原來 segment 裏面纔是真正的 hashtable,即每個 segment 是一個傳統意義上的 hashtable, 如上圖,從兩者的結構就可以看出區別,這裏就是找出需要的 entry 在 table 的哪一個位置,之後得到的 entry 就是這個鏈的第一個節點,如果 e!=null,說明找到了,這是就要替換節點的值(onlyIfAbsent == false),否則,我們需要 new 一個 entry,它的後繼是 first,而讓 tab[index]指向它,什麼意思呢?實際上就是將這個新 entry 插入到鏈頭,剩下的就非常容易理解了

由於 put 方法裏需要對共享變量進行寫入操作,所以爲了線程安全,在操作共享變量時必須得加鎖。Put 方法首先定位到 Segment,然後在 Segment 裏進行插入操作。插入操作需要經歷兩個步驟,第一步判斷是否需要對 Segment 裏的 HashEntry 數組進行擴容,第二步定位添加元素的位置然後放在 HashEntry 數組裏。

另一個操作是 containsKey,這個實現就要簡單得多了,因爲它不需要讀取值:

boolean containsKey(Object key, int hash) {
     if (count != 0) { // read-volatile
         HashEntry<K,V> e = getFirst(hash);
         while (e != null) {
             if (e.hash == hash && key.equals(e.key))
                 returntrue;
             e = e.next;
         }
     }
     returnfalse;
 }

size() 操作

如果我們要統計整個 ConcurrentHashMap 裏元素的大小,就必須統計所有 Segment 裏元素的大小後求和。Segment 裏的全局變量 count 是一個 volatile 變量,那麼在多線程場景下,我們是不是直接把所有 Segment 的 count 相加就可以得到整個 ConcurrentHashMap 大小了呢?不是的,雖然相加時可以獲取每個 Segment 的 count 的最新值,但是拿到之後可能累加前使用的 count 發生了變化,那麼統計結果就不準了。所以最安全的做法,是在統計 size 的時候把所有 Segment 的 put,remove 和 clean 方法全部鎖住,但是這種做法顯然非常低效。

因爲在累加 count 操作過程中,之前累加過的 count 發生變化的幾率非常小,所以 ConcurrentHashMap 的做法是先嚐試 2 次通過不鎖住 Segment 的方式來統計各個 Segment 大小,如果統計的過程中,容器的 count 發生了變化,則再採用加鎖的方式來統計所有 Segment 的大小。

那麼 ConcurrentHashMap 是如何判斷在統計的時候容器是否發生了變化呢?使用 modCount 變量,在 put , remove 和 clean 方法裏操作元素前都會將變量 modCount 進行加 1,那麼在統計 size 前後比較 modCount 是否發生變化,從而得知容器的大小是否發生變化。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/tM1ZOEYZKefKom-mDcSDbA