Java高性能編程之CAS與ABA及解決方法

2019-09-18     IT技術分享

CAS概念

CAS,全稱Compare And Swap,比較與交換。

屬於硬體級別的同步原語,從處理器層面提供了內存操作的原子性。

從概念上,我們可以得出三點。第一,CAS的運作方式(通過比較與交換實現)。第二,硬體層面支持,性能肯定不低(當然它也不是銀彈)。第三,提供原子性,那麼它的功能肯定是確保原子性,從而確保線程安全。

實際使用中,CAS操作需要輸入兩個數值,一個舊值A(期望操作前的值)和一個新值B,在操作期間先將舊值A與實際內存中的值進行比較,如果沒有發生變化,才將實際內存中的值交換成新值B ,如果發生了變化則不交換。

CAS應用場景

既然CAS的功能是提供原子性,那麼從這個角度出發思考,如計數器,帳戶轉帳等。

那麼提到計數器,就不得不提到JUC包下的atomic包了。其中提供了大量原子操作了,如Integer類型的值變化,Long類型的值變化,Boolean類型的值變化。

說到這裡,某些人就要一句「球多麻袋」,Integer類型的值變化,不就一句代碼嘛(如i = i + 1;),不就是原子操作嘛。即使有賦值操作,也可以寫成(i++;),這樣不就一個操作了嘛。當然學習過彙編或對計算機指令有一定了解的朋友可能就知道原因了。很多時候,我們在程序中的一段代碼,編譯到底層執行時,往往是多個語句(誰讓CPU只能執行非常簡單的操作呢)。如i++操作編譯成Java指令後是以下四句:

  1. getfield
  2. iconst_1
  3. iadd
  4. putfield

具體的意思,我就不解讀了(不是今天的重點),感興趣的,可以百度或者@我。

話題回到JUC包下的atomic包,我之所以提它,就是因為其原子性的實現就是依靠CAS實現的。

AtomicInteger類:

/**
* Atomically sets to the given value and returns the old value.
*
* @param newValue the new value
* @return the previous value
*/
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}

Unsafe類:

public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var4));
return var5;
}
public final native boolean compareAndSwapInt(Object var1,
long var2, int var4, int var5);

通過上述三段源碼,可以清楚看出,AtomicInteger中getAndSet這一原子方法是通過Unsafe中的原生方法compareAndSwapInt方法完成CAS機制,從而確保操作的原子性。

CAS還涉及到Java中鎖的實現,這個也留到鎖專題再細說,畢竟這次的主題是CAS,ABA及解決之道。

Why need CAS

那麼為什麼需要CAS呢,畢竟Java已經有了多種手段來保證線程安全的原子性問題,最廣為人知的除了Atomic包(底層是CAS),就是synchronized鎖了。

原因很簡單,因為synchronized鎖什麼的太重了。這裡所說的重,是指其消耗的系統資源較多(所以又稱為重量級鎖)。所以有著底層硬體支持的CAS才會那麼受歡迎。當然CAS也有著自己的問題,這個後面會談到。

CAS應用:

說得再多,不如來點實際代碼,看看具體效果。

以下代碼,包含四個類:一個主類,用於調用實現類,展示效果(注釋中有執行結果)。三個實現類,分別展示了沒有處理,使用Atomic包,使用CAS三種方式來多線陳增加全局計數器的效果。

AtomicityWithNoDeal

未做任何處理,通過100個子線程分別執行10000次計數器+1操作。

package tech.jarry.learning.netease;

/**
* @Description:
* @Author: jarry
*/
public class AtomicityWithNoDeal {

private volatile int i = 0;

private void add(){
i++;
}

public void run() throws InterruptedException {
for (int j = 0; j < 100; j++){
new Thread(new Runnable() {
@Override
public void run() {
for (int m = 0; m< 10000; m++){
add();
}
System.out.println(Thread.currentThread().getName()+" has run finished !");
}
}).start();
}

Thread.sleep(2000);
System.out.println("i: "+i);
}
}

AtomicityWithAtomic

進行Atomic包處理,通過100個子線程分別執行10000次計數器+1操作。

