素材巴巴 > 程序开发 >

Fork-Join讲解

程序开发 2023-09-22 16:33:57

1.fork-join概括

fork-join使用的是一种分而治之的一种思想模式,大数据里面也用得很多这种思想比如常见的MapReduce编程模型,分而治之就是把一个很大的任务拆分成很多个小任务并且小任务之间毫无关联,这个就是分而治之,与之相关的还有一个动态规划,动态规划也是拆分很多个小任务,但是与分而治之不同的是他的小任务之间有依赖关联,听了这些小伙伴们应该了解fork-join框架是个什么了吧,fork-join本质上来说就是一个递归,把一个大任务利用递归拆分成很多小任务,任务处理完毕,在使用递归一级一级的往上面提交,有点消耗性能。

2.fork-join内部执行规划概括

一般使用fork-join是需要在合适的场景下使用的,比如数据量很大的一个统计,数据量很大的一个排序,等一些需要很大的计算能力的地方都可以使用fork-join,但是一些比如我只有1000个数据进行求和那就没必要用fork-join了,因为fork-join是采用递归的特别消耗性能,还有一些线程并行结果数据统计,运行的时间肯定会比单线程去进行1000个数据求和慢得多,需要自适应场景

forjoin内部并行处理线程数:根据电脑的CPU逻辑处理器个数来定的

注意:内部采用多线程并行处理执行,并不是单线程执行

ForJoinPool具体参数也是可以自己设置的:

parallelism:设置线程数量

在这里插入图片描述
下面介绍几个ForkJoinPool常用方法跟任务类:

invoke:跟主线程同步执行,他不代表ForkJoin内部线程采用同步执行,就是主线程执行到这一步阻塞,必须要等invoke执行完毕再往下面继续执行,有返回结果集

execute:不会跟主线程同步执行,就是主线程执行到这一步不会阻塞,可以继续往下面执行,没有返回结果集

submit:不会跟主线程同步执行,就是主线程执行到这一步不会阻塞,可以继续往下面执行,有任务返回结果集

任务的join方法:等待当前任务执行完毕,执行完毕才可以继续往下执行

RecursiveTask:ForkJoin的递归任务逻辑处理抽象类,继承了他重新compute计算方法实现自己的递归拆分逻辑,有返回结果集

RecursiveAction :ForkJoin的递归任务逻辑处理抽象类,继承了他重新compute计算方法实现自己的递归拆分逻辑,无返回结果集,适合使用execute启动使用


实战前准备实战的生成数组:

