Java語言開發人員在開發並發的多線程應用時,可能會覺得編寫在多線程環境中執行良好的健壯代碼並不總是那麼簡單。
業界流傳著一句話說:「初級程式設計師認為並發性很難,有經驗的程式設計師認為並發很容易,高級程式設計師認為並發性很難。」
什麼意思呢?開始理解和學習並發多線編程感覺很難理解和掌握,但是當學習一段時間後發現並發多線程編程就是藉助部分接口和類實現,基本固定模式的去編寫,所以覺得很簡單就實現了。但是隨著對並發編程和多線程的理解,那些有經驗的高級程式設計師卻發現要寫出高效安全的並發多線程應用程式本身就不是一件容易的事情。
我們知道Java在開發多線程應用程式時,創建線程最基礎的方式是實現Runnable接口,然後將實現類傳遞給Thread構造函數,或者通過擴展Thread類重寫其run()方法來實現新的Thread擴展類,直接實例化該類實例,啟動線程。
它涉及到安全的數據結構和類的定義,以及通過各種安全同步處理的操作方法,因此Java的語言庫為我們提供了一些經過測試和考驗的線程安全的固定類庫,來簡化並發多線程應用的開發。它包括一些可以直接使用的數據結構和功能,它們都包含在我們熟悉的java.util.concurrent Jar包里。
對於這個Jar包的內容,我們理解它的入口點是接口Executor。它定義了一個唯一的方法execute()接收一個Runnable接口類型的實現對象。
public interface Executor {
\tvoid execute(Runnable command);
}
該接口是對一個指定Runnable實例的執行的封裝。它表示在將來的某個時間點執行指定的Runnable實現的指令。
我們可以簡單實現一下該Executor接口:
如此我們就有了一個個在未來某個時間點上執行指令的類,實例化它就可以調用其execute()方法執行某條指令實現。
由於我們的並發多線程編程過程中不可能只使用一到兩個線程,我們可以手動去實例化定義以及管理它們,而當使用多個線程時,多次創建線程也是一個耗時的過程,為此JDK提供了一個ThreadPoolExecutor,它是一個線程池執行成熟的、可擴展的實現。
在底層,ThreadPoolExecutor維護一個線程池,並將為給定execute()方法的Runnable實例分派給池中線程實例執行。
傳遞給構造函數的參數控制線程池的行為。參數最多的構造函數如下:
下面我們來看一個簡單的使用示例:
這裡我們的run()實現只休眠5秒,但這不是這段代碼的主要關注點。
ThreadPoolExecutor從啟動5個核心線程開始,允許池最多增長8個線程。出於演示目的,我們只允許未使用的線程空閒大約1秒。
這裡的隊列實現是一個LinkedBlockingQueue,具有10個可運行實例的容量。為了跟蹤線程的創建,我們還實現了一個簡單的ThreadFactory。
為處理跟蹤處理因線程飽和拒絕執行實例,我們實現了RejectedExecutionHandler。
main()方法中的循環現在在短時間內向池發出20個Runnable實例。
示例的輸出顯示,我們必須創建8個線程(最多8個)來處理所有掛起的可運行項:
結果還顯示,taskId大於18的所有任務都被轉發給RejectedExecutionHandler。
這是因為我們的Runnable實現休眠5秒。在前10個線程啟動之後,隊列只能容納另外8個可運行實例。然後必須拒絕所有其他實例。
最後,shutdown()方法允許ThreadPoolExecutor拒絕所有進一步的任務,並等待已經發出的任務被執行。
我們可以將shutdown()調用替換為shutdownNow()調用。
shutdownNow()試圖中斷所有正在運行的線程,並在不等待所有線程完成的情況下關閉線程池。
在上面的示例中,我們將看到8個InterruptedException異常,因為我們的8個休眠線程將立即被喚醒。
我們知道上面定義的基於Runnable接口執行任務,其執行函數是一個無返回值run()方法,用於執行返回值為void的任務。
如果需要線程執行的任務有返回值,為此我們定義了Callable接口,該接口定義了方法
V call();
Executor接口非常簡單,它只強制底層實現實現execute()方法。而ExecutorService在擴展Executor接口並添加一系列實用程序方法(例如,添加了完整的任務集合)、關閉線程池的方法,以及查詢執行一個任務的結果的實現的能力。
那麼JDK如何來處理任務返回值並提交給線程池以執行呢?
由於任務的提交者無法提前知道任務何時執行以及執行持續多長時間。所以讓當前線程等待結果顯然不是解決方案。
那麼我們就需要通過另外實現一個檢查線程結果是否已經可用的功能,來阻塞或等待一定的時間從而獲取執行的結果。
這就是java.util.concurrent.Future
另外我們還有一個接口,它通過增加一些方法擴展Executor接口和ExecutorService接口,使得可以在給定的時間點調度任務。這個接口的名稱是ScheduledExecutorService,它基本上提供了一個schedule()方法,該方法需要一個參數,知道任務執行需要多長時間:
schedule(Callablecallable, long delay, TimeUnit unit);
schedule(Runnable command, long delay, TimeUnit unit);
與ExecutorService類似,schedule()方法有兩種變體:
一個用於Runnable接口,另一個用於Callable接口返回值的任務。
ScheduledExecutorService還提供了一種定期執行任務的方法:
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
period參數可以指定任務應該運行的周期,unit是單位。
下面我們編寫一個示例來展示了如何創建ThreadPoolExecutor。
這裡ScheduledExecutorService的實現名為ScheduledThreadPoolExecutor,必須與上面使用的ThreadPoolExecutor非常相似地處理。但是通常不需要完全控制ExecutorService的所有特性。
比如我們想像一個簡單的測試客戶機,它應該使用一個簡單的ThreadPool調用一些伺服器方法。
為此JDK為我們創建了一個名為Executors的簡單工廠類,這裡請注意後面的「s」,該類提供了一些靜態方法來創建一個可讀的ThreadPoolExecutor。所有這些都可以讓我們實現了一個簡單的線程池,它執行一系列計算一些數字的任務。
註:這裡的數字處理操作是為了演示目的,用一個簡單的Thread.sleep()代替。
ExecutorService的創建只需要一行程序代碼就完成了。這裡為了執行一些任務,我們只需要一個for循環,它創建了ExecutorsExample的幾個新實例,並將返回的Future存儲在一個數組中。在向服務提交任務之後,我們只需要等待結果。Future的get()方法正在阻塞,即當前線程休眠,直到結果可用為止。
當然,此方法可以採用超時規範來重寫,以便在任務未在定義的時間段內完成時,讓等待的線程繼續執行。
在Java的多線性並發編程中,線程的創建一般不要採用手動去創建,最好使用線程池來創建,從而減少線程創建過程中的性能損耗。
為此,我們可以使用java.util.concurrent包里定義的一些線程池實現接口和類,從而方便有效從處理多線程並發編程過程中線程的創建問題。其中Executors工廠方法還提供了很多諸如創建固定線程數量的線程池,還有創建彈性線程數量的線程池,以及指定線程執行周期等具體的方法,請參見Java類庫的說明文檔。