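Background: in concurrent programming, we can use Future to obtain the result of a task that runs on a worker thread and then continue the business processing on the main thread. So how does Future actually work?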

1 Usage:
demo1: using Future and blocking on every call to get the task result:

public static void main(String[] args) {
    // Create the thread pool
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    // Declare the Callable
    Callable<Map<String, Object>> commonUseQuatoCall = () -> testGetFutureMap("param");
    // Submit the task; a pool thread runs the business logic
    Future<Map<String, Object>> submitcommonCall = executorService.submit(commonUseQuatoCall);
    Map<String, Object> commonUseQuatoData = null;
    try {
        // Block until the result is available, for at most 50 seconds
        commonUseQuatoData = submitcommonCall.get(50000, TimeUnit.MILLISECONDS);
    } catch (Exception ex) {
        // Do not swallow timeouts or task failures silently
        ex.printStackTrace();
    } finally {
        // Finally, shut down the thread pool
        executorService.shutdown();
    }
    if (null != commonUseQuatoData) {
        /**
         * do something
         */
    }
}

// Business logic
private static Map<String, Object> testGetFutureMap(String param) {
    // Handle the business logic
    Map<String, Object> mapData = new HashMap<>();
    /**
     * do something
     */
    mapData.put("flag", "success");
    return mapData;
}

demo2: using ExecutorCompletionService to handle whichever task finishes first:

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
    ExecutorService executorService = Executors.newFixedThreadPool(2);

     ExecutorCompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<>(executorService);
     completionService.submit(() -> {
         return methodA();
     });
     completionService.submit(() -> {
         return methodB();
     });
     for (int i = 0; i < 2; i++) {
         // Take the next finished task and handle its result
         try {
             Map<String, Object> oneMapResult = completionService.take().get(5,TimeUnit.SECONDS);
             if ("methodAResult".equalsIgnoreCase(oneMapResult.get("type").toString())) {
                 // Result returned by methodA
                 System.out.println("\"methodAResult\" = " + "methodAResult");
             }
             if ("methodBResult".equalsIgnoreCase(oneMapResult.get("type").toString())) {
                 // Result returned by methodB
                 System.out.println("\"methodBResult\" = " + "methodBResult");
             }
         }catch (Exception ex){
             ex.printStackTrace();
         }

     }

     System.out.println("\"finish\" = " + "finish");
     executorService.shutdown();
 }

 private static Map<String, Object> methodB() throws InterruptedException {
     Map<String, Object> mapData = new HashMap<>(3);
     Object data = null;
     /**
      * Business logic
      * data = xxx;
      */
     Thread.sleep(10000);
     // Return the result
     mapData.put("type", "methodBResult");
     mapData.put("data", data);
     return mapData;
 }

 private static Map<String, Object> methodA() throws InterruptedException {
     Map<String, Object> mapData = new HashMap<>(3);
     Object data = null;
     /**
      * Business logic
      * data = xxx;
      */
     Thread.sleep(20000);
     // Return the result
     mapData.put("type", "methodAResult");
     mapData.put("data", data);
     return mapData;
 }
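
To see the completion-order behavior of demo2 without waiting on long sleeps, here is a minimal sketch. The class name CompletionOrderDemo and the task labels are hypothetical, and a CountDownLatch stands in for the slow business call so that the order does not depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Submits a slow task first and a fast task second, then returns the
    // labels in the order in which take() hands the finished tasks back.
    static List<String> runInCompletionOrder() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<String> service = new ExecutorCompletionService<>(pool);
        // The "slow" task cannot finish until the latch is released.
        CountDownLatch release = new CountDownLatch(1);
        try {
            service.submit(() -> { release.await(); return "slow"; });
            service.submit(() -> "fast");

            List<String> order = new ArrayList<>();
            // take() blocks until some task has finished; only "fast" can be done here.
            order.add(service.take().get());
            release.countDown();
            order.add(service.take().get());
            return order;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runInCompletionOrder()); // prints [fast, slow]
    }
}
```

Because take() only returns a Future that has already completed, the timeout passed to get(5, TimeUnit.SECONDS) in demo2 never has a chance to expire.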

2 How it works:
2.1 Wrapping the task:
Declare the thread pool:

ExecutorService executorService = Executors.newFixedThreadPool(1);
public static ExecutorService newFixedThreadPool(int nThreads) {
     return new ThreadPoolExecutor(nThreads, nThreads,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>());
 }
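
The factory source above implies that the pool is backed by an unbounded LinkedBlockingQueue. The sketch below inspects that configuration at runtime. The class name FixedPoolInspection is hypothetical, and the cast to ThreadPoolExecutor relies on the implementation detail visible in the source above rather than on a documented guarantee.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FixedPoolInspection {

    // Describes the configuration behind Executors.newFixedThreadPool(nThreads).
    static String describe(int nThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            // Assumption: the factory returns a plain ThreadPoolExecutor.
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
            return "core=" + tpe.getCorePoolSize()
                    + ", max=" + tpe.getMaximumPoolSize()
                    + ", queueCapacity=" + tpe.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(1)); // prints core=1, max=1, queueCapacity=2147483647
    }
}
```

Since the queue capacity is Integer.MAX_VALUE, tasks submitted faster than they are processed simply accumulate in memory instead of being rejected.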

2.1.1 Submitting a task with Future:

Callable<Map<String, Object>> commonUseQuatoCall = () -> testGetFutureMap("param");
Future<Map<String, Object>> submitcommonCall = executorService.submit(commonUseQuatoCall);

This calls AbstractExecutorService.submit(Callable task):

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException(); // reject a null task
    RunnableFuture<T> ftask = newTaskFor(task); // wrap the Callable in a FutureTask
    execute(ftask); // hand ftask to the pool, which runs it on a worker thread
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
/**
  * Creates a {@code FutureTask} that will, upon running, execute the
  * given {@code Callable}.
  *
  * @param  callable the callable task
  * @throws NullPointerException if the callable is null
  */
 public FutureTask(Callable<V> callable) {
     if (callable == null)
         throw new NullPointerException();
     this.callable = callable;
     this.state = NEW;       // ensure visibility of callable
 }
private static final int NEW   = 0;
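
Since submit() only wraps the Callable in a FutureTask and passes it to execute(), the same wrapper can be driven by hand. The following sketch runs a FutureTask on a plain Thread, with no pool involved; the class name FutureTaskByHand is hypothetical.

```java
import java.util.concurrent.FutureTask;

class FutureTaskByHand {

    // Wraps a Callable in a FutureTask, runs it on a plain thread and
    // reports isDone() before and after, followed by the computed value.
    static String runByHand() throws Exception {
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        boolean doneBefore = task.isDone();   // state is still NEW
        Thread worker = new Thread(task);     // FutureTask is also a Runnable
        worker.start();
        Integer value = task.get();           // blocks until run() has stored the result
        return doneBefore + " -> " + task.isDone() + ": " + value;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runByHand()); // prints false -> true: 42
    }
}
```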

2.1.2 Submitting a task with ExecutorCompletionService:

 ExecutorCompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<>(executorService);
        completionService.submit(() -> {
            return methodA();
        });

This calls ExecutorCompletionService.submit:

public Future<V> submit(Callable<V> task) {
     if (task == null) throw new NullPointerException();
     RunnableFuture<V> f = newTaskFor(task);
     // Wrap the task in a QueueingFuture
     executor.execute(new QueueingFuture(f));
     return f;
 }
private RunnableFuture<V> newTaskFor(Callable<V> task) {
     if (aes == null)
         return new FutureTask<V>(task);
     else
         return aes.newTaskFor(task);
 }
private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> {
     QueueingFuture(RunnableFuture<V> task) {
         super(task, null);
         this.task = task;
     }
     // Called once the wrapped task has completed
     protected void done() { completionQueue.add(task); }
     private final Future<V> task;
 }

2.2 Thread execution:
ThreadPoolExecutor.execute(Runnable command):

 public void execute(Runnable command) {
   if (command == null)
         throw new NullPointerException();
     int c = ctl.get();
     if (workerCountOf(c) < corePoolSize) { // fewer workers than the configured core pool size
         if (addWorker(command, true)) // start a new worker with this task as its first task
             return;
         c = ctl.get();
     }
     if (isRunning(c) && workQueue.offer(command)) { // otherwise put the task on the BlockingQueue
         int recheck = ctl.get();
         if (! isRunning(recheck) && remove(command))
             reject(command);
         else if (workerCountOf(recheck) == 0)
             addWorker(null, false);
     }
     else if (!addWorker(command, false))
         reject(command);
 }
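
The three branches of execute() can be observed from the outside. The sketch below is an illustration under assumed settings, not the configuration used in the demos: one core thread, a maximum of one thread, and a bounded queue of capacity 1, so that the third task hits the reject branch. The class name ExecutePathsDemo is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutePathsDemo {

    // Submits three blocking tasks and records which branch of execute() each one took.
    static String submitThree() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the latch is released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        try {
            pool.execute(blocker); // branch 1: workerCount < corePoolSize, addWorker(command, true)
            trace.append("poolSize=").append(pool.getPoolSize());
            pool.execute(blocker); // branch 2: the only worker is busy, workQueue.offer succeeds
            trace.append(", queued=").append(pool.getQueue().size());
            try {
                pool.execute(blocker); // branch 3: queue full and pool at maximum, reject(command)
                trace.append(", third=accepted");
            } catch (RejectedExecutionException e) {
                trace.append(", third=rejected");
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(submitThree()); // prints poolSize=1, queued=1, third=rejected
    }
}
```

With the unbounded LinkedBlockingQueue used by newFixedThreadPool, offer() always succeeds, so the third branch is only reached after the pool has been shut down.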

addWorker(command, true):

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // start the worker thread
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
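
One consequence of the code above is that worker threads are created lazily, one per execute() call, until corePoolSize is reached, even when an existing worker is idle. The following sketch records getPoolSize() around each submission; the class name PoolGrowthDemo and the pool settings are assumptions made for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    // Returns the pool size before any submission and after each of three submissions.
    static String poolSizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Runnable noop = () -> { };
        StringBuilder sizes = new StringBuilder();
        try {
            sizes.append(pool.getPoolSize());      // no worker exists before the first task
            for (int i = 0; i < 3; i++) {
                pool.execute(noop);                // addWorker(...) runs while workerCount < corePoolSize
                sizes.append(',').append(pool.getPoolSize());
            }
        } finally {
            pool.shutdown();
        }
        return sizes.toString();
    }

    public static void main(String[] args) {
        System.out.println(poolSizes()); // prints 0,1,2,2
    }
}
```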

2.3 Storing the task result:

FutureTask.run():

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // Invoke the target method and wait for it to return
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran) // call() returned normally, so store the result
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v; // store the result
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
private void finishCompletion() {
     // assert state > COMPLETING;
     for (WaitNode q; (q = waiters) != null;) {
         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
             for (;;) {
                 Thread t = q.thread;
                 if (t != null) {
                     q.thread = null;
                     LockSupport.unpark(t);
                 }
                 WaitNode next = q.next;
                 if (next == null)
                     break;
                 q.next = null; // unlink to help gc
                 q = next;
             }
             break;
         }
     }

     done();

     callable = null;        // to reduce footprint
 }
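
finishCompletion() walks the list of waiting threads and unparks each of them. A small sketch of that effect, with a hypothetical class name WaitersDemo: two threads block in get() on the same FutureTask, and both receive the value once run() completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class WaitersDemo {

    // Starts two threads that wait on the same task, then completes the task.
    static List<String> wakeWaiters() throws InterruptedException {
        FutureTask<String> task = new FutureTask<>(() -> "ready");
        List<String> seen = new CopyOnWriteArrayList<>();
        List<Thread> waiters = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            Thread t = new Thread(() -> {
                try {
                    // Parks inside awaitDone() if the task has not finished yet
                    seen.add(task.get());
                } catch (InterruptedException | ExecutionException e) {
                    seen.add("failed: " + e);
                }
            });
            t.start();
            waiters.add(t);
        }
        // Run on the calling thread: set() -> finishCompletion() unparks any parked waiter
        task.run();
        for (Thread t : waiters) {
            t.join();
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(wakeWaiters()); // prints [ready, ready]
    }
}
```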

The done() method adds the completed task (the Future itself, not the raw result) to the BlockingQueue<Future<V>> completionQueue of ExecutorCompletionService:

private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> {
     QueueingFuture(RunnableFuture<V> task) {
         super(task, null);
         this.task = task;
     }
     // Called once the wrapped task has completed
     protected void done() { completionQueue.add(task); }
     private final Future<V> task;
 }
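
The same hook can be used outside of ExecutorCompletionService. The sketch below is not the JDK implementation; SelfQueueingTask is a hypothetical class that overrides done() to put itself on a queue, which is the essence of what QueueingFuture does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

class DoneHookSketch {

    // Hypothetical helper: a FutureTask that enqueues itself when it completes.
    static class SelfQueueingTask<V> extends FutureTask<V> {
        private final BlockingQueue<FutureTask<V>> finished;

        SelfQueueingTask(Callable<V> callable, BlockingQueue<FutureTask<V>> finished) {
            super(callable);
            this.finished = finished;
        }

        @Override
        protected void done() {
            // Invoked by finishCompletion() after the outcome has been stored
            finished.add(this);
        }
    }

    // Runs one task and reports the queue size before and after, plus the value.
    static String runOnce() throws Exception {
        BlockingQueue<FutureTask<String>> finished = new LinkedBlockingQueue<>();
        SelfQueueingTask<String> task = new SelfQueueingTask<>(() -> "hello", finished);
        int before = finished.size();
        task.run();
        return before + " -> " + finished.size() + ", value=" + finished.take().get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnce()); // prints 0 -> 1, value=hello
    }
}
```

Note that done() also runs when a task fails or is cancelled, so the queue receives every finished Future, not only the successful ones.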

2.4 Getting the task result:
2.4.1 Getting the result via Future:

/**
* @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
/**
 * Returns result or throws exception for completed task.
 *
 * @param s completed state value
 */
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
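
report(s) has three possible outcomes: return the value, throw CancellationException, or wrap the task's own exception in ExecutionException. The sketch below triggers each of them from the caller's side; the class name ReportOutcomes and the messages are made up for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class ReportOutcomes {

    // Maps the three outcomes of get() to a short description.
    static String outcomeOf(FutureTask<String> task) throws InterruptedException {
        try {
            return "value: " + task.get();                    // state NORMAL
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();    // state EXCEPTIONAL
        } catch (CancellationException e) {
            return "cancelled";                               // state >= CANCELLED
        }
    }

    static String[] allOutcomes() throws InterruptedException {
        FutureTask<String> ok = new FutureTask<>(() -> "fine");
        ok.run();

        FutureTask<String> failing = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        failing.run(); // run() catches the exception and stores it via setException()

        FutureTask<String> cancelled = new FutureTask<>(() -> "never runs");
        cancelled.cancel(false);

        return new String[] { outcomeOf(ok), outcomeOf(failing), outcomeOf(cancelled) };
    }

    public static void main(String[] args) throws InterruptedException {
        for (String outcome : allOutcomes()) {
            System.out.println(outcome);
        }
    }
}
```

Catching a bare Exception, as demo1 does, hides which of these cases occurred; separate catch blocks let the caller react differently to a failed task and a cancelled one.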

2.4.2 Getting the result via ExecutorCompletionService:

 Map<String, Object> oneMapResult = completionService.take().get(5,TimeUnit.SECONDS);

The take() method of ExecutorCompletionService:

public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}
// Get the result from the FutureTask
/**
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

/**
 * @throws CancellationException {@inheritDoc}
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
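
The timed get() above throws TimeoutException when awaitDone() returns before the task has completed, but it does not cancel the task. A minimal sketch of that behavior, with a hypothetical class name TimedGetDemo and a latch standing in for slow work:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {

    // Calls get() with a short timeout on an unfinished task, then again after it finishes.
    static String timedGet() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<String> future = pool.submit(() -> { release.await(); return "late"; });
            String first;
            try {
                first = future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                first = "timeout"; // the task itself keeps running
            }
            release.countDown();   // let the task finish
            return first + ", then " + future.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(timedGet()); // prints timeout, then late
    }
}
```

If the result is no longer wanted after a timeout, the caller has to call future.cancel(true) explicitly.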

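3 Summary:
3.1 For an asynchronous task, the target method is wrapped in a Callable, and the Callable is wrapped in a FutureTask that a pool thread runs. When the task finishes, its result is stored in the outcome field of the FutureTask, and the caller then reads the result back through that same FutureTask.
3.2 ExecutorCompletionService puts each finished task's Future on a queue, and the caller retrieves finished tasks from that queue. take() and poll() already remove the Future from the queue when they return it. What needs attention is that every submitted task is eventually consumed with take() or poll(); otherwise completed Futures stay in the queue and it keeps growing.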
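
The queue behavior in 3.2 can be checked by passing our own queue to the two-argument ExecutorCompletionService constructor. This is a sketch with a hypothetical class name CompletionQueueGrowth; the task bodies are placeholders.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class CompletionQueueGrowth {

    // Returns the completion queue size before and after draining it with poll().
    static int[] queueSizes() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        BlockingQueue<Future<Integer>> completed = new LinkedBlockingQueue<>();
        ExecutorCompletionService<Integer> service =
                new ExecutorCompletionService<>(pool, completed);
        for (int i = 0; i < 3; i++) {
            final int n = i;
            service.submit(() -> n * n);
        }
        // Wait until every task, including its done() hook, has finished.
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);

        int unconsumed = completed.size();   // nothing has been taken yet
        while (service.poll() != null) {
            // poll() removes one finished Future per call
        }
        return new int[] { unconsumed, completed.size() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = queueSizes();
        System.out.println(sizes[0] + " -> " + sizes[1]); // prints 3 -> 0
    }
}
```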
