阿里云「飞天发布时刻」2024来啦!新产品、新特性、新能力、新方案,等你来探~ 了解详情
写点什么

从命令式编程到 Fork/Join 再到 Java 8 中的并行 Streams

  • 2014-03-20
  • 本文字数:4465 字

    阅读完需:约 15 分钟

Java 8 带来了很多可以使编码更简洁的特性。例如,像下面的代码:

复制代码
Collections.sort(transactions, new Comparator<Transaction>(){
public int compare(Transaction t1, Transaction t2){
return t1.getValue().compareTo(t2.getValue());
}
});

可以用替换为如下更为紧凑的代码,功能相同,但是读上去与问题语句本身更接近了:

复制代码
transactions.sort(comparing(Transaction::getValue));

Java 8 引入的主要特性是 Lambda 表达式、方法引用和新的 Streams API。它被认为是自 20 年前 Java 诞生以来语言方面变化最大的版本。要想通过详细且实际的例子来了解如何从这些特性中获益,可以参考本文作者和 Alan Mycroft 共同编写的《 Java 8 in Action: Lambdas, Streams and Functional-style programming 》一书。

这些特性支持程序员编写更简洁的代码,还使他们能够受益于多核架构。实际上,编写可以优雅地并行执行的程序还是 Java 专家们的特权。然而,借助新的 Streams API,Java 8 改变了这种状况,让每个人都能够更容易地编写利用多核架构的代码。

在这篇文章中,我们将使用以下三种风格,以不同方法计算一个大数据集的方差,并加以对比。

  1. 命令式风格
  2. Fork/Join 框架
  3. Streams API

方差是统计学中的概念,用于度量一组数的偏离程度。方差可以通过对每个数据与平均值之差的平方和求平均值来计算。例如,给定一组表示人口年龄的数:40、30、50 和 80,我们可以这样计算方差:

  1. 计算平均值:(40 + 30 + 50 + 80) / 4 = 50
  2. 计算每个数据与平均值之差的平方和:(40-50)2 + (30-50)2 + (50-50)2 + (80-50)2 = 1400
  3. 最后平均:1400/4 = 350

命令式风格

下面是计算方差的一种典型的命令式风格实现:

复制代码
public static double varianceImperative(double[] population){
double average = 0.0;
for(double p: population){
average += p;
}
average /= population.length;
double variance = 0.0;
for(double p: population){
variance += (p - average) * (p - average);
}
return variance/population.length;
}

为什么说这是命令式的呢?我们的实现用 _ 修改状态的语句序列 _ 描述了计算过程。这里,我们显式地对人口年龄数组中的每个元素进行迭代,而且每次迭代时更新 average 和 variance 这两个局部变量。这种代码很适合只有一个 CPU 的硬件架构。确实,它可以非常直接地映射到 CPU 的指令集。

Fork/Join 框架

那么,如何编写适合在多核架构上执行的实现代码呢?应该使用线程吗?这些线程是不是要在某个点上同步?Java 7 引入的 Fork/Join 框架缓解了一些困难,所以让我们使用该框架来开发方差算法的一个并行版本吧。

复制代码
public class ForkJoinCalculator extends RecursiveTask<Double> {
public static final long THRESHOLD = 1_000_000;
private final SequentialCalculator sequentialCalculator;
private final double[] numbers;
private final int start;
private final int end;
public ForkJoinCalculator(double[] numbers, SequentialCalculator sequentialCalculator) {
this(numbers, 0, numbers.length, sequentialCalculator);
}
private ForkJoinCalculator(double[] numbers, int start, int end, SequentialCalculator
sequentialCalculator) {
this.numbers = numbers;
this.start = start;
this.end = end;
this.sequentialCalculator = sequentialCalculator;
}
@Override
protected Double compute() {
int length = end - start;
if (length <= THRESHOLD) {
return sequentialCalculator.computeSequentially(numbers, start, end);
}
ForkJoinCalculator leftTask = new ForkJoinCalculator(numbers, start, start + length/2,
sequentialCalculator);
leftTask.fork();
ForkJoinCalculator rightTask = new ForkJoinCalculator(numbers, start + length/2, end,
sequentialCalculator);
Double rightResult = rightTask.compute();
Double leftResult = leftTask.join();
return leftResult + rightResult;
}
}

这里我们编写了一个 _RecursiveTask_ 类的子类,它对一个 double 数组进行切分,当子数组的长度小于等于给定阈值(THRESHOLD)时停止切分。切分完成后,对子数组进行顺序处理,并将下列接口定义的操作应用于子数组。

复制代码
public interface SequentialCalculator {
double computeSequentially(double[] numbers, int start, int end);
}

