多个CompletableFuture样例
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 注意:
* handle 和 thenApply方法的区别
* 它们与handle方法的区别在于handle方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出。
* 而thenApply方法只是用来处理正常值,因此一旦有异常就会抛出。
*/
public class SimpleTest {
private ExecutorService executorService = new ThreadPoolExecutor(20, 40, 100, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
/**
* CompletableFuture 串行操作
*/
@Test
public void test01(){
CompletableFuture.supplyAsync(()->"hello ")
.thenApply(str -> str + "world ")
.thenCombine(CompletableFuture.completedFuture("java"),(s1, s2)->s1 + s2)
.thenAccept(System.out::println);
}
/**
* obtrudeException 主动抛出异常
* 1. 如果将主动抛异常时间延长到6s,
* 由于该节点计算完成, 则不会收到异常
*
* 收到异常:手动失败
*/
@Test
public void test02() throws InterruptedException {
CompletableFuture<String> fu = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("成功执行了");
} catch (InterruptedException ignored) {
}
return null;
});
CompletableFuture<String> end = fu.exceptionally(throwable -> {
System.out.println("收到异常:" + throwable.getMessage());
return null;
});
TimeUnit.SECONDS.sleep(3);
// 如果等6s后再抛,由于结果已经计算完成,则不会受到异常
// TimeUnit.SECONDS.sleep(6);
fu.obtrudeException(new RuntimeException("手动失败")); // 手动抛出一个异常,若结果未计算完成,则会抛出。
try {
String result = end.join();
System.out.println(result);
} catch (CompletionException e) {
System.out.println(e.getCause().getMessage());
}
}
/**
* join和get的区别
*/
@Test
public void test03(){
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int i =1/0;
return 1;
});
// CompletableFuture.allOf(f1).join();
// System.out.println("join()抛出包装的CompletionException,本质还是内部发生的异常,不需要手动try..catch");
try {
Integer result = f1.get();
System.out.println("result: " + result);
System.out.println("get()检查异常需要手动处理try..catch");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* handle手动处理异常
*/
@Test
public void test04(){
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA")
.thenApply(resultA -> resultA + " resultB")
// 任务 C 抛出异常
.thenApply(resultB -> {throw new RuntimeException();})
// 处理任务 C 的返回值或异常
.handle(new BiFunction<Object, Throwable, Object>() {
@Override
public Object apply(Object re, Throwable throwable) {
if (throwable != null) {
return "errorResultC";
}
return re;
}
})
.thenApply(resultC -> resultC + " resultD");
System.out.println(future.join());
// errorResultC resultD
}
/**
* allOf 和 anyOf
* allOf 全部任务执行完成后执行,无返回值
* anyOf 任一任务执行后执行,返回值Object (任一任务的返回结果)
*/
@Test
public void test05(){
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<Integer> cfB = CompletableFuture.supplyAsync(() -> 123);
CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> "resultC");
CompletableFuture<Void> combinedFuture1 = CompletableFuture.allOf(cfA, cfB, cfC);
// 所以这里的 join() 将阻塞,直到所有的任务执行结束
Void join1 = combinedFuture1.join();
System.out.println(join1);
CompletableFuture<Object> combinedFuture2 = CompletableFuture.anyOf(cfA, cfB, cfC);
// join() 方法会返回最先完成的任务的结果,所以它的泛型用的是 Object,因为每个任务可能返回的类型不同。
Object join2 = combinedFuture2.join();
System.out.println(join2);
}
/**
* either 方法
* 各个带 either 的方法,表达的都是一个意思,指的是两个任务中的其中一个执行完成,就执行指定的操作
* 它们几组的区别也很明显,分别用于表达是否需要任务 A 和任务 B 的执行结果,是否需要返回值
*
* 注意:
* 1、cfA.acceptEither(cfB, result -> {}); 和 cfB.acceptEither(cfA, result -> {}); 是一个意思;
* 2、第二个变种,加了 Async 后缀的方法,代表将需要执行的任务放到 ForkJoinPool.commonPool() 中执行(非完全严谨);
* 第三个变种很好理解,将任务放到指定线程池中执行;
* 3、难道第一个变种是同步的?不是的,而是说,它由任务 A 或任务 B 所在的执行线程来执行,取决于哪个任务先结束。
*/
@Test
public void test06(){
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
cfA.acceptEither(cfB, result -> {});
cfA.acceptEitherAsync(cfB, result -> {});
cfA.acceptEitherAsync(cfB, result -> {}, executorService);
cfA.applyToEither(cfB, result -> {return result;});
cfA.applyToEitherAsync(cfB, result -> {return result;});
cfA.applyToEitherAsync(cfB, result -> {return result;}, executorService);
cfA.runAfterEither(cfA, () -> {});
cfA.runAfterEitherAsync(cfB, () -> {});
cfA.runAfterEitherAsync(cfB, () -> {}, executorService);
}
/**
* 计算结果完成时的回调方法
* public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
* public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
* public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
* public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
*/
@Test
public void test07(){
final int randomNum = new Random().nextInt(100);
System.out.println("randomNum: " + randomNum);
final CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
if (randomNum % 2 > 0) {
int i = 10 / 0;
}
return randomNum;
}).whenComplete((result, throwable) -> {
System.out.println(MessageFormat.format("result:{0}, throwable:{1}", result, throwable));
}).exceptionally((throwable -> {
System.out.println(throwable);
return -1;
}));
System.out.println("cf.join() = " + cf.join());
}
/**
* thenCombine 合并任务
* 把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
*/
@Test
public void test08(){
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello ");
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 120);
CompletableFuture<String> result = future1.thenCombine(future2, (t, u) -> t + u);
System.out.println(result.join());
}
/**
* thenCompose 将第一个future的返回结果传入第二个future中执行
* thenApply()转换的是泛型中的类型,是同一个CompletableFuture,相当于将CompletableFuture<T> 转换成CompletableFuture<U>
* thenCompose()用来连接两个CompletableFuture,是生成一个新的CompletableFuture。
*/
@Test
public void test09() {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{
int t = new Random().nextInt(10);
System.out.println("t1="+t);
return t;
}).thenCompose(t -> CompletableFuture.supplyAsync(()->{
return 100 + t;
}));
System.out.println(f1.join());
}
/**
* 异常处理
* 1. handle..() 不会抓获异常,所以配置多个都会被执行;
* 2. exceptionally() 会抓获异常所以只生效一次。
* 3. exceptionally() 必须在结束前调用,否则不生效
*
*我是1号异常处理器
*我是3号异常处理器
*我是2号异常处理器
*我是4号异常处理器
*/
@Test
public void test10() {
CompletableFuture<Object> handle = CompletableFuture.runAsync(() -> {
throw new RuntimeException("异常");
})
.handleAsync((aVoid, throwable) -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是1号异常处理器");
return null;
})
.handle((o, throwable) -> {
System.out.println("我是3号异常处理器");
return null;
})
.handleAsync((avoid, throwable) -> {
System.out.println("我是2号异常处理器");
throw new RuntimeException("处理异常中的异常");
// return null;
})
.exceptionally(throwable -> {
System.out.println("我是4号异常处理器");
return null;
})
.exceptionally(throwable -> {
System.out.println("我是5号异常处理器");
return null;
});
handle.join();
}
/**
* complete() 完成一个计算,触发客户端的等待
* completeExceptionally() 也可以抛出一个异常,触发客户端的等待
*
* 我们有两个后门方法可以重设这个值:obtrudeValue、obtrudeException
* 但是使用的时候要小心,因为complete已经触发了客户端,有可能导致客户端会得到不期望的结果。
*/
@Test
public void test11(){
CompletableFuture<String> f = new CompletableFuture<>();
// boolean flag1 = f.complete("100");
// System.out.println(flag1); // true
// System.out.println(f.join()); // 100
boolean flag2 = f.completeExceptionally(new Exception("自定义异常"));
System.out.println(flag2); // true
System.out.println(f.join());
}
/**
* Java Future 转 CompletableFuture
*/
@Test
public void test12(){
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "hello world");
CompletableFuture<String> cf = toCompletable(future, executorService);
}
// future -> CompletableFuture
public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}, executor);
}
/**
* 将多个CompletableFuture组合成一个CompletableFuture,这个组合后的CompletableFuture的计算结果是个List,
* 它包含前面所有的CompletableFuture的计算结果,guava的Futures.allAsList可以实现这样的功能,
* 但是对于java CompletableFuture,我们需要一些辅助方法
*/
@Test
public void test13(){
List<CompletableFuture<String>> futures = Lists.newArrayList(
CompletableFuture.supplyAsync(() -> "a"),
CompletableFuture.supplyAsync(() -> "b"),
CompletableFuture.supplyAsync(() -> "c"),
CompletableFuture.supplyAsync(() -> "d")
);
List<String> resultList = sequence(futures).join();
System.out.println(resultList); // [a, b, c, d]
}
// 多个CompletableFutureList的计算结果List包装成一个CompletableFuture
public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
}
public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
return sequence(futureList);
}
}
1. 本站所有资源来源于用户上传和网络,如有侵权请及时联系删除,本站不承担任何法律责任!
2. 分享目的仅供大家学习和研究,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的教程、源码等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,默认解压密码为"www.94zyw.com",如遇到无法解压的请联系管理员!
94资源网 » 多个CompletableFuture样例
2. 分享目的仅供大家学习和研究,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的教程、源码等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,默认解压密码为"www.94zyw.com",如遇到无法解压的请联系管理员!
94资源网 » 多个CompletableFuture样例