JAVA并发编程工具篇--1.1理解Future获取线程执行结果
理解Future获取线程执行结果
背景:在并发编程中,我们可以使用Future来获取子线程执行的结果,然后在主线程一起进行业务处理; 那么Future是如何来工作的;
1 使用:
demo1:使用Future每次都阻塞获取任务的执行结果:
public static void main(String[] args) {
// 声明线程池
ExecutorService executorService = Executors.newFixedThreadPool(1);
// 声明 Callable
Callable<Map<String, Object>> commonUseQuatoCall = () -> testGetFutureMap("param");
// 开启一个线程进行业务处理
Future<Map<String, Object>> submitcommonCall = executorService.submit(commonUseQuatoCall);
Map<String, Object> commonUseQuatoData = null;
try {
// 阻塞获取结果
commonUseQuatoData = submitcommonCall.get(50000, TimeUnit.MILLISECONDS);
}catch (Exception ex){
}finally {
// 最后关闭线程池
executorService.shutdown();
}
if (null != commonUseQuatoData){
/**
* do some thing
*/
}
}
// 业务处理
private static Map<String, Object> testGetFutureMap(String param) {
// 处理业务逻辑
Map<String, Object> mapData = new HashMap<>();
/**
* do some thing
*/
mapData.put("flag","sucess");
return mapData;
}
demo2:使用ExecutorCompletionService 优先处理返回结果最快的任务:
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
ExecutorCompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<>(executorService);
completionService.submit(() -> {
return methodA();
});
completionService.submit(() -> {
return methodB();
});
for (int i = 0; i < 2; i++) {
// 获得结果并处理
try {
Map<String, Object> oneMapResult = completionService.take().get(5,TimeUnit.SECONDS);
if ("methodAResult".equalsIgnoreCase(oneMapResult.get("type").toString())) {
// 方法A 返回的结果
System.out.println("\"methodAResult\" = " + "methodAResult");
}
if ("methodBResult".equalsIgnoreCase(oneMapResult.get("type").toString())) {
// 方法B 返回的结果
System.out.println("\"methodBResult\" = " + "methodBResult");
}
}catch (Exception ex){
ex.printStackTrace();
}
}
System.out.println("\"finish\" = " + "finish");
executorService.shutdown();
}
private static Map<String, Object> methodB() throws InterruptedException {
Map<String, Object> mapData = new HashMap<>(3);
Object data = null;
/**
* 业务处理
* data = xxx;
*/
Thread.sleep(10000);
// 返回结果
mapData.put("type", "methodBResult");
mapData.put("data", data);
return mapData;
}
private static Map<String, Object> methodA() throws InterruptedException {
Map<String, Object> mapData = new HashMap<>(3);
Object data = null;
/**
* 业务处理
* data = xxx;
*/
Thread.sleep(20000);
// 返回结果
mapData.put("type", "methodAResult");
mapData.put("data", data);
return mapData;
}
2 工作过程:
2.1 封装线程:
声明线程池
ExecutorService executorService = Executors.newFixedThreadPool(1);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2.1.1 Future 提交任务:
Callable<Map<String, Object>> commonUseQuatoCall = () -> testGetFutureMap("param");
Future<Map<String, Object>> submitcommonCall = executorService.submit(commonUseQuatoCall);
调用 AbstractExecutorService.submit(Callable task)
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();// 回调为空抛出异常
RunnableFuture<T> ftask = newTaskFor(task);// 包装回调
execute(ftask);// 开启线程执行ftask 的任务
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
private static final int NEW = 0;
2.1.2 ExecutorCompletionService 提交任务:
ExecutorCompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<>(executorService);
completionService.submit(() -> {
return methodA();
});
调用 ExecutorCompletionService.submit:
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
// 使用 QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 在执行任务获取获取结果后调用
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
2.2 线程执行:
ThreadPoolExecutor. execute(Runnable command):
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {// 当前工作的线程数小于声明的核心现车数
if (addWorker(command, true))// 添加任务
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {// 将任务添加到 BlockingQueue 队列中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker(command, true):
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();// 线程执行
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2.3 线程执行的结果数据填充:
FutureTask.run():
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 调用目标方法并阻塞获的获取执行结果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)// 获取结果后设置结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;// 赋值结果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
done() 方法 :将结果放入到 ExecutorCompletionService 下BlockingQueue<Future> completionQueue 中
private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 在执行任务获取获取结果后调用
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
2.4 获取线程执行结果:
2.4.1 future 获取结果
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
2.4.2 ExecutorCompletionService 获取结果:
Map<String, Object> oneMapResult = completionService.take().get(5,TimeUnit.SECONDS);
ExecutorCompletionService 下take() 方法:
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
// 获取FutureTask的结果
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
3 过程总结:
3.1 线程异步任务的执行,通过Callable 封装目标方法,通过FutureTask 发起线程完成任务的执行;执行完成将结果放入到FutureTask 的outcome中;再通过FutureTask获取线程异步执行的结果;
3.2 ExecutorCompletionService 通过将线程返回的结果放入到一个队列中,然后在从队列中获取到结果,使用ExecutorCompletionService时,需要注意每次从队列中获取结果后,将改结果从队列中移除,否则改队列中元素的容量会越来越大;
更多推荐
所有评论(0)