skip to content
Honwhy Wang

正确地使用CompletableFuture

/ 10 min read

概述

多线程编程并不是一项容易的事情,但是多线程有利于提高效率,减少耗时,在一个方法内使用多线程有助于提高这个方法的qps如此。在Java中编写多线程有很多代码可以参考的,朴素一点的做法是使用Future、FutureTask,而本文关注的是另外一种做法CompletableFuture,相比Future使用起来更加方便,但是不小心的话也会出毛病,本文会对此给出解决方法和优化思路,勉之共同进步。

CompletableFuture的重要api方法

使用CompletableFuture会涉及到supplyAsync、allOf、get、join等方法,一个例子如下

@Test
public void test() throws InterruptedException {
ThreadPoolExecutor executor = ThreadPoolExecutorBuilder.build(2, 4, 100);
List<CompletableFuture<?>> fs = new ArrayList<>(size);
List<CompletableFuture<?>> others = new ArrayList<>();
for (int i = 0; i < size; i++) {
MyRandomSleepTask task = new MyRandomSleepTask(ai.incrementAndGet());
CompletableFuture<?> cf = CompletableFuture.supplyAsync(() -> {
task.run();
return null;
}, executor);
if (i % 2 == 0) {
System.out.println("pick task-" + (i + 1));
fs.add(cf);
} else {
others.add(cf);
}
}
CompletableFuture<?>[] fa = new CompletableFuture[0];
try {
CompletableFuture.allOf(fs.toArray(fa)).get(2500, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
CompletableFuture.allOf(others.toArray(fa)).join();
}

使用supplyAsync发起异步执行任务,使用allOf聚合所有的Future,再选择get或者join等待所有任务执行完成,get方法还可以使用超时参数。在实际项目中应用超时等待的需求还是大量存在的,一方面可以控制接口的响应时间,一方面防止线程池的工作线程繁忙被长耗时的任务拖垮,用快速失败的方式换接口的稳定保障。

使用get超时等待可能会出现问题

由于项目实践中发现了很多get方法的Exception,即等待超时的TimeoutException,这不得不引起我们的注意,关于线程池的参数如何正确配置,关于get方法超时后会做什么;1)根据请求量,简单地数学计算,增加了核心线程数,调整了线程池队列的长度,我们的目的是当队列满的时候可以使用更多线程工作;2)关于get的怀疑,发生高并发时一次请求的Runnable应该是分散插入到线程池队列里的(假设已经出现了排队的请求),那么get先于Runnable超时结束了,它关联的Runnable是什么状态呢,会不会继续占据线程池队列呢

调参并不容易的,经过调整有一定的改善,但是并发请求量一旦提高了之后,导致了几乎所有的请求都出现了get超时,因此,大胆猜测get超时后,它关联的Runnable还是保留在队列里面,而这些Runnable的执行都可以看作是无意义的耗时,这些耗时通通会影响到其他请求的get等待。我们并不确定请求的先来后到和它们关联的Runnable的执行顺序的关系,不管是谁的Runnable出现了长耗时,惩罚的是所有请求。

这里做了一个实验,如上文的test方法,打印日志如下,

