原文出處http://cmsblogs.com/
作者:chenssy
要實現一個線程安全的隊列有兩種方式:阻塞和非阻塞。阻塞隊列無非就是鎖的應用,而非阻塞則是CAS算法的應用。下面我們就開始一個非阻塞算法的研究:CoucurrentLinkedQueue。 ConcurrentLinkedQueue是一個基於連結節點的無邊界的線程安全隊列,它採用FIFO原則對元素進行排序。採用「wait-free」算法(即CAS算法)來實現的。 CoucurrentLinkedQueue規定了如下幾個不變性:
- 在入隊的最後一個元素的next為null
- 隊列中所有未刪除的節點的item都不能為null且都能從head節點遍歷到
- 對於要刪除的節點,不是直接將其設置為null,而是先將其item域設置為null(疊代器會跳過item為null的節點)
- 允許head和tail更新滯後。這是什麼意思呢?意思就說是head、tail不總是指向第一個元素和最後一個元素(後面闡述)。
head的不變性和可變性:
- 不變性
- 所有未刪除的節點都可以通過head節點遍歷到
- head不能為null
- head節點的next不能指向自身
- 可變性
- head的item可能為null,也可能不為null
- 允許tail滯後head,也就是說調用succc()方法,從head不可達tail
tail的不變性和可變性
- 不變性
- tail不能為null
- 可變性
- tail的item可能為null,也可能不為null
- tail節點的next域可以指向自身
- 允許tail滯後head,也就是說調用succc()方法,從head不可達tail
這些特性是否已經暈了?沒關係,我們看下面的源碼分析就可以理解這些特性了。
ConcurrentLinkedQueue源碼分析
CoucurrentLinkedQueue的結構由head節點和tail節點組成,每個節點由節點元素item和指向下一個節點的next引用組成,而節點與節點之間的關係就是通過該next關聯起來的,從而組成一張鍊表的隊列。節點Node為ConcurrentLinkedQueue的內部類,定義如下:
入列
入列,我們認為是一個非常簡單的過程:tail節點的next執行新節點,然後更新tail為新節點即可。從單線程角度我們這麼理解應該是沒有問題的,但是多線程呢?如果一個線程正在進行插入動作,那麼它必須先獲取尾節點,然後設置尾節點的下一個節點為當前節點,但是如果已經有一個線程剛剛好完成了插入,那麼尾節點是不是發生了變化?對於這種情況ConcurrentLinkedQueue怎麼處理呢?我們先看源碼: offer(E e):將指定元素插入都隊列尾部:
光看源碼還是有點兒迷糊的,插入節點一次分析就會明朗很多。 初始化 ConcurrentLinkedQueue初始化時head、tail存儲的元素都為null,且head等於tail:
添加元素A 按照程序分析:第一次插入元素A,head = tail = dummyNode,所有q = p.next = null,直接走步驟2:p.casNext(null, newNode),由於 p == t成立,所以不會執行步驟3:casTail(t, newNode),直接return。插入A節點後如下:
添加元素B q = p.next = A ,p = tail = dummyNode,所以直接跳到步驟7:p = (p != t && t != (t = tail)) ? t : q;。此時p = q,然後進行第二次循環 q = p.next = null,步驟2:p == null成立,將該節點插入,因為p = q,t = tail,所以步驟3:p != t 成立,執行步驟4:casTail(t, newNode),然後return。如下:
添加節點C 此時t = tail ,p = t,q = p.next = null,和插入元素A無異,如下:
這裡整個offer()過程已經分析完成了,可能p == q 有點兒難理解,p 不是等於q.next麼,怎麼會有p == q呢?這個疑問我們在出列poll()中分析
出列
ConcurrentLinkedQueue提供了poll()方法進行出列操作。入列主要是涉及到tail,出列則涉及到head。我們先看源碼:
這個相對於offer()方法而言會簡單些,裡面有一個很重要的方法:updateHead(),該方法用於CAS更新head節點,如下:
final void updateHead(Nodeh, Node p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
我們先將上面offer()的鍊表poll()掉,添加A、B、C節點結構如下:
poll A head = dumy,p = head, item = p.item = null,步驟1不成立,步驟4:(q = p.next) == null不成立,p.next = A,跳到步驟6,下一個循環,此時p = A,所以步驟1 item != null,進行p.casItem(item, null)成功,此時p == A != h,所以執行步驟3:updateHead(h, ((q = p.next) != null) ? q : p),q = p.next = B != null,則將head CAS更新成B,如下:
poll B head = B , p = head = B,item = p.item = B,步驟成立,步驟2:p != h 不成立,直接return,如下:
poll C head = dumy ,p = head = dumy,tiem = p.item = null,步驟1不成立,跳到步驟4:(q = p.next) == null,不成立,然後跳到步驟6,此時,p = q = C,item = C(item),步驟1成立,所以講C(item)設置為null,步驟2:p != h成立,執行步驟3:updateHead(h, ((q = p.next) != null) ? q : p),如下:
看到這裡是不是一目了然了,在這裡我們再來分析offer()的步驟5:
else if(p == q){
p = (t != (t = tail))? t : head;
}
ConcurrentLinkedQueue中規定,p == q表明,該節點已經被刪除了,也就說tail滯後於head,head無法通過succ()方法遍歷到tail,怎麼做? (t != (t = tail))? t : head;(這段代碼的可讀性實在是太差了,真他媽難理解:不知道是否可以理解為t != tail ? tail : head)這段代碼主要是來判讀tail節點是否已經發生了改變,如果發生了改變,則說明tail已經重新定位了,只需要重新找到tail即可,否則就只能指向head了。 就上面那個我們再次插入一個元素D。則p = head,q = p.next = null,執行步驟1: q = null且 p != t ,所以執行步驟4:,如下:
再插入元素E,q = p.next = null,p == t,所以插入E後如下:
到這裡ConcurrentLinkedQueue的整個入列、出列都已經分析完畢了,對於ConcurrentLinkedQueue LZ真心感覺難看懂,看懂之後也感嘆設計得太精妙了,利用CAS來完成數據操作,