并发编程系列-分而治之思想Forkjoin

2023-09-20 15:58:24

我们介绍过一些有关并发编程的工具和概念,包括线程池、Future、CompletableFuture和CompletionService。如果仔细观察,你会发现这些工具实际上是帮助我们从任务的角度来解决并发问题的,而不是让我们陷入线程之间如何协作的繁琐细节(比如等待和通知等)。

对于简单的并行任务,你可以使用“线程池+Future”的方式来处理。而对于任务之间存在聚合关系的情况,无论是AND聚合还是OR聚合,你都可以利用CompletableFuture来解决。至于批量的并行任务,则可以借助CompletionService来实现。

我们一直强调,并发编程可以从三个层面来思考,分别是分工、协作和互斥。当你关注于任务本身时,你会发现自己的思维模式已经超越了并发编程的技术细节,更加贴近了现实世界中的工作方式。因此,我将线程池、Future、CompletableFuture和CompletionService都归类到了“分工”这个层面。

下面我将通过现实世界里的工作流程图来描述并发编程领域中的简单并行任务、聚合任务和批量并行任务。相信通过这些图示,你能够更好地将自己的思维模式与现实世界联系起来。

alt

在前面提到的简单并行任务、聚合任务和批量并行任务模型之外,还有一种任务模型被称为“分治”。如字面意义所示,分治是一种解决复杂问题的思维方法和模式;具体而言,它将一个复杂的问题分解成多个相似的子问题,然后再将这些子问题进一步分解成更小的子问题,直到每个子问题变得足够简单从而可以直接求解。

从理论上讲,每个问题都对应着一个任务,因此分治实际上就是对任务的划分和组织。分治思想在许多领域都有广泛的应用。例如,在算法领域,我们经常使用分治算法来解决问题(如归并排序和快速排序都属于分治算法,二分查找也是一种分治算法)。在大数据领域,MapReduce计算框架背后的思想也是基于分治。

由于分治这种任务模型的普遍性,Java并发包提供了一种名为Fork/Join的并行计算框架,专门用于支持分治任务模型的应用。

分治任务模型

这里你需要先深入了解一下分治任务模型,分治任务模型可分为两个阶段:一个阶段是 任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;另一个阶段是 结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图,你可以对照着理解。

alt

简版分治任务模型图

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。

Fork/Join的使用

Fork/Join是一个并行计算框架,主要用于支持分治任务模型。在这个计算框架中,Fork代表任务的分解,而Join代表结果的合并。Fork/Join计算框架主要由两部分组成:分治任务的线程池ForkJoinPool和分治任务ForkJoinTask。这两部分的关系类似于ThreadPoolExecutor和Runnable之间的关系,都是用于提交任务到线程池的,只不过分治任务有自己独特的类型ForkJoinTask。

ForkJoinTask是一个抽象类,其中有许多方法,其中最核心的是fork()方法和join()方法。fork()方法用于异步执行一个子任务,而join()方法通过阻塞当前线程来等待子任务的执行结果。ForkJoinTask有两个子类:RecursiveAction和RecursiveTask。从它们的名字就可以看出,它们都使用递归的方式处理分治任务。这两个子类都定义了一个抽象方法compute(),不同之处在于RecursiveAction的compute()方法没有返回值,而RecursiveTask的compute()方法有返回值。这两个子类也都是抽象类,在使用时需要创建自定义的子类来扩展功能。

接下来,让我们来实现一下如何使用Fork/Join并行计算框架来计算斐波那契数列(下面的代码示例源自Java官方示例)。首先,我们需要创建一个ForkJoinPool线程池以及一个用于计算斐波那契数列的Fibonacci分治任务。然后,通过调用ForkJoinPool线程池的invoke()方法来启动分治任务。由于计算斐波那契数列需要返回结果,所以我们的Fibonacci类继承自RecursiveTask。Fibonacci分治任务需要实现compute()方法,在这个方法中,逻辑与普通计算斐波那契数列的方法非常相似,只是在计算Fibonacci(n - 1)时使用了异步子任务,这通过f1.fork()语句来实现。

static void main(String[] args){
  //创建分治任务线程池
  ForkJoinPool fjp =
    new ForkJoinPool(4);
  //创建分治任务
  Fibonacci fib =
    new Fibonacci(30);
  //启动分治任务
  Integer result =
    fjp.invoke(fib);
  //输出结果
  System.out.println(result);
}
//递归任务
static class Fibonacci extends
    RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){
    if (n <= 1)
      return n;
    Fibonacci f1 =
      new Fibonacci(n - 1);
    //创建子任务
    f1.fork();
    Fibonacci f2 =
      new Fibonacci(n - 2);
    //等待子任务结果,并合并结果
    return f2.compute() + f1.join();
  }
}

