前言
前面我們通過一篇文章講過多線程應用開發過程中,我們遇到的如何對線程進行設計和應用的問題,其中我們提到了一個分離和聯合線程模型。
在Java的1.7版以後,它提供了一個Fork/Join框架。用來實現一種常見的多線程處理模型設計。
Fork/Join
Fork/Join框架的基類是java.util.concurrent.ForkJoinPool。
這個類實現了Executor和ExecutorService兩個接口以及AbstractExecutorService抽象類。
所以ForkJoin是一個特殊的線程池實現。
因此,ForkJoin線程池基本上是一個執行特殊任務的線程池,即ForkJoinTask。
這個類實現了已知的接口Future,並使用了get()、cancel()和isDone()等方法。
除此之外,這個類還提供了兩個方法,它們為整個框架提供了名稱:fork()和join()。
調用fork()將啟動任務的異步執行,而調用join()將等待任務完成並檢索其結果。
因此,我們可以將一個給定的任務分解成多個較小的任務,派生每個任務,最後等待所有任務完成。這使得複雜問題的實現更加容易。
在計算機科學中,這種方法也稱為分治法。
每當一個問題太複雜而不能一次解決時,它就被分解成多個更小、更容易解決的問題。這可以用偽代碼寫成:
首先,我們檢查問題的當前大小是否大於給定的閾值。
如果是這種情況,我們將問題分成更小的問題,fork()每個新任務,然後通過調用join()等待結果。
當join()返回每個子任務的結果時,我們必須找到較小問題的最佳解決方案,並將其作為最佳解決方案返回。
重複這些步驟,直到給定的閾值太低,並且問題太小,我們可以直接計算其解決方案,而無需進一步的除法。
RecursiveTask
為了更好地理解這個過程,我們實現了一個算法,它可以找到整數值數組中最小的數。
這個問題不是我們在日常工作中使用ForkJoinPool可以解決的問題,但是下面的實現非常清楚地展示了基本原則。
在main()方法中,我們設置一個帶有隨機值的整數數組,並創建一個新的ForkJoinPool。
傳遞給其構造函數的第一個參數是指示所需並行度的級別。在這裡,我們查詢運行時中可用的CPU內核的數量。
然後調用invoke()方法並傳遞一個GetMinNumb實例。
GetMinNumb擴展了類RecursiveTask,它本身是前面提到的ForkJoinTask的子類。
ForkJoinTask類實際上有兩個子類:一個是為返回值的任務設計的(RecursiveTask),另一個是為沒有返回值的任務設計的(RecursiveAction)。
超類要求我們必須實現compute()方法。在這裡,我們查看整數數組的給定部分,並決定當前的問題是否太大而不能立即解決,需要分解處理。
在尋找數組中最小的數字時,需要直接解決的最小問題大小是比較兩個元素並返回它們的最小值。
如果當前有兩個以上的元素,則將數組分成兩部分,並再次找到這兩部分中最小的數。這是通過創建GetMinNumb的兩個新實例來實現的。
構造函數由數組和開始和結束索引組成。然後,通過調用fork()來異步地啟動這兩個任務的執行。這個調用提交線程池隊列中的兩個任務。
線程池實現了一種稱為「工作竊取」的策略,即如果所有其他線程都有足夠的工作要做,則當前線程從其他任務之一竊取其工作。這確保任務儘可能快地執行。
RecursiveAction
如上所述,在RecursiveTask旁邊還有RecursiveAction類。
與RecursiveTask不同的是,它不必返回值,因此它可以用於可以直接在給定數據結構上執行的異步計算。
這樣一個例子是計算灰度圖像的彩色圖像。我們所要做的就是對圖像的每個像素進行疊代,並使用以下公式從RGB值中計算出灰度值:
gray = 0.2126 * red + 0.7152 * green + 0.0722 * blue
浮點數表示特定顏色對人類對灰色感知的影響程度。由於綠色使用的是最大值,我們可以得出一個灰度圖像的計算結果接近綠色部分的3/4。
因此,基本實現應該是這樣的,假設image是我們表示實際像素數據的對象,setRGB()和getRGB()方法用於檢索實際的RGB值:
上面的實現在單CPU機器上運行良好。但是如果我們有多個可用的CPU,我們可能希望將這些工作分配給可用的核心。
因此,不需要在兩個嵌套的for循環中遍歷所有像素,我們可以用ForkJoinPool並為圖像的每一行(或每一列)提交一個新任務。
一旦將一行轉換為灰度,當前線程就可以處理下一行。
這個原則在下面的例子中實現:
在main()方法中,我們使用Java的ImageIO類讀取圖像。返回的BufferedImage實例具有我們需要的所有方法。
我們可以查詢行數和列數,並檢索和設置每個像素的RGB值。所以我們要做的就是遍歷所有行,並向我們的ForkJoinPool提交一個新的GrayscaleImageAction。後者收到了關於可用處理器的提示,作為其構造函數的參數。
現在,通過調用它們的compute()方法,ForkJoinPool可以異步啟動任務。在這個方法中,我們遍歷每一行並根據其灰度值更新相應的RGB值。
將所有任務提交到池後,我們在主線程中等待關閉整個池,然後使用ImageIO.write()方法將更新後的BufferedImage寫回磁碟。
令人驚訝的是,如果不使用可用的處理器,我們只需要幾行代碼。這裡我們再次通過使用java.util.concurrent包來完成實現。
ForkJoinPool為提交任務提供了三種不同的方法:
- execute(ForkJoinTask):此方法異步執行給定的任務。它沒有返回值。
- invoke(ForkJoinTask): 此方法等待任務返回值。
- submit(ForkJoinTask): 此方法異步執行給定的任務。它返回對任務本身的引用。因此,可以使用任務引用來查詢結果(因為它實現了Future接口)。
有了這些知識,我們就很清楚為什麼要使用execute()方法提交上面的GrayscaleImageAction。
如果我們使用invoke(),主線程就會等待任務完成,我們就不會利用可用的並行度。
當我們仔細研究ForkJoinTask-API時,我們發現了同樣的區別:
- ForkJoinTask.fork(): ForkJoinTask是異步執行的。它沒有返回值。
- ForkJoinTask.invoke(): 立即執行ForkJoinTask,並在完成後返回結果。
ForkJoinPool 和 ExecutorService
既然我們已經知道了ExecutorService和ForkJoinPool,您可能會問自己為什麼應該使用ForkJoinPool而不是ExecutorService。
兩者之間的差別並不大。兩者都有execute()和submit()方法,並使用一些公共接口的實例,如Runnable、Callable、RecursiveAction或RecursiveTask。
為了更好地理解這種差異,讓我們嘗試使用ExecutorService從上面實現GetMinNumb類:
代碼看起來非常相似,除了我們將任務提交給ExecutorService,然後使用返回的Future實例來等待結果之外。
這兩個實現之間的主要區別可以在線程池構建的地方找到。
在上面的例子中,我們創建了一個包含64個線程的固定線程池。
為什麼選擇這麼大的數字? 這裡的原因是,為每個返回的Future調用get()會阻塞當前線程,直到結果可用為止。
如果我們只向池提供可用cpu數量的線程,那麼程序將耗盡資源並無限期掛起。
ForkJoinPool實現了前面提到的工作竊取策略,即每次運行的線程都必須等待某個結果;
該線程從工作隊列中刪除當前任務,並執行一些準備運行的其他任務。
這樣,當前線程就不會被阻塞,可以用來執行其他任務。一旦計算了最初掛起的任務的結果,任務將再次執行,join()方法將返回結果。
這是與常規ExecutorService的一個重要區別,在常規ExecutorService中,我們必須在等待結果時阻塞當前線程。
文章來源: https://twgreatdaily.com/lgAAo24BMH2_cNUgwpnS.html