Frok/Join框架概述

Frok/Join框架概述Fork Join 框架是 Java7 提供了的一个用于并行执行的任务的框架 是一个把大任务分割成若干个小任务 最终汇总每个小任务结果后得到大任务结果的框架

大家好,欢迎来到IT知识分享网。

Frok/Join框架简介

Fork/Join框架是Java7提供了的一个用于并行执行的任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架

​ Fork递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。

​ Join 将所有子任务的结果递归地连接成单个结果,或者在返回void的任务的情况下,程序只是等待每个子任务执行完毕。

Fork/Join的特性:

1、ForkJoinPool是 ExecutorService的补充,在某些应用场景下性能比ExecutorService更好

2、ForkJoinPool主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等;

3、ForkJoinPool 最适合的是计算密集型的任务

4、ForkJoinPool的宗旨是使用少量的线程来处理大量的任务,而CPU密集型任务,当一个大任务分解成多个子任务后,多个线程获取到多个处理器的时间分片,可以并行的执行子任务。

Fork/Join框架原理

Fork/Join框架其实就是指由ForkJoinPool作为线程池、ForkJoinTask为任务、ForkJoinWorkerThread作为执行任务的具体线程实体这三者构成的任务调度机制。

ForkJoinPool线程池

ForkJoinPool是fork/join框架的核心,是ExecutorService的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息。工作线程一次只能执行一个任务。ForkJoinPool线程池中的每个线程都有自己的双端队列用于存储任务(double-ended queue 或 deque),这种架构使用了一种名为工作窃取(work-stealing)算法来平衡线程的工作负载。

工作窃取(work-stealing)算法

工作窃取(work-stealing)算法:空闲的线程试图从繁忙线程的deques中窃取工作

默认情况下,每个工作线程从其自己的双端队列中获取任务。但如果自己的双端队列中的任务已经执行完毕,双端队列为空,工作线程就会从另一个忙线程的双端队列头部全局入口队列中获取任务,因为这是最大概率可能找到工作的地方。

通常会使用双端队列,被窃取任务线程永远从双端队列的尾部拿任务执行,而窃取任务线程永远从双端队列的头部拿任务执行。

优缺点

​ 优点:是充分利用线程进行并行计算,并减少了线程间的竞争。

​ 缺点:是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

说明:

1、ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。

2、每个工作线程在运行中产生新的任务(调用了 fork())时,会放入工作队列的队尾工作线程每次使用LIFO方式获取任务,也就是说每次从队尾取出任务来执行

3、每个工作线程在处理自己的工作队列过程中,同时会尝试窃取一个任务,窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在工作的过程中,使用FIFO方式去窃取其他工作线程的任务

4、在遇到 join() 通知时,当前需要 join 的任务尚未完成,则会先处理其他任务,直到目标的任务方法被告知已经结束(通过isDone()方法),所有的任务都是无阻塞的完成。(类似协作通知)

5、在既没有自己的任务,也没有可以窃取的任务时,进入休眠。

ForkJoinWorkerThread

​ ForkJoinWorkerThread直接继承了Thread,但只是增加了额外的功能。ForkJoinWorkerThread是被ForkJoinPool管理的工作线程,由它来执行ForkJoinTask。ForkJoinWorkerThread 依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。

ForkJoinTask

​ ForkJoinTask 是 ForkJoinPool线程之中执行的任务的基本类型,实现了Future接口

使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,并实现任务重写 compute() 方法。Fork/Join框架提供类以下几个子类:

1、RecursiveAction:用于没有返回结果的任务。

2、RecursiveTask:用于有返回结果的任务。

常用方法

1、fork()方法:把任务推入当前工作线程的工作队列里,不保证任务的顺序 (安排任务异步执行)

2、join()方法:等待执行结果,不保证任务的顺序

3、invoke() 方法 立即执行任务,并等待返回结果

4、invokeAll()方法 批量执行任务,并等待它们执行结束,保证任务的顺序

join() 的工作过程:

1、检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞

2、查看任务的完成状态,如果已经完成,直接返回结果。

3、如果任务尚未完成,但处于自己的工作队列内,则调用doExec()完成它。

4、如果任务不再当前队列的尾部位置,(已经被其他的工作线程偷走)。调用awaitJoin(w, this, 0L)窃取这个小偷的工作队列内的任务(实际上是以 FIFO 方式)

5、如果偷走任务的小偷(Thread)也已经把自己的任务全部做完,正在等待需要 Join 的任务时,则找到小偷的小偷,帮助它完成它的任务。

private int doJoin() { 
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; //<0 已完成(包含任务取消、任务异常) return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } 

Fork/Join的使用

//最佳应用场景:多核、多内存、可以分割计算再合并的计算密集型任务

public class TaskDemo extends RecursiveTask<Integer> { 
    private static final int THRESHOLD = 100;//阈值 private int from; private int to; public TaskDemo(int from, int to) { 
    super(); this.from = from; this.to = to; } @Override protected Integer compute() { 
    //判断任务大小是否合适(是否到达范围) if (THRESHOLD > (to - from)) { 
    //若是,则进行汇总统计 return IntStream.range(from, to + 1) .reduce((a, b) -> a + b) .getAsInt(); } else { 
   //否则继续拆分 int forkNumber = (from + to) / 2; System.out.println(String.format("拆分%d - %d ==> %d ~ %d, %d~%d",from,to,from,forkNumber,forkNumber+1,to)); TaskDemo left = new TaskDemo(from, forkNumber); TaskDemo right = new TaskDemo(forkNumber + 1, to); // fork()方法:将子任务放入队列并安排异步执行 left.fork(); right.fork(); // invokeAll(left,right); //分别拿到两个子任务的值 return left.join() + right.join();//阻塞当前线程并等待获取结果 } } public static void main(String[] args) throws ExecutionException, InterruptedException { 
    //在 Java 8 中,创建 ForkJoinPool 实例的最简单的方式就是使用其静态方法。commonPool()提供了对公共池的引用,公共池是每个 ForkJoinTask 的默认线程池。 //使用预定义的公共池可以减少资源消耗,因为它会阻止每个任务创建一个单独的线程池。 ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); ForkJoinTask<Integer> result = forkJoinPool.submit(new TaskDemo(1, 1000)); System.out.println("计算结果为"+result.get()); forkJoinPool.shutdown(); } } 

总结

ForkJoinPool很适合执行计算密集型的任务(拆分大任务再汇总小任务计算结果),若拆分逻辑比计算逻辑还要复杂时,ForkJoinPool并不会带来性能的提升,反而可能会起到负面作用。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/130044.html

(0)
上一篇 2025-08-19 20:15
下一篇 2025-08-19 20:20

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信