ForkJoinPool工作原理

Fork/Join并行计算的核心组件是ForkJoinPool。下面简单介绍一下ForkJoinPool的工作原理。

与ThreadPoolExecutor类似,ForkJoinPool现的。不同之处在部有多个任务队列,用于生产者和消费者之间的通信。当我们通过ForkJoinPool的invoke()或submit()方法提交任务时,ForkJoinPool根据一定的路由规则将任务分配到一个任务队列中。如果任务执行过程中创建了子任务,那么子任务会被提交到对应工作线程的任务队列中。

当工作线程的任务队列为空时,它是否无事可做呢?不是的。ForkJoinPool引入了一种称为"任务窃取"的机制。当工作线程空闲时,它可以从其他工作线程的任务队列中"窃取"任务。例如,在下图中线程T2的任务队列已经为空,它可以窃取线程T1的任务队列中的任务。这样,所有的工作线程都能保持忙碌状态。

ForkJoinPool中的任务队列采用双端队列的形式。工作线程从任务队列的一个端获取任务,而"窃取任务"则从另一端进行消费。这种设计能够避免许多不必要的数据竞争。我们介绍的是ForkJoinPool的简化原理,实际上它的实现比我们介绍的要复杂得多。如果你对此感兴趣,建议阅读其源码。

alt

ForkJoinPool工作原理图

模拟MapReduce统计单词数量

Fork/Join并行计算框架被用来实现学习MapReduce的入门程序,该程序用于统计文件中每个单词的数量。以下是如何使用Fork/Join并行计算框架实现此功能。

首先,我们可以使用二分法递归地将文件拆分为更小的部分,直到每个部分只有一行数据。然后,在每个部分中统计单词的数量,并逐级汇总结果。你可以参考之前提到的简化版分治任务模型图以理解该过程。

现在,让我们开始实现。下面的示例程序使用字符串数组String[] fc来模拟文件内容,其中每个元素与文件中的行数据一一对应。关键代码位于compute()方法中,这是一个递归方法。它将前半部分数据fork一个递归任务进行处理(关键代码:mr1.fork()),而后半部分数据在当前任务中递归处理(mr2.compute())。

mport java.util.concurrent.RecursiveTask;

public class WordCountTask extends RecursiveTask<Integer> {
    private final String[] fc;
    private final int start, end;
    