package tech.jarry.learning.netease;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @Description:
* @Author: jarry
*/
public class AtomicityWithAtomic {

private AtomicInteger atomicInteger = new AtomicInteger(0);

private void add(){
atomicInteger.incrementAndGet();
}

public void run() throws InterruptedException {

for (int j = 0; j < 100; j++){
new Thread(new Runnable() {
@Override
public void run() {
for (int m = 0; m< 10000; m++){
add();
}
System.out.println(Thread.currentThread().getName()+" has run finished !");
}
}).start();
}

Thread.sleep(2000);
System.out.println("atomicInteger: "+atomicInteger.get());
}
}

AtomicityWithCAS

進行手寫的CAS處理,通過100個子線程分別執行10000次計數器+1操作。

package tech.jarry.learning.netease;

import sun.misc.Unsafe;

import java.lang.reflect.Field;

/**
* @Description:
* @Author: jarry
*/
public class AtomicityWithCAS {

// 建立全局計數器,用於觀察CAS原子性特點
volatile int k = 0;
// 定義Unsafe引用對象
private static Unsafe unsafe = null;
// 定義k的內存偏移量(可以理解為k在內存中地址,當然實際與C指針的內存地址是完全不同的)
private static long valueOffset;

static {
try {
// 利用反射獲取Unsafe實例對象(正常途徑是無法獲取的)
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
// 由於unsafe是靜態對象,所以傳入null。想想也對,畢竟不同的實例對象的非靜態對象當然是不同的,當然需要傳入實例對象作為參數嘍。
// 另外吐槽一句,我查看這段資料的時候,發現百度第一頁的各個博客,幾乎都是一樣的示例代碼。。。
unsafe = (Unsafe)field.get(null);

// 獲取當前對象中全局計數器k的內存地址偏移
Field kField = AtomicityWithCAS.class.getDeclaredField("k");
kField.setAccessible(true);
valueOffset = unsafe.objectFieldOffset(kField);

} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}

/**
* 執行全局計數器k+1的方法
*/
private void add(){
// 當CAS執行失敗時,需要重新執行相關操作,直到執行成功。故CAS是一個自旋鎖。
while(true) {
// 獲取CAS操作所需的舊值
int current = unsafe.getIntVolatile(this,valueOffset);
// 進行CAS操作
if (unsafe.compareAndSwapInt(this,valueOffset,current,current+1)){
// 執行成功,就跳出循環
break;
}
}
}

/**
* 為了體現效果,這裡開啟了100個線程循環執行add()操作
* @throws InterruptedException
*/
public void run() throws InterruptedException {
for (int j = 0; j < 100; j++){
new Thread(new Runnable() {
@Override
public void run() {
// 每個線程執行10000次add()操作
for (int m = 0; m< 10000; m++){
add();
}
System.out.println(Thread.currentThread().getName()+" has run finished !");
}
}).start();
}

// 當前線程休眠2s,確保所有子線程執行完畢
Thread.sleep(2000);
System.out.println("k: "+k);
}
}

Main主函數

主線程調用同一包下的AtomicityWithNoDeal,AtomicityWithAtomic,AtomicityWithCAS三個類,觀察運行效果。

package tech.jarry.learning.netease;

public class Main {

public static void main(String[] args) throws InterruptedException {

// (new AtomicityWithNoDeal()).run();
/**
* 運行結果:
* Thread-1 has run finished !
* 。。。。。。(略98個線程)
* Thread-83 has run finished !
* i: 440239
*/

// (new AtomicityWithAtomic()).run();
/**
* 運行結果:
* Thread-0 has run finished !
* 。。。。。。(略98個線程)
* Thread-69 has run finished !
* atomicInteger: 1000000
*/

// (new AtomicityWithCAS()).run();
/**
* 運行結果:
* Thread-1 has run finished !
* 。。。。。。(略98個線程)
* Thread-80 has run finished !
* k: 1000000
*/

}
}

注釋

其實上述代碼中,重要的地方,我都寫上了相關的注釋。如果還有什麼不清楚的地方,可以@我。

CAS缺點

CAS當然是有缺點的,否則就沒Synchronized什麼事情了。

  1. 從概念及代碼示例中可以看出,當CAS操作執行失敗時,會繼續進入下一個循環執行,直到CAS操作執行成功,這種行為稱為自旋。自旋的實現讓所有線程都處於高頻運行,爭搶CPU執行時間的狀態。如果操作長時間不成功,會帶來很大的CPU資源消耗(所以Java有鎖的粗化/升級)。
  2. CAS僅能針對單個變量進行操作,不能用於多個變量來實現原子操作。
  3. ABA問題。

