More on @crxjs/vite-plugin
Improving Performance with Java’s CompletableFuture, CompletableFuture
is what we use when we want to run tasks in parallel mode. This is just the example from the website,
Executor executor = Executors.newFixedThreadPool(10);
var futureCategories = Stream.of(
new Transaction("1", "description 1"),
new Transaction("2", "description 2"),
new Transaction("3", "description 3"),
new Transaction("4", "description 4"),
new Transaction("5", "description 5"),
new Transaction("6", "description 6"),
new Transaction("7", "description 7"),
new Transaction("8", "description 8"),
new Transaction("9", "description 9"),
new Transaction("10", "description 10")
)
.map(transaction -> CompletableFuture.supplyAsync(
() -> CategorizationService.categorizeTransaction(transaction), executor)
)
.collect(toList());
We wait in a mount of time for all tasks to finish.
CompletableFuture
from each taskstatic void play() {
Random random = new Random();
Map<Integer, Object> map = new ConcurrentHashMap<>();
List<CompletableFuture<Void>> all = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
int finalI = i;
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
if (random.nextDouble() < 0.5) {
map.put(finalI, PRESENT);
} else {
try {
TimeUnit.MILLISECONDS.sleep(1_500 + Math.round(random.nextDouble() * 500));
} catch (InterruptedException ignore) {
//
}
}
return null;
}, threadPoolExecutor);
all.add(future);
}
try {
CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0])).get(1000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
}
Set<Integer> set = map.keySet();
System.out.println("play result: " + set);
}
From a macroscopic perspective, play
method shall run within 1000ms, while from a microscopic perspective, any task may have not begin to run or have not finished.
Every task should begin to run as quickly as possible, and finish up in a short time. If any task is unlucky, it get stuck in the ThreadPoolExecutore
’s queue, or holds resource of ThreadPoolExecutore
’s Thread
. That’s the matter.
When the application is at a high traffic, every millisecond counts, which we don’t like to waste any. For example, with a thread pool, core size at 4, max size at 8, having a queue, size at 100.
Task of User-8 may have enter the queue earlier than User-5’s, as a result, User-8’s task get chance to run earlier than User-5’s.
When the play
method is called, the clock is ticked in at the same moment, but its tasks will be executed by thread pool some time later. In another word, it may happen that play
method finish work earlier than its tasks, cause tasks remain in the queue.
To sum up, tasks running or to-be-run beyond timeout of its calling method, have to be abondoned, or they will set others to wait, and others to waste.
We clean tasks that have not run.
try {
CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0])).get(1000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
all.forEach(f -> {
if (!f.isDone() && !f.isCancelled() && !f.isCompletedExceptionally()) {
f.cancel(false);
}
});
}
We clean tasks that shall not run.
@Override
protected void beforeExecute(Thread t, Runnable r) {
ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.schedule(() -> {
t.interrupt();
}, 1000, TimeUnit.MILLISECONDS);
workerSet.put(r, scheduledFuture);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture scheduledFuture = workerSet.remove(r);
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
}
Decorate task within a schedule task, while its timeout equals to play
calling method, if this task finish up and call afterExecute
schedule task get remove, or it will be interrupted by schedule task.
code,
public class App {
static BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(100);
static ThreadPoolExecutor threadPoolExecutor = new InterruptableThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, blockingQueue);
static Object PRESENT = new Object();
public static void main(String[] args) {
for (int j = 1; j <= 10; j++) {
play(j);
}
threadPoolExecutor.shutdown();
}
/**
* as like call 10 http request
*/
static void play(int j) {
Random random = new Random();
Map<Integer, Object> map = new ConcurrentHashMap<>();
List<CompletableFuture<Void>> all = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
int finalI = i;
System.out.printf("%d round %d step enter running\n", j, finalI);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
if (random.nextDouble() < 0.5) {
map.put(finalI, PRESENT);
} else {
try {
TimeUnit.MILLISECONDS.sleep(1_500 + Math.round(random.nextDouble() * 500));
System.err.printf("%d round %d step, finish sleep\n", j, finalI);
} catch (InterruptedException ignore) {
//
}
}
return null;
}, threadPoolExecutor);
all.add(future);
}
try {
CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0])).get(1000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
System.out.println(j + " round, before clean: " + threadPoolExecutor.getQueue().size());
all.forEach(f -> {
if (!f.isDone() && !f.isCancelled() && !f.isCompletedExceptionally()) {
f.cancel(false);
}
});
System.out.println(j + " round, after clean: " + threadPoolExecutor.getQueue().size());
}
Set<Integer> set = map.keySet();
System.out.println(j + " round, play result: " + set);
}
static class InterruptableThreadPoolExecutor extends ThreadPoolExecutor {
static ScheduledExecutorService scheduledThreadPoolExecutor = Executors.newScheduledThreadPool(10);
static Map<Runnable, ScheduledFuture> workerSet = new ConcurrentHashMap<>();
public InterruptableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.schedule(() -> {
t.interrupt();
}, 1000, TimeUnit.MILLISECONDS);
workerSet.put(r, scheduledFuture);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture scheduledFuture = workerSet.remove(r);
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
}
@Override
public void shutdown() {
scheduledThreadPoolExecutor.shutdownNow();
super.shutdown();
}
}
}