pick task-1
pick task-3
pick task-5
pick task-6
pick task-10
pick task-14
[pool-1-thread-1]MyRandomSleepTask-1 will Sleep(211ms)...
[pool-1-thread-2]MyRandomSleepTask-2 will Sleep(530ms)...
[pool-1-thread-1]MyRandomSleepTask-3 will Sleep(1622ms)...
[pool-1-thread-2]MyRandomSleepTask-4 will Sleep(174ms)...
[pool-1-thread-2]MyRandomSleepTask-5 will Sleep(274ms)...
[pool-1-thread-2]MyRandomSleepTask-6 will Sleep(1669ms)...
[pool-1-thread-1]MyRandomSleepTask-7 will Sleep(1581ms)...
java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at com.github.honwhy.CompletableFutureTest.test(CompletableFutureTest.java:59)
...
[pool-1-thread-2]MyRandomSleepTask-8 will Sleep(1248ms)...
[pool-1-thread-1]MyRandomSleepTask-9 will Sleep(1185ms)...
[pool-1-thread-2]MyRandomSleepTask-10 will Sleep(1870ms)...
[pool-1-thread-1]MyRandomSleepTask-11 will Sleep(1990ms)...
[pool-1-thread-2]MyRandomSleepTask-12 will Sleep(588ms)...
[pool-1-thread-2]MyRandomSleepTask-13 will Sleep(87ms)...
[pool-1-thread-2]MyRandomSleepTask-14 will Sleep(868ms)...
[pool-1-thread-1]MyRandomSleepTask-15 will Sleep(321ms)...

get方法需要等待task-1,3,5,6,10,14完成,但是执行到task-6时就因为超时结束了,但是从日志中还可以看到task-10,14都执行了,这里验证了我们的猜测。

解决多余的Runnable被执行的问题

在网上也找到了关于这个get超时后没有清理Runnable的问题,

http://arganzheng.life/writing-asynchronous-code-with-completablefuture.html
默认情况下,allOf 会等待所有的任务都完成,即使其中有一个失败了,也不会影响其他任务继续执行。 但是大部分情况下,一个任务的失败,往往意味着整个任务的失败,继续执行完剩余的任务意义并不大。 在 谷歌的 Guava 的 allAsList 如果其中某个任务失败整个任务就会取消执行:

引入Guava后解决方案并没有解决这个问题(未继续验证),最后采用的方法也很简单,