正如,我之前所提到的,看待技術問題要找到其特性的最初來源。如第二點中CAS之所以不能支持多個變量的原子操作,是因為CAS操作的原子性來源於硬體的支撐,而硬體只支持單個變量的原子操作,故CAS只能針對單個變量的原子操作進行操作。而有些文章或代碼中提到通過CAS執行多個變量的原子操作,其實本質並不是針對多個變量,而是針對這些變量的集合或者總的對象的Reference操作的 。這有點抽象,舉個栗子。我將通過CAS操作轉變了某個數組的引用變量的指向,看起來我實現了整個數組內多個元素轉變的原子操作。但實際是我通過改變當前引用變量的指向實現的,CAS的原子操作針對的是這個指向Reference。具體代碼可以參照Atomic包中的AtomicIntegerArray與AtomicReference等實現。

至於第一點,細究起來有非常多的內容,如鎖的粗化,自旋是否可以優化等。其實CAS的自旋操作實際是確保了一定有CAS操作在執行,但這是通過犧牲CPU實現的。舉個栗子,為了能夠監聽硬體串口返回的消息,我通過while(true)來不斷獲取串口發送過來的數據,直到我獲得了一個完整數據包。

話頭收回來,讓我們談談第三點-ABA問題。

ABA概念

ABA問題,說白了就是鑽了CAS機制的空子。

為了更好地說明這個問題,我們設定兩個線程,同時對變量i進行操作。

正常場景:

初始i=0;

線程-1(打算對i進行CAS操作)

線程-1:獲取i的舊值-0;

線程-1:設定i的新值-2;

線程-1:對i進行CAS操作,舊值i=0符合實際內存中i現有的值,執行swap操作,i=2;

看似正常的場景:

線程-1(打算對i進行CAS操作)

線程-1:獲取i的舊值-0;

線程-2:對i進行了CAS操作,將i改為10;

線程-1:設定i的新值-2;

線程-2:對i進行了CAS操作,將i重新改為0;

線程-1:對i進行CAS操作,舊值i=0符合實際內存中i現有的值,執行swap操作,i=2;

上述的兩個場景中線程-1都完成了想要完成的CAS操作,區別就是其中線程2曾經進行過一些操作。

當然這裡肯定有朋友要說,這對程序的結果沒有任何的影響。是的,在現有的例子中確實對程序的運行結果毫無影響。

這裡我舉出兩個大佬給出的非常經典的例子,分別是極簡與複雜的代表。

極簡:你從銀行取出一箱子錢,放在了車上。結果你一個轉頭,小偷將裝滿錢的箱子拿走,並在原來的位置放了一個看起來一模一樣,但裝滿廢紙的箱子。你並沒有發現這一切,拿著這個箱子開開心心地回家了。囧。

複雜:通過單向鍊表展現ABA的潛在威脅 。由於例子比較複雜,我就不在這裡贅述。感興趣的朋友,可以看看。

其實這兩個例子本質都是一樣的,想表達的就是我們CAS操作的不是簡簡單單的數值,更有著其背後的深層信息(然後通過內存,鍊表,引用來證明觀點)。

這裡我要開始表達我的觀點了:現有的大部分博客或者文章都解釋得或多或少有一定問題。。只有部分大佬的博客提到了核心,但是為了說了核心,又舉了很多例子(ABA問題的例子本來就不好舉,例子大多容易被誤解,後面會談到),導致核心論點被忘卻。然後又有很多人去借鑑,或者直接拿來這些例子,但是又不能很好地通過這些例子說明ABA,然後就通過自己的理解解釋了一番(更好理解,但是卻開始歪了),不斷有人進入這個圈子,然後解釋越來越歪。造成很多剛了解ABA的小白一臉茫然,看著那套看似正確的解釋,就入坑(雖然這個坑影響不那麼大,起碼對於絕大部分人員都沒太大影響。就像很多Java開發者不懂JMM,工作做得不也還可以嘛)了。。。

(這裡插句題外話,那就是有關博客抄襲複製的問題。其實我不反對技術的之間的借鑑,畢竟重複造輪子是不可取的,只有有效的思想碰撞才可以產生推動力嘛。但是通過爬蟲無腦爬取,或者直接複製粘貼全文,就真的有些過分了。之所以有這種感慨,是因為我現在有時候在百度查詢一些資料,十多篇博客看下來,居然大部分都是一樣的,太影響效率了)

