当前位置:   article > 正文

多个线程调用FutureTask导致我们写的Callable的call方法只执行了一次(原因:源码详解)(解决问题:详解)_futuretask callable设计模式

futuretask callable设计模式

首先我们知道FutureTask实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口。

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

所以他就是一个适配类,负责在new Thread()中可以使用Callable。用的适配器设计模式的思想

好,我们看下面这段代码:

  1. public class FutureTaskTest {
  2. public static void main(String[] args) throws Exception {
  3. FutureTask<String> task=new FutureTask<>(()->{
  4. System.out.println("run...");
  5. System.out.println(Thread.currentThread().getName());
  6. TimeUnit.SECONDS.sleep(1);
  7. return "小贱哥哥";
  8. });
  9. new Thread(task).start();
  10. new Thread(task).start();
  11. System.out.println(task.get());
  12. }
  13. }

运行结果:

  1. run...
  2. Thread-0
  3. 小贱哥哥

到这里,我们发现了一个问题,我明明用了两个线程去执行,为什么只输出一次结果?

这是怎么回事呢?

其实问题很简单,我们先看看我们写的代码依次对照源码进行解析:

  1. FutureTask<String> task=new FutureTask<>(()->{
  2. System.out.println("run...");
  3. System.out.println(Thread.currentThread().getName());
  4. TimeUnit.SECONDS.sleep(1);
  5. return "小贱哥哥";
  6. });

我们new 了一个FutureTask,对应的FutureTask内部执行了:

  1. public FutureTask(Callable<V> callable) {
  2. if (callable == null)
  3. throw new NullPointerException();
  4. this.callable = callable;
  5. this.state = NEW; // ensure visibility of callable
  6. }

我们注意一下:this.state = NEW; 和 this.callable = callable;

然后我们就开始执行了:

  1. new Thread(task).start();
  2. new Thread(task).start();

我们先思考一个问题,既然FutureTask间接的实现了Runnable接口,所以FutureTask里面肯定实现重写了run方法,所以线程启动执行,还是走的FutureTask的run方法,好,我们来看看他的run方法:

  1. public void run() {
  2. if (state != NEW ||
  3. !RUNNER.compareAndSet(this, null, Thread.currentThread()))
  4. return;
  5. try {
  6. Callable<V> c = callable;
  7. if (c != null && state == NEW) {
  8. V result;
  9. boolean ran;
  10. try {
  11. result = c.call();
  12. ran = true;
  13. } catch (Throwable ex) {
  14. result = null;
  15. ran = false;
  16. setException(ex);
  17. }
  18. if (ran)
  19. set(result);
  20. }
  21. } finally {
  22. // runner must be non-null until state is settled to
  23. // prevent concurrent calls to run()
  24. runner = null;
  25. // state must be re-read after nulling runner to prevent
  26. // leaked interrupts
  27. int s = state;
  28. if (s >= INTERRUPTING)
  29. handlePossibleCancellationInterrupt(s);
  30. }
  31. }

我们看到只有当state == NEW和callable != null时,才会有可能执行到call方法( result = c.call(); )。

(这个c.call();方法的内容就是我们之前写的内容)(这里用了lambda表达式):

  1. FutureTask<String> task=new FutureTask<>(()->{
  2. System.out.println("run...");
  3. System.out.println(Thread.currentThread().getName());
  4. TimeUnit.SECONDS.sleep(1);
  5. return "小贱哥哥";
  6. });

好,到这,我们继续看,因为我们的程序是属于正常运行到结束的,所以会继续往下面走,一直走到set方法(还是上面的run方法):

  1. public void run() {
  2. ......
  3. if (ran)
  4. set(result);
  5. ......
  6. }

那我们来看看set方法:

  1. protected void set(V v) {
  2. if (STATE.compareAndSet(this, NEW, COMPLETING)) {
  3. outcome = v;
  4. STATE.setRelease(this, NORMAL); // final state
  5. finishCompletion();
  6. }
  7. }

发现STATE.setRelease(this, NORMAL);他把state改为了NORMAL。