    public WordCountTask(String[] fc, int start, int end) {
        this.fc = fc;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Integer compute() {
        if (end - start <= 1) {
            // 对单行数据进行统计
            return countWords(fc[start]);
        } else {
            int mid = (start + end) / 2;
            WordCountTask mr1 = new WordCountTask(fc, start, mid);
            mr1.fork();
            WordCountTask mr2 = new WordCountTask(fc, mid, end);
            int result2 = mr2.compute();
            int result1 = mr1.join();
            // 汇总结果
            return result1 + result2;
        }
    }
    
    private int countWords(String line) {
        String[] words = line.split(" ");
        return words.length;
    }
}

这个示例程序是对Fork/Join模型的简化,实际上在真正的MapReduce框架中,还涉及到数据划分、映射阶段、归约阶段等更多的步骤。但是通过此示例,你可以初步了解如何使用Fork/Join并行计算框架来处理类似的任务。

总结

Fork/Join并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的MapReduce,所以你可以把Fork/Join看作单机版的MapReduce。

Fork/Join并行计算框架的核心组件是ForkJoinPool。ForkJoinPool支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8提供的Stream API里面并行流也是以ForkJoinPool为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个ForkJoinPool,这个共享的ForkJoinPool默认的线程数是CPU的核数;如果所有的并行流计算都是CPU密集型计算的话,完全没有问题,但是如果存在I/O密集型的并行流计算,那么很可能会因为一个很慢的I/O计算而拖慢整个系统的性能。所以 建议用不同的ForkJoinPool执行不同类型的计算任务

如果你对ForkJoinPool详细的实现细节感兴趣,也可以参考 Doug Lea的论文。

顶尖架构师栈

关注回复关键字

【C01】超10G后端学习面试资源

【IDEA】最新IDEA激活工具和码及教程

【JetBrains软件名】 最新软件激活工具和码及教程

工具&码&教程

转载于: https://mp.weixin.qq.com/s/baP7S6tA9i_Hgu6RhKcvew

本文由 mdnice 多平台发布

更多推荐

Python Functions-函数

目录创建函数调用函数参数还是自变量?参数数量任意参数,*args关键字参数任意关键字参数,**kwargs默认参数值将列表作为参数传递ThepassStatement递归函数是一个只有在被调用时才运行的代码块。可以将称为参数的数据传递到函数中。函数可以作为结果返回数据。创建函数在Python中,函数是使用def关键字定

线程池:神秘的“轻量级线程”

当前我们的多线程部分已经学习了几个代码案例:1.单例模式2.阻塞队列->生产者消费者模型3.定时器4.线程池而线程存在的意义就是,使用进程来实现并发编程会“太重了”,创建和销毁进程都会比较耗资源。但是线程会更加高效。此时,使用多线程就可以在很多时候代替进程来实现并发编程了。但是随着并发程度的提高,随着我们对于性能要求的

【跟小嘉学 Rust 编程】二十九、Rust 中的零拷贝序列化解决方案(rkyv)

系列文章目录【跟小嘉学Rust编程】一、Rust编程基础【跟小嘉学Rust编程】二、Rust包管理工具使用【跟小嘉学Rust编程】三、Rust的基本程序概念【跟小嘉学Rust编程】四、理解Rust的所有权概念【跟小嘉学Rust编程】五、使用结构体关联结构化数据【跟小嘉学Rust编程】六、枚举和模式匹配【跟小嘉学Rust

STM32H5开发(4)----开发板介绍

STM32H5开发----4.开发板介绍套件概述样品申请特征系统控制和生态系统访问功能示意图系统框图跳线设置开发板原理图套件概述STM32H503RBTx_LQFP64是STM32H5系列微控制器的一款出色评估套件,它采用了先进的40nm工艺制造,为开发者提供了卓越的性能和能效。主频高达250MHz的Arm®Corte

华为云CodeArts产品体验的心得体会及想法

文章目录前言CodeArts的产品优势一站式软件开发生产线研发安全Built-In华为多年研发实践能力及规范外溢高质高效敏捷交付功能特性说明体验感受问题描述完结前言华为云作为一家全球领先的云计算服务提供商,致力于为企业和个人用户提供高效、安全、可靠的云服务。在其众多产品中,CodeArts作为一款全新的开发工具集,为开

【STM32】基础知识 第十一课 sys, delay & usart 文件夹

【STM32】基础知识第十一课sys,delay&usart文件夹sys文件介绍delay文件夹函数简介SysTickSysTick工作原理SysTick寄存器介绍delay_init()函数delay_us()函数usart文件夹介绍printf的使用常用输出控制符表常用转椅字符表半主机模式简介sys文件介绍函数分类

苹果从成熟到落地,Apple Newton 背后的工程师们 | 历史上的今天

整理|王启隆透过「历史上的今天」,从过去看未来,从现在亦可以改变未来。1983年3月的最后一个星期日,史蒂夫·乔布斯(SteveJobs)和时任百事公司总裁约翰·斯卡利(JonSculley)坐在阳台上俯瞰纽约中央公园。在深思熟虑之后,斯卡利对着眼前年轻人说:“我们已经互相了解彼此,但是,史蒂夫,我已经考虑过了,我不会

Linux系统使用(超详细)

目录Linux操作系统简介Linux和windows区别Linux常见命令Linux目录结构Linux命令提示符常用命令lscdpwdtouchcatechomkdirrmcpmvvimvim的基本使用grepnetstatLinux面试题Linux操作系统简介Linux操作系统是和windows操作系统是并列的关系。

STM32H5开发(3)----电源控制&RCC

STM32H5开发----3.电源控制&RCCSTM32H503供电样品申请STM32H562/563/573LDO供电STM32H562/563/573SMPS供电LDO/SMPS供电PWR特性电源电压监测温度监测低功耗模式低功耗模式-SLEEP模式低功耗模式-STOP模式低功耗模式-STANDBY模式低功耗模式监控

2023年中职组“网络安全”赛项吉安市竞赛任务书

2023年中职组“网络安全”赛项吉安市竞赛任务书一、竞赛时间总计:360分钟竞赛阶段竞赛阶段任务阶段竞赛任务竞赛时间分值A模块A-1登录安全加固180分钟200分A-2本地安全策略配置A-3流量完整性保护A-4事件监控A-5服务加固A-6防火墙策略B模块B-1Windows操作系统渗透测试400分B-2隐藏信息探索B-

STM32 ADC基础知识讲解

文章目录前言一、ADC的基本介绍二、STM32ADC讲解1.ADC分辨率2.ADC通道讲解3.ADC转换模式单次转换模式连续转换模式4.扫描模式5.数据对齐方式左对齐右对齐总结前言在正式的学习如何编写ADC代码时我们先来学习一下ADC的基础知识部分,只有掌握好了这些基础知识才能顺利的进行后面的代码编写。一、ADC的基本

热文推荐