當然,說話要講道理的,不能只做「批評家」。就上述兩個經典例子存在一個很大的問題,那就是即使脫離了CAS,上述兩個例子中存在的問題,還是存在 。另外一點佐證就是很多時候,我們需要解決的就是簡單的數值的CAS問題 ,這個數值不牽涉什麼複雜的依賴關係。關於這點佐證的最有力說明就是輕量級鎖的CAS為什麼不需要考慮ABA問題 (因為其根本就不涉及什麼複雜依賴)。

話說回來,其實上述兩個例子,以及我的兩點說明,其實更傾向於表現ABA,距離ABA問題的本質,還差了那麼一句畫龍點睛的總結。

總結:ABA問題的本質就是由於對多線程下CAS流程控制的缺乏,而導致的信息缺失。表現出來的就是由於缺乏必要信息(小偷對箱子進行了操作),而產生了隱患。

如果你還是有些無法理解這個結論,那你還記得程序的一個重要原則-程序置於控制下。如果你都無法控制你的程序的行為,那麼無疑,你的程序是有問題的。

ABA示例:

接下來通過銀行非法洗錢的例子,來簡單闡述由信息缺失,造成的問題。

ABATest

這是一個產生了ABA問題的示例。示例中銀行無法發現客戶帳戶上的非法洗錢行為。

package tech.jarry.learning.netease.casWithABA;

import sun.misc.Unsafe;
import tech.jarry.learning.netease.test.CounterUnsafe;

import java.lang.reflect.Field;

/**
* @Description:
* @Author: jarry
*/
public class ABATest {

volatile int k = 10;
private static Unsafe unsafe = null;
private static long valueOffset;

static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe)field.get(null);

Field iField = CounterUnsafe.class.getDeclaredField("i");
iField.setAccessible(true);
valueOffset = unsafe.objectFieldOffset(iField);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}

private void transferOld() throws InterruptedException {
System.out.println("開始轉帳(舊系統:存在ABA問題)");
while(true) {
int current = unsafe.getIntVolatile(this,valueOffset);
System.out.println("由於CPU搶占問題,轉帳程序阻塞100ms(為了將可能出現的ABA問題,變成肯定出現)");
Thread.sleep(100);
if (unsafe.compareAndSwapInt(this,valueOffset,current,current+1)){
System.out.println("銀行轉帳"+1+"元,成功。餘額:"+k);
break;
}
System.err.println("警告:帳戶存在交易記錄以外的資金流動");
}
}

private void cleanMoneySub() {
while(true) {
int current = unsafe.getIntVolatile(this,valueOffset);
if (unsafe.compareAndSwapInt(this,valueOffset,current,current-2)){
break;
}
}
System.out.println("非法組織洗錢,盜走2元,餘額:"+k);
}

private void cleanMoneyAdd(){
while(true) {
int current = unsafe.getIntVolatile(this,valueOffset);
if (unsafe.compareAndSwapInt(this,valueOffset,current,current+2)){
break;
}
}
System.out.println("非法組織洗錢,加入2元,餘額:"+k);
}

