Java 併發隊列原理剖析

來源:https://github.com/afkbrb/java-concurrency-note

LinkedBlockingQueue 和 ArrayBlockingQueue 比較簡單,不進行講解了。下面只介紹 PriorityBlockingQueue 和 DelayQueue。

PriorityBlockingQueue

PriorityBlockingQueue 是帶優先級的無界阻塞隊列,每次出隊都返回優先級最高或最低的元素。內部使用二叉堆實現。

類圖結構

PriorityBlockingQueue 內部有一個數組 queue,用來存放隊列元素。allocationSpinLock 是個自旋鎖,通過 CAS 操作來保證同時只有一個線程可以擴容隊列,狀態爲 0 或 1。

由於這是一個優先隊列,所以有一個 comparator 用來比較元素大小。

下面爲構造函數:

private static final int DEFAULT_INITIAL_CAPACITY = 11;

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

可知默認隊列容量爲 11,默認比較器爲 null,也就是使用元素的 compareTo 方法進行比較來確定元素的優先級,這意味着隊列元素必須實現 Comparable 接口。

原理講解

boolean offer()
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    // 獲取獨佔鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 擴容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 通過對二叉堆的上浮操作保證最大或最小的元素總在根節點
            siftUpComparable(n, e, array);
        else
            // 使用了自定義比較器
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        // 激活因調用take()方法被阻塞的線程
        notEmpty.signal();
    } finally {
        // 釋放鎖
        lock.unlock();
    }
    return true;
}

流程比較簡單,下面主要看擴容和建堆操作。

先看擴容。

private void tryGrow(Object[] array, int oldCap) {
    // 由前面的代碼可知,調用tryGrow函數前先獲取了獨佔鎖,
    // 由於擴容比較費時,此處先釋放鎖,
    // 讓其他線程可以繼續操作(如果滿足可操作的條件的話),
    // 以提升併發性能
    lock.unlock();
    Object[] newArray = null;
    // 通過allocationSpinLock保證同時最多隻有一個線程進行擴容操作。
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {
        try {
            // 當容量比較小時,一次只增加2容量
            // 比較大時增加一倍
            int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : (oldCap >> 1));
            // 溢出檢測
            if (newCap - MAX_ARRAY_SIZE > 0) {
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // 釋放鎖,沒用CAS是因爲同時最多有一個線程操作allocationSpinLock
            allocationSpinLock = 0;
        }
    }
    // 如果當前線程發現有其他線程正在對隊列進行擴容,
    // 則調用yield方法嘗試讓出CPU資源促使擴容操作儘快完成
    if (newArray == null)
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

下面來看建堆算法

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 獲取父節點,設子節點索引爲k,
        // 則由二叉堆的性質可知,父節點的索引總爲(k - 1) >>> 1
        int parent = (k - 1) >>> 1;
        // 獲取父節點對應的值
        Object e = array[parent];
        // 只有子節點的值小於父節點的值時才上浮
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

如果瞭解二叉堆的話,此處代碼是十分容易理解的。關於二叉堆,可參看《數據結構之二叉堆》。

E poll()
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 出隊
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        // 獲取尾節點,在實現對二叉堆的下沉操作時要用到
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 下沉操作,保證取走最小的節點(根節點)後,新的根節點仍時最小的,二叉堆的性質依然滿足
            siftDownComparable(0, x, array, n);
        else
            // 使用自定義比較器
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

poll 方法通過調用 dequeue 方法使最大或最小的節點出隊並將其返回。

下面來看二叉堆的下沉操作。

private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;
        while (k < half) {
            // child爲兩個子節點(如果有的話)中較小的那個對應的索引
            int child = (<< 1) + 1;
            Object c = array[child];
            int right = child + 1;
            // 通過比較保證child對應的爲較小值的索引
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            // 下沉,將較小的子節點換到父節點位置
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

同上,對下沉操作有疑問的話可參考上述文章。

void put(E e)

調用了 offer

public void put(E e){
    offer(e);
}
E take()

take 操作的作用是獲取二叉堆的根節點元素,如果隊列爲空則阻塞。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 阻塞可被中斷
    lock.lockInterruptibly();
    E result;
    try {
        // 隊列爲空就將當前線程放入notEmpty條件隊列
        // 使用while循環判斷是爲了避免虛假喚醒
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

DelayQueue

DelayQueue 併發隊列是一個無界阻塞延遲隊列,隊列中的每一個元素都有一個過期時間,當從隊列中獲取元素時只有過期元素纔會出列。隊列頭元素是最快要過期的元素。

類圖結構

DelayQueue 內部使用 PriorityQueue 存放數據,使用 ReentrantLock 實現線程同步。
隊列裏的元素要實現 Delayed 接口(Delayed 接口繼承了 Comparable 接口),用以得到過期時間並進行過期時間的比較。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

available 是由 lock 生成的條件變量,用以實現線程間的同步。

leader 是 leader-follower 模式的變體,用於減少不必要的線程等待。當一個線程調用隊列的 take 方法變爲 leader 線程後,它會調用條件變量 available.waitNanos(delay) 等待 delay 時間,但是其他線程(follower)則會調用 available.await() 進行無限等待。leader 線程延遲時間過期後,會退出 take 方法,並通過調用 available.signal() 方法喚醒一個 follower 線程,被喚醒的線程會被選舉爲新的 leader 線程。

原理講解

boolean offer(E e)
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 添加新元素
        q.offer(e);
        // 查看新添加的元素是否爲最先過期的
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

上述代碼首先獲取獨佔鎖,然後添加元素到優先級隊列,由於 q 是優先級隊列,所以添加元素後,調用 q.peek() 方法返回的並不一定是當前添加的元素。當如果 q.peek() == e,說明當前元素是最先要過期的,那麼重置 leader 線程爲 null 並激活 available 條件隊列裏的一個線程,告訴它隊列裏面有元素了。

E take()

獲取並移除隊列裏面過期的元素,如果隊列裏面沒有過期元素則等待。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 可中斷
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 爲空則等待
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                // 過期則成功獲取
                if (delay <= 0)
                    return q.poll();
                // 執行到此處,說明頭元素未過期    
                first = null; // don't retain ref while waiting
                // follower無限等待,直到被喚醒
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // leader等待lelay時間,則頭元素必定已經過期
                        available.awaitNanos(delay);
                    } finally {
                        // 重置leader,給follower稱爲leader的機會
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // 喚醒一個follower線程
            available.signal();
        lock.unlock();
    }
}

一個線程調用 take 方法時,會首先查看頭元素是否爲空,爲空則直接等待,否則判斷是否過期。
若頭元素已經過期,則直接通過 poll 獲取並移除,否則判斷是否有 leader 線程。
若有 leader 線程則一直等待,否則自己成爲 leader 並等待頭元素過期。

E poll()

獲取並移除頭過期元素,如果沒有過期元素則返回 null。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        // 若隊列爲空或沒有元素過期則直接返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}
int size()

計算隊列元素個數,包含過期的和未過期的。

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.size();
    } finally {
        lock.unlock();
    }
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/K8tkmG-p_auys3T4CH2m9Q