public class MakeArray {//数组最大长度public static int arrayLength=3000000;//任务拆分最小单位,拆分到这个单位就不能拆分了,即阈值public static int splitMixThreshold=(arrayLength/10);public static int[] make(){int[] origin=new int[arrayLength];Random random=new Random();for (int i = 0; i < arrayLength; ++i) {origin[i]=random.nextInt(arrayLength*3);}return origin;}}

3.fork-join实战实现数组累加

具体思路:

1.把随机生成3亿长度的数组拆分到最小单位,即上面数组生成中设置的阈值,(3亿除于10=30000000)一个数组长度,可以自己设置

2.符合阈值内的长度的数组直接用for循环累加运算

3.不符合阈值内的长度继续使用递归拆分,并且往上递交累加结果集

具体代码:

public class SumForkJoin {/** 创建ForkJoin框架  递归进入任务拆分,递归出来任务结果累加 **/static class SumTask extends RecursiveTask{int[] recursiveArray;//划分开始位置int form;//划分结束位置int to;public SumTask(int[] recursiveArray, int form, int to) {this.recursiveArray = recursiveArray;this.form = form;this.to = to;}@Overrideprotected Integer compute() {//判断当前数组最小长度阈值,没有达到继续拆分if((to-form)< MakeArray.splitMixThreshold){//满足阈值的数组处理,累加int count= 0;for(int i=form;i

我还特意写了个单线程执行累加的,让小伙伴们明白,fork-join框架不是随意什么地方都可以使用的。

单线程累加具体代码:

public class MainSum {public static void main(String[] args) {int[] array= MakeArray.make();int sum=0;System.out.println("开始计时!");long recordTime=System.currentTimeMillis();for (int i = 0; i < array.length; ++i) {sum+=array[i];}System.out.println("数组累加最终结果:"+sum+",耗时时长:"+(System.currentTimeMillis()-recordTime)+"ms");}}
 

单线程累加 VS ForkJoin并行线程累加

单线程启动:

在这里插入图片描述

ForkJoin并行线程启动:

在这里插入图片描述

4.fork-join实战异步读取磁盘文件

本次采用execute异步来启动ForkJoinPool,小伙伴们可以看启动结果跟invoke启动的区别

具体思路:

1.首先拿到我当前这个磁盘下的所有文件遍历循环

2.循环中判断文件是否是目录,如果是目录再次使用递归任务进行任务添加

3.如果不是目录就输出文件路径

4.对添加到目录递归任务集合中的数据再次进行递归

具体代码:

public class main {//读取文件任务对象static class FileTask extends RecursiveAction {private File file;public FileTask(File file) {this.file = file;}@Overrideprotected void compute() {List fileTasks=new LinkedList<>();File[] files=file.listFiles();if(files!=null){for (File file:files){//如果内部文件还是目录再次进行递归读取if(file.isDirectory()){fileTasks.add(new FileTask(file));}else{System.out.println(file.getAbsolutePath());}}//递归读取目录中的文件,把任务丢进去执行,然后这里进行使用join进行等待完成for (FileTask fileTask:invokeAll(fileTasks)){fileTask.join();}}}}public static void main(String[] args) {ForkJoinPool forkJoinPool=new ForkJoinPool();FileTask fileTask=new FileTask(new File("E:/"));//采用异步提交让主线程可以做其他的事情forkJoinPool.execute(fileTask);System.out.println("主线程还可以处理其他的事件");for (int i = 0; i < 3; ++i) {System.out.println("主线程吃了"+(i+1)+"碗饭");}//进入阻塞fileTask.join();System.out.println("所以线程执行完毕!");}}

执行结果:

画红框的表示执行到forkJoinPool.execute不会阻塞还可以继续往下面执行

在这里插入图片描述

5.fork-join实战数组归并排序(升序)

具体思路:

1.把随机生成3亿长度的数组拆分到最小单位,即上面数组生成中设置的阈值,(3亿除于10=30000000)一个数组长度,可以自己设置

2.符合阈值内的长度的数组直接采用(历史游走推箱子的方式比较)

3.不符合阈值内的长度继续使用递归拆分,并且使用(数组交叉比较合并)往上递交累加结果集
具体给小伙伴们说一下(历史游走推箱子的方式比较)跟(数组交叉比较合并)
历史游走推箱子的方式比较:

就跟小时候在大头电视机玩的推箱子游戏一样,只不过这个需要比较性质的推箱子
首先拿出目标数据的下一个数据往前进行比较,大于目标数据给他推到右边去

具体解析:

在这里插入图片描述
数组交叉比较合并:就是把左边数值跟右边数值交叉比较的一个过程

在这里插入图片描述

具体代码:

public class ForkJoinSort {static class SortTask extends RecursiveTask{int[] recursionArray;public SortTask(int[] recursionArray) {this.recursionArray = recursionArray;}/*** 采用历史游走推箱子比较* 比较目标跟下一个目标,下一个目标还会取跟上一个目标比较* 直到历史目标小于下一个目标就结束比较*@Param [array]*@return int[]*/public int[] sokobanAsc(int[] array){int nextTarget;for (int i = 0; i < array.length-1; ++i) {//历史指针序列int historyPointer=i;nextTarget=array[i+1];//游走已经比较过的,如果大于目前数值就把历史数据推箱子推给下一位while (historyPointer>=0 && nextTarget < array[historyPointer]){//如果之前数值的大于nextTarget这个数值,就把历史大于数据往前面移array[historyPointer+1]=array[historyPointer];--historyPointer;}//把nextTarget推入正确位置,跟推箱子游戏有些类似array[historyPointer+1]=nextTarget;}return array;}/*** 交叉比较合并* 左边数组跟右边数组进行交叉比较大小然后插入结果集*@Param [left, right]*@return int[]*/public int[] forkCompareMerge(int[] left,int[] right){int[] merge=new int[left.length+right.length];for (int mergeIndex = 0,leftIndex = 0, rightIndex = 0; mergeIndex < merge.length; ++mergeIndex) {if(leftIndex>=left.length){//消除多余比较值,如果左边数组比较完了,循环还是进来了,说明右边数组有多余值,直接把右边的值填入合并数组merge[mergeIndex]=right[rightIndex++];}else if(rightIndex>=right.length){//消除多余比较值,如果右边数组比较完了,循环还是进来了,说明左边数组有多余值,直接把左边的值填入合并数组merge[mergeIndex]=left[leftIndex++];}else if(left[leftIndex]>right[rightIndex]){//拿左边数组值循环取比较右边数组值,大于,我就复制右边数组的值merge[mergeIndex]=right[rightIndex++];}else{//拿左边数组值循环取比较右边数组值,小于,我就复制左边数组的值merge[mergeIndex]=left[leftIndex++];}}return merge;}@Overrideprotected int[] compute() {if(recursionArray.length<= MakeArray.splitMixThreshold){return sokobanAsc(recursionArray);}else{int half=recursionArray.length/2;SortTask left=new SortTask(Arrays.copyOfRange(recursionArray,0,half));SortTask right=new SortTask(Arrays.copyOfRange(recursionArray,half,recursionArray.length));invokeAll(left,right);return forkCompareMerge(left.join(),right.join());}}}public static void main(String[] args) {ForkJoinPool forkJoinPool=new ForkJoinPool();SortTask sortTask=new SortTask(MakeArray.make());long recordTime=System.currentTimeMillis();int[] join=forkJoinPool.invoke(sortTask);System.out.println("数组升序总用时:"+(System.currentTimeMillis()-recordTime));System.out.println(join);}}
 

执行结果:

在这里插入图片描述

6.手写fork-join内部使用的工作密取

首先手写之前让小伙伴们知道,工作密取是个什么东西

工作密取:比如我有一个线程A,线程A执行需要累加计算1个100次方,线程B执行需要累加计算80个100次方,那现在就会出现这种情况线程A的计算量比较小,线程B的计算量比较大,就会导致线程A提取计算完毕了处于空闲状态,线程B还在计算中,为了充分利用CPU处理器就出来了一个工作密取,处于空闲状态的线程A会从线程A的尾巴去拿去一个任务执行,就跟小偷一样哈哈

在这里插入图片描述

具体思路:

1.采用LinkedBlockingDeque(双向链阻塞队列)进行多线程通信拿取任务

2.使用死循环产生并发模拟消费线程从生产线程的队列尾巴拿去任务,为了确保可以消费线程可以执行特意加入了随机布尔跟随机生成0~5的循环数

3.启动4个线程,并且让每4个线程的队列互相依赖产生并发情况,如果当前producers队列为空的话我就判断当前这个线程是消费线程,让他密取生产线程的任务


这里用到了LinkedBlockingDeque,那我也就大致的给小伙伴们概括一下吧

LinkedBlockingDeque:可以双端的拿取跟双端的删除队列,队列最大容量为int最大数
(Integer.MAX_VALUE);

线程安全:因为使用了独占锁比如(synchronized),双端每次只能有一端进入操作

阻塞情况:

1.新增的情况:新增线程如果队列容量达到了(Integer.MAX_VALUE)最大直接阻塞,等待删除线程进行删除操作删除成功,再次唤醒新增线程进行操作

2.读取的情况:读取线程如果读取当前队列,队列中没有元素,就在进入阻塞,等待新增线程进行新增元素操作新增成功,再次唤醒读取线程进行操作

常用的几个方法:

takeFirst:从头部读取,读取完队列中会减去读取的这个元素

taktakeLast:从尾部读取,读取完队列中会减去读取的这个元素

putFirst:从头部添加元素

putLast:从尾部添加元素


具体代码:

public class main {//定义任务对象static class workTask {public void execution(String name){System.out.println("任务执行完毕,"+name);}}//定义消费者跟生成者对象,用来模拟工作密取static class consumersAndProducers implements Runnable{LinkedBlockingDeque producers;LinkedBlockingDeque consumers;public consumersAndProducers(LinkedBlockingDeque producers, LinkedBlockingDeque consumers) {this.producers = producers;this.consumers = consumers;}@Overridepublic void run() {try {while (true){Random random=new Random();//创建随机bool实现工作密取控制if(random.nextBoolean()){for (int i = 0; i < random.nextInt(5); ++i) {producers.putLast(new workTask());}}//如果生成队列为空,就让消费线程密取生成队列的任务if(producers.isEmpty()){if(!consumers.isEmpty()){consumers.takeLast().execution("由消费线程密取从尾巴拿执行");}}else{producers.takeFirst().execution("由生产线程正常从头部拿执行");}}} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {//创建双向链阻塞队列,因为他可以实现工作密取的功能ForkJoin内部用的也是他LinkedBlockingDeque producers=new LinkedBlockingDeque<>();LinkedBlockingDeque consumers=new LinkedBlockingDeque<>();//下面这四个线程主要实现线程并发//进入if(!consumers.isEmpty())这个工作密取判断时,把消费队列变成生产队列//实现消费线程帮生产线程执行任务,就从尾巴拿执行new Thread(new consumersAndProducers(producers,consumers)).start();new Thread(new consumersAndProducers(producers,consumers)).start();new Thread(new consumersAndProducers(consumers,producers)).start();new Thread(new consumersAndProducers(consumers,producers)).start();}}

执行结果:

在这里插入图片描述


标签:

素材巴巴 Copyright © 2013-2021 http://www.sucaibaba.com/. Some Rights Reserved. 备案号:备案中。