原標題:關於聚合和多線程的處理套路
作者:Honwhy
來源:SegmentFault 思否
概述
無差別地請求多個外部接口並聚合所有請求結果,應該有屬於它自己的套路,應該將所有多線程的操作屏蔽之,我們只關心參數和結果。因此,應該拋棄Callable/FutureTask/Future等這些手工模式,這些代碼應該交給框架來實現。
手工模式
何為手工模式,我們以Callable為例設計請求外部的接口,可能像下面這樣子,參數是NumberParam,兩個外部接口分別是IntToStringCallable和DoubleToStringCallable,
class IntToStringCallable implements Callable {
private final NumberParam param;
IntToStringCallable(NumberParam numberParam) {this.param = numberParam;}@Overridepublic String call {return Integer.toHexString(param.getAge);}}
class DoubleToStringCallable implements Callable {
private final NumberParam param;
DoubleToStringCallable(NumberParam numberParam) {this.param = numberParam;}@Overridepublic String call {return Double.toHexString(param.getMoney);}}
如果採用FutureTask的方式多線程執行這兩個接口,可能是這樣子的,
FutureTask r1 = new FutureTask<>(new IntToStringCallable(numberParam));new Thread(r1).start;FutureTask r2 = new FutureTask<>(new DoubleToStringCallable(numberParam));new Thread(r2).start;try {List ret = new ArrayList<>;ret.add(r1.get);ret.add(r2.get);log.info("ret=" + ret);} catch (Exception ignore) {}
需要首先構造FutureTask,然後使用Thread比較原始的api去執行,當然還可以再簡化一下,比如使用Future方式,
ExecutorService threadPool = Executors.newFixedThreadPool(2);Future r1 = threadPool.submit(new IntToStringCallable(numberParam));Future r2 = threadPool.submit(new DoubleToStringCallable(numberParam));try {List ret = new ArrayList<>;ret.add(r1.get);ret.add(r2.get);log.info("ret=" + ret);} catch (Exception ignore) {}
我相信這是一種普遍常見的做法了。這裡沒有必要繼續評論這些做法的問題了。
Java 8之後
Java 8之後有了更加方便的異步編程方式了,不用再辛苦地去寫Callable的,一句話就可以表達Callable+FutureTask/...,
CompletableFuture pf = CompletableFuture.supplyAsync( -> new IntToStringCallable(numberParam).call);改造之前的做法結果可能就是這個樣子了,
CompletableFuture r1 = CompletableFuture.supplyAsync( -> new IntToStringCallable(numberParam).call);CompletableFuture r2 = CompletableFuture.supplyAsync( -> new DoubleToStringCallable(numberParam).call);try {List ret = new ArrayList<>;ret.add(r1.get);ret.add(r2.get);log.info("ret=" + ret);} catch (Exception ignore) {}
其實可以看出來,這個時候我們不一定需要一個Callable了,提供異步的能力是supplyAsync來完成的,我們只需要正常的入參出參的普通方法就可以了。
Java 8之後再之後
Java 8之後的異步編程方式確實簡單了很多,但是在我們的業務代碼中還是出現了和異步編程相關的無關業務邏輯的事情,可否繼續簡化呢。本案的設計靈感來自同樣Java 8的優秀設計——ParallelStream,舉個簡單的例子,
Arrays.asList("a", "b", "c").parallelStream.map(String::toUpperCase).collect(Collectors.toList);
異步及多線程是ParallelStream來完成的,用戶只需要完成String::toUpperCase部分。
本案的設計主要有三個interface來實現,分別是,
public interface MyProvider {T provide(V v);}public interface MyCollector {void collectList(T t);
List retList;}public interface MyStream {
List toList(List> providers, V v);}
其實MyProvider表達是請求外部接口,MyStream表示一種類似ParallelStream的思想,一種內化異步多線程的操作模式,MyCollector屬於內部設計api可以不暴露給用戶;
一個改寫上面的例子的例子,
@Testpublic void testStream {MyProvider p1 = new IntToStringProvider;MyProvider p2 = new DoubleToStringProvider;List> providers = Arrays.asList(p1, p2);MyStream myStream = new CollectStringStream;List strings = myStream.toList(providers, numberParam);log.info("ret=" + strings);}
在這個方法內一點異步編程的內容都沒有的,用戶只需要編程自己關心的邏輯即可,當然是要按照Provider的思路去寫,這或許有一點心智負擔。
這個CollectStringStream幫我們完成來一些髒活累活,
public List toList(List> myProviders, NumberParam param) {MyCollector myCollector = new NoMeaningCollector;List> pfs = new ArrayList<>(myProviders.size);for (MyProvider provider : myProviders) {CompletableFuture pf = CompletableFuture.runAsync( -> myCollector.collectList(provider.provide(param)), executor);pfs.add(pf);}try {CompletableFuture.allOf(pfs.toArray(new CompletableFuture[0])).get(3, TimeUnit.SECONDS);} catch (Exception e) {if (e instanceof TimeoutException) {pfs.forEach(p -> {if (!p.isDone){p.cancel(true);}});}}return myCollector.retList;}這樣看起這個設計又不美了,但是如果有更多的外部接口需要調用,CollectStringStream就顯得很有價值了,新加入再多的請求外部接口要改動的代碼很少很少,所以這種思想我覺得是值得推廣的。
總結
照例附上參考代碼,不過值得思考的是我們如何像優秀的代碼學習並運用到自己的項目中。
參考代碼
java-toy:https://github.com/honwhy/java-toy
SegmentFault 思否社區和文章作者展開更多互動和交流。
文章來源: https://twgreatdaily.com/zh/F-MAuHMBd8y1i3sJNSGN.html