继续再set方法中往下看到finishCompletion方法,点进去,我们发现:

  1. private void finishCompletion() {
  2. // assert state > COMPLETING;
  3. for (WaitNode q; (q = waiters) != null;) {
  4. if (WAITERS.weakCompareAndSet(this, q, null)) {
  5. for (;;) {
  6. Thread t = q.thread;
  7. if (t != null) {
  8. q.thread = null;
  9. LockSupport.unpark(t);
  10. }
  11. WaitNode next = q.next;
  12. if (next == null)
  13. break;
  14. q.next = null; // unlink to help gc
  15. q = next;
  16. }
  17. break;
  18. }
  19. }
  20. done();
  21. callable = null; // to reduce footprint
  22. }

这个方法最后把我们的callable改为了null

好,到这里一切就清楚了。

第一个线程执行的时候FutureTask的state已经不是NEW了,callable变为null了。又因为我们两个线程用的FutureTask的同一个实例,所以第二个线程执行的时候,就直接到了这里return了:

  1. public void run() {
  2. if (state != NEW ||
  3. !RUNNER.compareAndSet(this, null, Thread.currentThread()))
  4. return;
  5. .......
  6. }

所以这也就是为什么只执行了一次的原因。

我们不想这样,怎么修改这种情况呢?

其实就是想办法让state的值为NEW,callable值不为null

方法一:再重新new一个FutureTask:

((因为重新new的时候,他会走FutureTask的构造方法把state改为NEW,并重新赋值callable)前面已经说了这个东西:this.state = NEW; 和 this.callable = callable;

  1. public class FutureTaskTest {
  2. public static void main(String[] args) throws Exception{
  3. final int[] i={0};
  4. Callable callable=()->{
  5. System.out.println("run...");
  6. System.out.println(Thread.currentThread().getName());
  7. TimeUnit.SECONDS.sleep(1);
  8. return "小贱哥哥"+(++i[0]);
  9. };
  10. FutureTask<String> task1=new FutureTask<>(callable);
  11. FutureTask<String> task2=new FutureTask<>(callable);
  12. new Thread(task1).start();
  13. new Thread(task2).start();
  14. System.out.println(task1.get());
  15. System.out.println(task2.get());
  16. }
  17. }

这次的结果就是:

  1. run...
  2. Thread-1
  3. run...
  4. Thread-0
  5. 小贱哥哥2
  6. 小贱哥哥1

方法二:先思考:FutureTask的run()方法因为FutureTask的state不为NEWFutureTask的callable为null导致的,我们有没有什么办法,把这两个值改了呢?我们看源码:

  1. private volatile int state;
  2. private Callable<V> callable;

发现都是私有的,我们没法直接调用修改,怎么办呢?(答案:利用反射

  1. public class FutureTaskTest {
  2. public static void main(String[] args) throws Exception {
  3. final int[] i = {0};
  4. FutureTask<String> task=new FutureTask<>(()->{
  5. System.out.println("run...");
  6. System.out.println(Thread.currentThread().getName());
  7. TimeUnit.SECONDS.sleep(1);
  8. return "小贱哥哥"+(++i[0]);
  9. });
  10. new Thread(task).start();
  11. Field callable = task.getClass().getDeclaredField("callable");
  12. callable.setAccessible(true);
  13. Object call = callable.get(task);
  14. System.out.println(task.get());
  15. Field state = task.getClass().getDeclaredField("state");
  16. state.setAccessible(true);
  17. state.set(task,0);
  18. callable.set(task,call);
  19. new Thread(task).start();
  20. System.out.println(task.get());
  21. }
  22. }

结果:

  1. run...
  2. Thread-0
  3. 小贱哥哥1
  4. run...
  5. Thread-1
  6. 小贱哥哥2

你可能会对上面的state.set(task,0);产生疑惑,为什么要设置成0呀?

所以我们还是得看源码:

  1. private volatile int state;
  2. private static final int NEW = 0; //任务新建和执行中
  3. private static final int COMPLETING = 1; //任务将要执行完毕
  4. private static final int NORMAL = 2; //任务正常执行结束
  5. private static final int EXCEPTIONAL = 3; //任务异常
  6. private static final int CANCELLED = 4; //任务取消
  7. private static final int INTERRUPTING = 5; //任务线程即将被中断
  8. private static final int INTERRUPTED = 6; //任务线程已中断

想必,现在你应该很透彻了。

好了,最后说一句,人家是故意这样开发的,其目的就是为了提高效率,因为对于返回值总是不变的,我们总是要考虑把他缓存一下,下一次的时候,直接把值返回出去,就不用再进行计算了,大大的提高了效率。

希望对你有帮助!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/197742
推荐阅读
相关标签
  

闽ICP备14008679号