利用该基础设施,可以按如下方式并行计算方差。

复制代码
public static double varianceForkJoin(double[] population){
final ForkJoinPool forkJoinPool = new ForkJoinPool();
double total = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
@Override
public double computeSequentially(double[] numbers, int start, int end) {
double total = 0;
for (int i = start; i < end; i++) {
total += numbers[i];
}
return total;
}
}));
final double average = total / population.length;
double variance = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
@Override
public double computeSequentially(double[] numbers, int start, int end) {
double variance = 0;
for (int i = start; i < end; i++) {
variance += (numbers[i] - average) * (numbers[i] - average);
}
return variance;
}
}));
return variance / population.length;
}

本质上,即便使用 Fork/Join 框架,相对于顺序版本,并行版本的编写和最后的调试仍然困难许多。

并行 Streams

Java 8 让我们可以以不同的方式解决这个问题。不同于编写代码指出计算如何实现,我们可以使用 Streams API 粗线条地描述让它做什么。作为结果,库能够知道如何为我们实现计算,并施以各种各样的优化。这种风格被称为 _ 声明式编程 _。Java 8 有一个为利用多核架构而专门设计的并行 Stream。我们来看一下如何使用它们来更快地计算方差。

假定读者对本节探讨的 Stream 有些了解。作为复习,Stream是 T 类型元素的一个序列,支持聚合操作。我们可以使用这些操作来创建表示计算的一个管道(pipeline)。这里的管道和 UNIX 的命令管道一样。并行 Stream 就是一个可以并行执行管道的 Stream,可以通过在普通的 Stream 上调用 parallel() 方法获得。要复习 Stream,可以参考 Javadoc 文档

好消息是,Java 8 API 内建了一些算术操作,如 max、min 和 average。我们可以使用 Stream 的几种基本类型特化形式来访问前面几个方法:IntStream(int 类型元素)、LongStream(long 类型元素)和 DoubleStream(double 类型元素)。例如,可以使用 IntStream.rangeClosed() 创建一系列数,然后使用 max() 和 min() 方法计算 Stream 中的最大元素和最小元素。

回到最初的问题,我们想使用这些操作来计算一个规模较大的人口年龄数据的方差。第一步是从人口年龄数组创建一个 Stream,可以通过 Arrays.stream() 静态方法实现:

复制代码
DoubleStream populationStream = Arrays.stream(population).parallel();

我们可以使用 DoubleStream 所支持的 average() 方法:

复制代码
double average = populationStream.average().orElse(0.0);

下一步是使用 average 计算方差。人口年龄中的每个元素首先需要减去平均值,然后计算差的平方。可以将其视作一个 Map 操作:使用一个 Lambda 表达式 (double p) -> (p - average) * (p - average) 把每个元素转换为另一个数,这里是转换为该元素与平均值差的平方。一旦转换完成,我们就可以调用 sum() 方法来计算所有结果元素的和了。.

不过别那么着急。Stream 只能消耗一次。如果复用 populationStream,我们会碰到下面这个令人惊讶的错误:

复制代码
java.lang.IllegalStateException: stream has already been operated upon or closed

所以我们需要使用第二个流来计算方差,如下所示:

复制代码
public static double varianceStreams(double[] population){
double average = Arrays.stream(population).parallel().average().orElse(0.0);
double variance = Arrays.stream(population).parallel()
.map(p -> (p - average) * (p - average))
.sum() / population.length;
return variance;
}

通过使用 Streams API 内建的操作,我们以声明式、而且非常简洁的方式重写了最初的命令式风格代码,而且声明式风格读上去几乎就是方差的数学定义。我们再来研究一下三种实现版本的性能。

基准测试

我们以非常不同的风格编写了三个版本的方差算法。Stream 版本是最简洁的,而且是以声明式风格编写的,它让类库去确定具体的实现,并利用多核基础设施。不过你可能想知道它们的执行效果如何。为找出答案,让我们创建一个基准测试,对比一下三个版本的表现。我们先随机生成 1 到 140 之间的 3000 万个人口年龄数据,然后计算其方差。我们使用 jmh 来研究每个版本的性能。Jmh 是 OpenJDK 支持的一个 Java 套件。读者可以从 GitHub 克隆该项目,自己运行基准测试。

基准测试运行的机器是 Macbook Pro,配备 2.3 GHz 的 4 核 Intel Core i7 处理器,16GB 1600MHz DDR3 内存。此外,我们使用的 JDK 8 版本如下:

复制代码
java version "1.8.0-ea"
Java(TM) SE Runtime Environment (build 1.8.0-ea-b121)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b63, mixed mode)

结果用下面的柱状图说明。命令式版本用了 60 毫秒,Fork/Join 版本用了 22 毫秒,而流版本用了 46 毫秒。

