ListenableFuture的说明
Guava 定义了 ListenableFuture接口
并继承了JDK concurrent
包下的Future
接口,ListenableFuture 允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在JDK concurrent
中的Future
是不支持的。 在高并发并且需要大量Future对象的情况下,推荐尽量使用ListenableFuture
来代替..
ListenableFuture 中的基础方法是addListener(Runnable, Executor)
, 该方法会在多线程运算完的时候,在Executor中执行指定的Runnable。
不做过多入门介绍,直接上例子。
主动中断例子
maven引入
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.1-jre</version>
</dependency>
引入的包:
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Queue;
import java.util.concurrent.*;
main方法:
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executor);
final Queue<ListenableFuture<Boolean>> currentMiningTasks = new ConcurrentLinkedQueue<>();
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A(true)));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
currentMiningTasks.add(listeningExecutorService.submit(new A()));
// 一定要调用这个方法,isTerminated()永远不为true
listeningExecutorService.shutdown();
executor.shutdown();
for (final ListenableFuture<Boolean> task : currentMiningTasks) {
try {
System.out.println("结果..." + task.get());
if (task.get()) { // 如果是true,则取消其他任务
for (final ListenableFuture<Boolean> t : currentMiningTasks) {
if (t != null && !t.isCancelled()) {
t.cancel(true);
}
}
}
} catch (InterruptedException | CancellationException e) {
} catch (Exception e) {
}
}
while (true) {
if (listeningExecutorService.isTerminated() && executor.isTerminated()) {
System.out.println("listeningExecutorService executor结束");
break;
}
}
System.out.println("结束......");
}
static class A implements Callable {
Boolean flag = false;
public A() {
}
public A(Boolean flag) {
this.flag = flag;
}
@Override
public Object call() throws Exception {
Thread.sleep(1000);
return flag;
}
}
运行结果:
结果...false
结果...false
结果...true
结果...false
listeningExecutorService executor结束
结束......
可以看到,收到true的结果,其他的任务没有执行就结束了