try {
CompletableFuture.allOf(fs.toArray(fa)).get(2500, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
if (e instanceof TimeoutException) {
fs.forEach(f -> f.cancel(true));
}
}

出现异常时,在catch中取消这些future。打印日志,

pick task-1
pick task-3
pick task-5
pick task-6
pick task-10
pick task-14
[pool-1-thread-1]MyRandomSleepTask-1 will Sleep(1034ms)...
[pool-1-thread-2]MyRandomSleepTask-2 will Sleep(1409ms)...
[pool-1-thread-1]MyRandomSleepTask-3 will Sleep(954ms)...
[pool-1-thread-2]MyRandomSleepTask-4 will Sleep(42ms)...
[pool-1-thread-2]MyRandomSleepTask-5 will Sleep(174ms)...
[pool-1-thread-2]MyRandomSleepTask-6 will Sleep(359ms)...
[pool-1-thread-2]MyRandomSleepTask-7 will Sleep(851ms)...
[pool-1-thread-1]MyRandomSleepTask-8 will Sleep(973ms)...
java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at com.github.honwhy.CompletableFutureTest.test4(CompletableFutureTest.java:157)
...
[pool-1-thread-2]MyRandomSleepTask-9 will Sleep(836ms)...
[pool-1-thread-1]MyRandomSleepTask-11 will Sleep(181ms)...
[pool-1-thread-1]MyRandomSleepTask-12 will Sleep(1892ms)...
[pool-1-thread-2]MyRandomSleepTask-13 will Sleep(1753ms)...
[pool-1-thread-1]MyRandomSleepTask-15 will Sleep(1204ms)...

可以看到task-6之后get超时了,后续的task-10,14都没有执行了。

使用缓存加速响应

使用缓存当然是一个提高响应速度的一个好办法了,是不是在这个场景中也适用呢。实际上也是适用的,假设在业务的高峰期,用户的请求参数几乎没有改变的,不同用户的请求也可能存在参数相同的情况,偶尔的方法体内出现了get超时,用户获取了空的响应,再不断地重试,针对这些增加缓存肯定是有助于提高响应效率的。本文设计的解决方法是使用Guava Cache,它可以控制Cache的大小,命中的cache还继续延长存活时间,还可以使用弱引用对gc友好等,属于堆内缓存,相比Redis等外部缓存或许Guava Cache更适合。

private static final Cache<String, String> cache = CacheBuilder.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(3, TimeUnit.SECONDS)
.expireAfterAccess(3, TimeUnit.SECONDS)
.weakKeys()
.weakValues()
.build();

模拟了http请求的缓存保存和读取,

public class HttpRequestTask implements Callable<String> {
@Override
public String call() {
try {
if (Thread.currentThread().isInterrupted()) {
System.out.printf("[" + Thread.currentThread().getName() + "]" + this.getClass().getSimpleName() + "-" + this.id + " is interrupted");
return null;
}
long left = score % 600;
try {
System.out.println("[" + Thread.currentThread().getName() + "]" + this.getClass().getSimpleName() + "-" + this.id + " will Sleep(" + left + "ms)...");
TimeUnit.MILLISECONDS.sleep(left);
} catch (InterruptedException e) {
//ignore
//Thread.currentThread().interrupt();
}
String result = cache.getIfPresent(query);
if (result != null) {
System.err.println("get result from cache: key=" + query);
return result;
}
String requestUrl = "https://cn.bing.com/search?q=" + query;
System.out.println("[" + Thread.currentThread().getName() + "]" + this.getClass().getSimpleName() + "-" + this.id + " request (" + requestUrl + ")...");
result = doRequest(requestUrl);
if (result != null) {
cache.put(query, result);
}
return result;
} catch (Exception e) {
}
return null;
}
protected String doRequest(String requestUrl) throws Exception {
return MyHttpClient.get(requestUrl);
}
}

测试方法,和上文的test区别在于MyRandomSleepTask换成了HttpRequestTask

for (int i = 0; i < size; i++) {
HttpRequestTask task = new HttpRequestTask(ai.incrementAndGet(), randomChar());
CompletableFuture<?> cf = CompletableFuture.supplyAsync(() -> {
return task.call();
}, executor);
if (i % 2 == 0) {
System.out.println("pick task-" + (i + 1));
fs.add(cf);
} else {
others.add(cf);
}
}

继续异步

如果方法体有需要多线程http请求外部接口的话,是否可以使用nio的方式,将执行线程等待http响应时将其挂起让出cpu,或者去执行其他任务,设想还算美好。 使用异步的http客户端只需要引入AsyncHttpClient

public class AsyncHttpRequestTask extends HttpRequestTask {
private static AsyncHttpClient asyncHttpClient = asyncHttpClient(Dsl.config().setConnectTimeout(2000).setReadTimeout(2000));
public AsyncHttpRequestTask(int id, String query) {
super(id, query);
}
@Override
protected String doRequest(String requestUrl) throws Exception {
Future<Response> whenResponse = asyncHttpClient.prepareGet("http://www.example.com/").execute();
Response response = whenResponse.get();
if (response != null) {
return response.getResponseBody(StandardCharsets.UTF_8);
}
return null;
}
public static void closeClient() {
try {
asyncHttpClient.close();
} catch (IOException e) {
//
}
}
}

对测试方法做调整,

for (int i = 0; i < size; i++) {
AsyncHttpRequestTask task = new AsyncHttpRequestTask(ai.incrementAndGet(), randomChar());
CompletableFuture<?> cf = CompletableFuture.supplyAsync(() -> {
return task.call();
}, executor);
if (i % 2 == 0) {
System.out.println("pick task-" + (i + 1));
fs.add(cf);
} else {
others.add(cf);
}
}

实际的测试效果并不理想,AsyncHttpRequestTask执行结果不如HttpRequestTask

总结

至此,本文关于CompletableFuture的正确使用及优化方案都所有提及,正确的使用方案一定是符合业务需求的,优化的目的和手段也是和业务需求一致的,在追求多线程编程正确及效率上还有很多的期待。
参考代码,fgc