Streaming Java CompletableFutures in Complete Order - هل تريد أن ترى ماذا يحدث عندما تجمع Stream و CompletableFutures في Java؟ ..
جلبت لنا Java 8 أدوات مثل CompletableFuture
و Stream API ... دعونا نحاول دمجهما معًا وإنشاء Stream
تقوم بإرجاع قيم من مجموعة CompletableFutures
عند وصولهم.
تم استخدام هذا النهج أيضًا عند تطوير 1.0.0 من المجمعات المتوازية .
تدفق CompletableFutures
بشكل أساسي ، ما نحاول القيام به هو تنفيذ حل من شأنه أن يسمح لنا بتحويل مجموعة من العقود الآجلة إلى سلسلة من القيم التي تعود بها تلك العقود الآجلة:
Collection -> Stream
في عالم Java ، يمكن تحقيق ذلك باستخدام ، على سبيل المثال ، طريقة ثابتة:
public static Stream inCompletionOrder(Collection futures) { // ... }
لإنشاء Stream
مخصص ، يحتاج المرء إلى تنفيذ java.util.Spliterator مخصص:
final class CompletionOrderSpliterator implements Spliterator { ... }
والآن يمكننا الانتهاء من تنفيذ طريقتنا الثابتة:
public static Stream completionOrder(Collection futures) { return StreamSupport.stream( new CompletionOrderSpliterator(futures), false); }
هذا هو الجزء السهل ، فلننفذ CompletionOrderSpliterator
حاليا.
تنفيذ CompletionOrderSpliterator
لتنفيذ Spliterator
، سنحتاج إلى ملء الفراغات لتوفير تطبيقات مخصصة للطرق التالية:
final class CompletionOrderSpliterator implements Spliterator { CompletionOrderSpliterator(Collection futures) { // TODO } @Override public boolean tryAdvance(Consumer action) { // TODO } @Override public Spliterator trySplit() { // TODO } @Override public long estimateSize() { // TODO } @Override public int characteristics() { // TODO } }
بطبيعة الحال ، نحن بحاجة إلى مُنشئ مناسب أيضًا.
قد تتضمن الطريقة الأكثر طبيعية للتعامل مع المشكلة عمل نسخة عمل من مجموعة المصدر ، وانتظار استكمال أي مستقبل ، وإزالتها من المجموعة وإطعامها إلى Spliterator
بحد ذاتها.
يمكن إجراء انتظار اكتمال أي مستقبل سريعًا باستخدام CompletableFuture#anyOf
، وهو يعالج نشر الاستثناءات بشكل صحيح خارج الصندوق.
ومع ذلك ، هناك تعقيد بسيط.
إذا نظرت إلى توقيع CompletableFuture#anyOf
، فسترى أنه ليس عمليًا للغاية لأنه يقبل التعدد CompletableFutures
وإرجاع CompletableFuture<
واحد Object>
، ولكن هذه ليست المشكلة الرئيسية هنا (مجرد إزعاج بسيط).
المشكلة الحقيقية هي أن CompletableFuture
المُعاد بواسطة الطريقة ليس المستقبل الذي اكتمل أولاً ، ولكنه جديد CompletableFuture
المثال الذي يكتمل عند اكتمال أي مستقبل.
هذا يجعل فكرة انتظار المستقبل بأكملها ثم إزالتها من قائمة العقود الآجلة المتبقية معقدة بعض الشيء. لا يمكننا الاعتماد على المساواة المرجعية ، لذلك يمكننا إما إجراء مسح خطي بعد كل إشارة من CompletableFuture#anyOf
، أو محاولة ابتكار شيء أفضل.
يمكن أن يبدو الحل الساذج كما يلي:
نسيت كلمة المرور الخاصة بي فيريزون
private T takeNextCompleted() { anyOf(futureQueue.toArray(new CompletableFuture[0])).join(); CompletableFuture next = null; for (CompletableFuture future : futureQueue) { if (future.isDone()) { next = future; break; } } futureQueue.remove(next); return next.join(); }
أقوم بإجراء مسح خطي وتخزين الفهرس من أجل الإزالة في وقت ثابت. إذا كنت تريد معرفة سبب تمرير 0 إلى CompletableFuture[]
، على الرغم من أنني أعرف الحجم ، تحقق من هذه المقالة.
إذا نظرت إلى المشكلة من وجهة نظر عملية ، يجب أن يكون هذا جيدًا بما يكفي لأن لا أحد يتوقع على الإطلاق أن يتكرر على مجموعة من العقود الآجلة التي يزيد حجمها عن 10-20 ألفًا (على الرغم من ذلك ، فمن الممكن تمامًا لأن CompletableFutures ليست مرتبطة بالخيوط ويمكن إكمال الملايين منها بواسطة شخص واحد مسلك) بسبب قيود عدد خيوط الأجهزة (يمكن أن يختلف العدد الفعلي كثيرًا اعتمادًا على عوامل متعددة ، على سبيل المثال ، حجم المكدس).
ومع ذلك ، قد يتغير ذلك مرة واحدة مشروع لوم يذهب مباشرة.
ومع ذلك ، فإن 20000 تكرار سيؤدي إلى زيارة أي شيء بين 20000 عقدة بشكل متفائل (دائمًا ما يكون المستقبل الأول الذي يكتمل) 200000000 عقدة بتشاؤم.
قناة tcm في السنة
ماذا يمكننا أن نفعل حيال ذلك إذا لم نتمكن من الاعتماد على المساواة المرجعية أو أكواد التجزئة الخاصة بـ CompletableFutures
؟
يمكننا تعيين معرفاتنا لهم وتخزينها في خريطة جنبًا إلى جنب مع العقود الآجلة المطابقة ثم جعل العقود الآجلة تحدد نفسها عن طريق إرجاع الفهارس جنبًا إلى جنب مع القيم الفعلية عن طريق إرجاع زوج.
لذلك ، دعونا نخزن العقود الآجلة لدينا في خريطة:
private final Map > indexedFutures;
الآن ، يمكننا تعيين معرفات يدويًا من تسلسل متزايد بشكل رتيب ، وجعل العقود الآجلة تعيدها أيضًا:
private static Map > toIndexedFutures(List futures) { Map > map = new HashMap(futures.size(), 1); // presizing the HashMap since we know the capacity and expected collisions count (0) int seq = 0; for (CompletableFuture future : futures) { int index = seq++; map.put( index, future.thenApply( value -> new AbstractMap.SimpleEntry(index, value))); } return map; }
والآن ، يمكننا العثور على المستقبل المكتمل التالي ومعالجته بكفاءة من خلال انتظاره وقراءة رقم التسلسل ثم استخدامه لإزالة المستقبل من قائمة العناصر المتبقية:
private T nextCompleted() { return anyOf(indexedFutures.values() .toArray(new CompletableFuture[0])) .thenApply(result -> ((Map.Entry) result)) .thenApply(result -> { indexedFutures.remove(result.getKey()); return result.getValue(); }).join(); }
تنفيذ tryAdvance()
يصبح تافها:
@Override public boolean tryAdvance(Consumer action) { if (!indexedFutures.isEmpty()) { action.accept(nextCompleted()); return true; } else { return false; } }
أصبح الجزء الأصعب وراءنا ، والآن نحتاج إلى تنفيذ ثلاث طرق متبقية:
@Override public Spliterator trySplit() { return null; // because splitting is not allowed } @Override public long estimateSize() { return indexedFutures.size(); // because we know the size } @Override public int characteristics() NONNULL; // because nulls in source are not accepted
ونحن هنا.
مثال العمل
يمكننا التحقق بسرعة من أنه يعمل بشكل مناسب عن طريق إدخال تأخر في المعالجة العشوائية عند المرور عبر تسلسل مرتب:
public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); List futures = Stream .iterate(0, i -> i + 1) .limit(100) .map(i -> CompletableFuture.supplyAsync( withRandomDelay(i), executorService)) .collect(Collectors.toList()); completionOrder(futures) .forEach(System.out::println); } private static Supplier withRandomDelay(Integer i) { return () -> { try { Thread.sleep(ThreadLocalRandom.current() .nextInt(10000)); } catch (InterruptedException e) { // ignore shamelessly, don't do this on production } return i; }; }
ويمكنك أن ترى أن القيم لا يتم إرجاعها بالترتيب الأصلي:
6 5 2 4 1 11 8 12 3
تدفق العقود الآجلة بالترتيب الأصلي
ماذا لو أردنا دلالات مختلفة وحافظنا ببساطة على الترتيب الأصلي؟
لحسن الحظ ، يمكن تحقيق ذلك بسرعة دون أي بنية تحتية معينة:
public static Stream originalOrder( Collection futures) { return futures.stream().map(CompletableFuture::join); }
مثال كامل
package com.pivovarit.collectors; import java.util.AbstractMap; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Spliterator; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import static java.util.concurrent.CompletableFuture.anyOf; /** * @author Grzegorz Piwowarek */ final class CompletionOrderSpliterator implements Spliterator { private final Map > indexedFutures; CompletionOrderSpliterator(Collection futures) { indexedFutures = toIndexedFutures(futures); } @Override public boolean tryAdvance(Consumer action) { if (!indexedFutures.isEmpty()) { action.accept(nextCompleted()); return true; } else { return false; } } private T nextCompleted() { return anyOf(indexedFutures.values().toArray(new CompletableFuture[0])) .thenApply(result -> ((Map.Entry) result)) .thenApply(result -> { indexedFutures.remove(result.getKey()); return result.getValue(); }).join(); } @Override public Spliterator trySplit() { return null; } @Override public long estimateSize() { return indexedFutures.size(); } @Override public int characteristics() IMMUTABLE private static Map > toIndexedFutures(Collection futures) { Map > map = new HashMap(futures.size(), 1); int counter = 0; for (CompletableFuture f : futures) { int index = counter++; map.put(index, f.thenApply(value -> new AbstractMap.SimpleEntry(index, value))); } return map; } }
مثال العمل الكامل يمكن العثور عليها على GitHub أيضًا.
هل لديك فكرة عن كيفية تحسينها؟ لا تتردد ، واسمحوا لي أن أعرف!
# جافا
dzone.com
تدفق Java CompletableFutures في ترتيب الإكمال
Streaming Java CompletableFutures in Complete Order - هل تريد أن ترى ماذا يحدث عندما تجمع Stream و CompletableFutures في Java؟ ...