这些数据应该谨慎对待。比如,如果在 32 位 JVM 上运行测试,结果很可能有较大的差别。然而有趣的是,使用 Java 8 中的 Streams API 这种不同的编程风格,为在场景背后执行一些优化打开了一扇门,而这在严格的命令式风格中是不可能的;相对于使用 Fork/Join 框架,这种风格也更为直接。

关于作者

Raoul-Gabriel Urma 20 岁开始在剑桥大学攻读计算机科学博士学位。他的研究主要关注的是编程语言与软件工程。他以一等荣誉生的成绩获得了伦敦帝国理工学院的计算机科学工程硕士学位,并赢得一些技术创新奖项。他曾经为 Google、eBay、Oracle 和 Goldman Sachs 等很多大公司工作过,也参与过不少创业项目。此外,他还经常在 Java 开发者会议上发表讲话,也是一位 Java 课程讲师。他的 Twitter: @raoulUK 网站

Mario Fusco 是 Red Hat 的一位高级软件工程师,主要从事 Drools 核心和 JBoss 规则引擎方面的开发工作。作为 Java 开发者,他有着丰富的经验,参与了从媒体公司到金融部门等行业的很多企业级项目,而且经常作为项目的领导者。他的兴趣包括函数式编程和领域特定语言(Domain Specific Language,DSL)。凭借在这两个方面的激情,他创建了开源类库 lambdaj,意在在 Java 中为操作集合提供一种内部 Java DSL,并支持一点函数式编程。他的 Twitter 是 @mariofusco

查看英文原文: From Imperative Programming to Fork/Join to Parallel Streams in Java 8

2014-03-20 00:148159
用户头像
臧秀涛 略懂技术的运营同学。

发布了 300 篇内容, 共 130.2 次阅读, 收获喜欢 34 次。

关注

评论

发布
暂无评论
发现更多内容

模块八作业-设计消息队列存储消息的MySQL表

CH

架构实战营 #架构实战营 「架构实战营」

架构实战营4期-模块八作业

木几丶

「架构实战营」

模块八作业

李晓笛

「架构实战营」

Go 语言入门很简单:从 goroutine 出发到并发

宇宙之一粟

Go 语言 goroutine 2月月更

模块八作业

hunk

云原生训练营

卫星商业价值的尽头,谁来善后?

脑极体

EventBridge消息路由|高效构建消息路由能力

阿里巴巴云原生

阿里云 云原生 消息队列 EventBridge

作业:架构实战营模块 8

Poplar89

「架构实战营」

编写 Kubernetes 部署脚本将 httpserver 部署到 Kubernetes 集群

tom

不能Hook的人生不值得 jsHook和模拟执行

奋飞安全

安全 js hook jshook

分享两个常见的搜索算法:BFS和DFS

华为云开发者联盟

算法 DFS 深度优先搜索 BFS 搜索算法

云原生训练营-Week02

jjn0703

云原生训练营

日志管理系统,多种方式总结

架构 日志 slf4j logback

「架构实战营」模块八《如何设计贴合业务的高性能高可用中间件系统》作业

DaiChen

作业 「架构实战营」 模块八

7大迹象,表明你的DevOps 做对了!

SoFlu软件机器人

2022重磅:增长法则-巧用数字营销 突破企业困局

博文视点Broadview

模块八

Geek_59dec2

架构训练营模块八作业

zhongwy

消息队列基于Mysql存储表设计

tony

「架构实战营」

如何写好一个Java类?

蜜糖的代码注释

Java 整洁代码 2月月更

喜报!龙蜥操作系统&龙蜥社区双双荣登2021“科创中国”开源创新榜!

OpenAnolis小助手

开源 操作系统 创新

架构训练营模块八作业

沈益飞

架构训练营 架构师训练营 4 期

【架构实战营】模块八:命题作业

wgl

「架构实战营」

模块八 - 消息队列存储数据表结构设计

圈圈gor

架构实战营 「架构实战营」

Meta启示:AI是通往元宇宙的关键变量

脑极体

架构实战营 4 期第八模块作业

jialuooooo

架构实战营

基于STM32+ESP8266+华为云IoT设计的智能门锁

DS小龙哥

2月月更

史上最强代码自测方法,没有之一!

万俊峰Kevin

微服务 单元测试 go-zero 测试工具 Go 语言

大数据培训:Flink的提交模式

@零度

大数据 flink

架构实战模块八作业

Anlumina

「架构实战营」

第八周作业

cqyanbo

从命令式编程到Fork/Join再到Java 8中的并行Streams_Java_Raoul-Gabriel Urma_InfoQ精选文章