public void oldSystemTransfer() throws InterruptedException {
ABATest abaTest = new ABATest();
System.out.println("帳戶餘額:"+k);

new Thread(new Runnable() {
@Override
public void run() {
try {
abaTest.transferOld();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

Thread.sleep(20);
abaTest.cleanMoneyAdd();
Thread.sleep(20);
abaTest.cleanMoneySub();

Thread.sleep(200);
System.out.println("銀行卡原來餘額為10,接收轉帳1元,故期望餘額為11元。實際餘額:"+abaTest.k);
}

public static void main(String[] args) throws InterruptedException {
ABATest abaTest = new ABATest();
abaTest.oldSystemTransfer();
/**
* 運行結果:
* 帳戶餘額:10
* 開始轉帳(舊系統:存在ABA問題)
* 由於CPU搶占問題,轉帳程序阻塞100ms(為了將可能出現的ABA問題,變成肯定出現)
* 非法組織洗錢,加入2元,餘額:12
* 非法組織洗錢,盜走2元,餘額:10
* 銀行轉帳1元,成功。餘額:11
* 銀行卡原來餘額為10,接收轉帳1元,故期望餘額為11元。實際餘額:11
*/
}
}

ABAResolveTest

這是一個修復了ABA問題的示例。示例中銀行正常發現客戶帳戶上的非法洗錢行為。

package tech.jarry.learning.netease.casWithABA;

import java.util.concurrent.atomic.AtomicStampedReference;

/**
* @Description:
* @Author: jarry
*/
public class ABAResolveTest {

private AtomicStampedReference kReference = new AtomicStampedReference<>(10,0);

private void transferNew() throws InterruptedException {
System.out.println("開始轉帳(新系統:解決了ABA問題)");
while(true) {
Integer currentReference = kReference.getReference();
int stamp = kReference.getStamp();
System.out.println("由於CPU搶占問題,轉帳程序阻塞100ms");
Thread.sleep(100);
if (kReference.compareAndSet(currentReference,currentReference+1,stamp,stamp+1)){
System.out.println("銀行轉帳"+1+"元,成功。餘額:"+kReference.getReference());
break;
}
System.err.println("警告:帳戶存在交易記錄以外的資金流動");
}
}

private void cleanMoneySub(){
int stamp = kReference.getStamp();
kReference.set(kReference.getReference()+2,stamp+1);
System.out.println("非法組織洗錢,盜走2元,餘額:"+kReference.getStamp());
}

private void cleanMoneyAdd(){
int stamp = kReference.getStamp();
kReference.set(kReference.getReference()-2,stamp+1);
System.out.println("非法組織洗錢,加入2元,餘額:"+kReference.getStamp());
}

private void newSystemTransfer() throws InterruptedException {
ABAResolveTest abaResolveTest = new ABAResolveTest();
System.out.println("帳戶餘額:"+abaResolveTest.kReference.getReference());

new Thread(new Runnable() {
@Override
public void run() {
try {
abaResolveTest.transferNew();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

Thread.sleep(20);
abaResolveTest.cleanMoneyAdd();
Thread.sleep(20);
abaResolveTest.cleanMoneySub();

Thread.sleep(200);
System.out.println("銀行卡原來餘額為10,接收轉帳1元,故期望餘額為11元。實際餘額:"+abaResolveTest.kReference.getReference());
}

public static void main(String[] args) throws InterruptedException {
ABAResolveTest abaResolveTest = new ABAResolveTest();
abaResolveTest.newSystemTransfer();
/**
* 運行結果:
* 帳戶餘額:10
* 開始轉帳(新系統:解決了ABA問題)
* 由於CPU搶占問題,轉帳程序阻塞100ms
* 非法組織洗錢,加入2元,餘額:1
* 非法組織洗錢,盜走2元,餘額:2
* 警告:帳戶存在交易記錄以外的資金流動
* 由於CPU搶占問題,轉帳程序阻塞100ms
* 銀行轉帳1元,成功。餘額:11
* 銀行卡原來餘額為10,接收轉帳1元,故期望餘額為11元。實際餘額:11
*/
}
}

上述的兩個例子,也許不是最適合的,但確實闡述了我想要表達的想法。

話說回來,只有到自己寫demo時,才能理解大佬寫ABA的demo時內心的掙扎啊。囧

ABA問題的解決

ABA問題的解決,說白了就是通過引入版本號,從而解決ABA問題的造成的隱患。

用我的話說呢,就是通過引入版本號,了解到線程執行操作時,是否有別的線程做了類似ABA的事情,從而使得本線程的CAS操作重新執行。這裡為什麼重新執行,因為簡單啊。當然,也可以如我那樣打個輸出或者注釋什麼的(可能會浪費系統資源)。不管怎麼處理,起碼這次我知道有這麼個問題了。囧。

小結

至此,CAS機制,ABA問題及解決方案,都已經敘述完畢了。

核心總結:

ABA問題的本質就是由於對多線程下CAS流程控制的缺乏,而導致的信息缺失。表現出來的就是由於缺乏必要信息,而產生了隱患

該說的差不多都說了,簡單回顧一下:

  • 凡事都有其利弊,往往弊端就是由於其優點帶來的。如CAS的硬體支持。
  • 技術的學習,需要追尋技術特性的真正來源,才可以一步步走向架構師。
  • 學習,一方面需要尋求多方資料,另一方面也需要自己的理解與驗證。
  • 遇到無法理解或者無法解讀的事物時,就去尋找它的定義,它的原則。
文章來源: https://twgreatdaily.com/zh-sg/-NHwSG0BJleJMoPMKJcy.html