AICon 上海站|日程100%上线,解锁Al未来! 了解详情
写点什么

从命令式编程到 Fork/Join 再到 Java 8 中的并行 Streams

  • 2014-03-20
  • 本文字数:4465 字

    阅读完需:约 15 分钟

Java 8 带来了很多可以使编码更简洁的特性。例如,像下面的代码:

复制代码
Collections.sort(transactions, new Comparator<Transaction>(){
public int compare(Transaction t1, Transaction t2){
return t1.getValue().compareTo(t2.getValue());
}
});

可以用替换为如下更为紧凑的代码,功能相同,但是读上去与问题语句本身更接近了:

复制代码
transactions.sort(comparing(Transaction::getValue));

Java 8 引入的主要特性是 Lambda 表达式、方法引用和新的 Streams API。它被认为是自 20 年前 Java 诞生以来语言方面变化最大的版本。要想通过详细且实际的例子来了解如何从这些特性中获益,可以参考本文作者和 Alan Mycroft 共同编写的《 Java 8 in Action: Lambdas, Streams and Functional-style programming 》一书。

这些特性支持程序员编写更简洁的代码,还使他们能够受益于多核架构。实际上,编写可以优雅地并行执行的程序还是 Java 专家们的特权。然而,借助新的 Streams API,Java 8 改变了这种状况,让每个人都能够更容易地编写利用多核架构的代码。

在这篇文章中,我们将使用以下三种风格,以不同方法计算一个大数据集的方差,并加以对比。

  1. 命令式风格
  2. Fork/Join 框架
  3. Streams API

方差是统计学中的概念,用于度量一组数的偏离程度。方差可以通过对每个数据与平均值之差的平方和求平均值来计算。例如,给定一组表示人口年龄的数:40、30、50 和 80,我们可以这样计算方差:

  1. 计算平均值:(40 + 30 + 50 + 80) / 4 = 50
  2. 计算每个数据与平均值之差的平方和:(40-50)2 + (30-50)2 + (50-50)2 + (80-50)2 = 1400
  3. 最后平均:1400/4 = 350

命令式风格

下面是计算方差的一种典型的命令式风格实现:

复制代码
public static double varianceImperative(double[] population){
double average = 0.0;
for(double p: population){
average += p;
}
average /= population.length;
double variance = 0.0;
for(double p: population){
variance += (p - average) * (p - average);
}
return variance/population.length;
}

为什么说这是命令式的呢?我们的实现用 _ 修改状态的语句序列 _ 描述了计算过程。这里,我们显式地对人口年龄数组中的每个元素进行迭代,而且每次迭代时更新 average 和 variance 这两个局部变量。这种代码很适合只有一个 CPU 的硬件架构。确实,它可以非常直接地映射到 CPU 的指令集。

Fork/Join 框架

那么,如何编写适合在多核架构上执行的实现代码呢?应该使用线程吗?这些线程是不是要在某个点上同步?Java 7 引入的 Fork/Join 框架缓解了一些困难,所以让我们使用该框架来开发方差算法的一个并行版本吧。

复制代码
public class ForkJoinCalculator extends RecursiveTask<Double> {
public static final long THRESHOLD = 1_000_000;
private final SequentialCalculator sequentialCalculator;
private final double[] numbers;
private final int start;
private final int end;
public ForkJoinCalculator(double[] numbers, SequentialCalculator sequentialCalculator) {
this(numbers, 0, numbers.length, sequentialCalculator);
}
private ForkJoinCalculator(double[] numbers, int start, int end, SequentialCalculator
sequentialCalculator) {
this.numbers = numbers;
this.start = start;
this.end = end;
this.sequentialCalculator = sequentialCalculator;
}
@Override
protected Double compute() {
int length = end - start;
if (length <= THRESHOLD) {
return sequentialCalculator.computeSequentially(numbers, start, end);
}
ForkJoinCalculator leftTask = new ForkJoinCalculator(numbers, start, start + length/2,
sequentialCalculator);
leftTask.fork();
ForkJoinCalculator rightTask = new ForkJoinCalculator(numbers, start + length/2, end,
sequentialCalculator);
Double rightResult = rightTask.compute();
Double leftResult = leftTask.join();
return leftResult + rightResult;
}
}

这里我们编写了一个 _RecursiveTask_ 类的子类,它对一个 double 数组进行切分,当子数组的长度小于等于给定阈值(THRESHOLD)时停止切分。切分完成后,对子数组进行顺序处理,并将下列接口定义的操作应用于子数组。

复制代码
public interface SequentialCalculator {
double computeSequentially(double[] numbers, int start, int end);
}

利用该基础设施,可以按如下方式并行计算方差。

复制代码
public static double varianceForkJoin(double[] population){
final ForkJoinPool forkJoinPool = new ForkJoinPool();
double total = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
@Override
public double computeSequentially(double[] numbers, int start, int end) {
double total = 0;
for (int i = start; i < end; i++) {
total += numbers[i];
}
return total;
}
}));
final double average = total / population.length;
double variance = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
@Override
public double computeSequentially(double[] numbers, int start, int end) {
double variance = 0;
for (int i = start; i < end; i++) {
variance += (numbers[i] - average) * (numbers[i] - average);
}
return variance;
}
}));
return variance / population.length;
}

本质上,即便使用 Fork/Join 框架,相对于顺序版本,并行版本的编写和最后的调试仍然困难许多。

并行 Streams

Java 8 让我们可以以不同的方式解决这个问题。不同于编写代码指出计算如何实现,我们可以使用 Streams API 粗线条地描述让它做什么。作为结果,库能够知道如何为我们实现计算,并施以各种各样的优化。这种风格被称为 _ 声明式编程 _。Java 8 有一个为利用多核架构而专门设计的并行 Stream。我们来看一下如何使用它们来更快地计算方差。

假定读者对本节探讨的 Stream 有些了解。作为复习,Stream是 T 类型元素的一个序列,支持聚合操作。我们可以使用这些操作来创建表示计算的一个管道(pipeline)。这里的管道和 UNIX 的命令管道一样。并行 Stream 就是一个可以并行执行管道的 Stream,可以通过在普通的 Stream 上调用 parallel() 方法获得。要复习 Stream,可以参考 Javadoc 文档

好消息是,Java 8 API 内建了一些算术操作,如 max、min 和 average。我们可以使用 Stream 的几种基本类型特化形式来访问前面几个方法:IntStream(int 类型元素)、LongStream(long 类型元素)和 DoubleStream(double 类型元素)。例如,可以使用 IntStream.rangeClosed() 创建一系列数,然后使用 max() 和 min() 方法计算 Stream 中的最大元素和最小元素。

回到最初的问题,我们想使用这些操作来计算一个规模较大的人口年龄数据的方差。第一步是从人口年龄数组创建一个 Stream,可以通过 Arrays.stream() 静态方法实现:

复制代码
DoubleStream populationStream = Arrays.stream(population).parallel();

我们可以使用 DoubleStream 所支持的 average() 方法:

复制代码
double average = populationStream.average().orElse(0.0);

下一步是使用 average 计算方差。人口年龄中的每个元素首先需要减去平均值,然后计算差的平方。可以将其视作一个 Map 操作:使用一个 Lambda 表达式 (double p) -> (p - average) * (p - average) 把每个元素转换为另一个数,这里是转换为该元素与平均值差的平方。一旦转换完成,我们就可以调用 sum() 方法来计算所有结果元素的和了。.

不过别那么着急。Stream 只能消耗一次。如果复用 populationStream,我们会碰到下面这个令人惊讶的错误:

复制代码
java.lang.IllegalStateException: stream has already been operated upon or closed

所以我们需要使用第二个流来计算方差,如下所示:

复制代码
public static double varianceStreams(double[] population){
double average = Arrays.stream(population).parallel().average().orElse(0.0);
double variance = Arrays.stream(population).parallel()
.map(p -> (p - average) * (p - average))
.sum() / population.length;
return variance;
}

通过使用 Streams API 内建的操作,我们以声明式、而且非常简洁的方式重写了最初的命令式风格代码,而且声明式风格读上去几乎就是方差的数学定义。我们再来研究一下三种实现版本的性能。

基准测试

我们以非常不同的风格编写了三个版本的方差算法。Stream 版本是最简洁的,而且是以声明式风格编写的,它让类库去确定具体的实现,并利用多核基础设施。不过你可能想知道它们的执行效果如何。为找出答案,让我们创建一个基准测试,对比一下三个版本的表现。我们先随机生成 1 到 140 之间的 3000 万个人口年龄数据,然后计算其方差。我们使用 jmh 来研究每个版本的性能。Jmh 是 OpenJDK 支持的一个 Java 套件。读者可以从 GitHub 克隆该项目,自己运行基准测试。

基准测试运行的机器是 Macbook Pro,配备 2.3 GHz 的 4 核 Intel Core i7 处理器,16GB 1600MHz DDR3 内存。此外,我们使用的 JDK 8 版本如下:

复制代码
java version "1.8.0-ea"
Java(TM) SE Runtime Environment (build 1.8.0-ea-b121)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b63, mixed mode)

结果用下面的柱状图说明。命令式版本用了 60 毫秒,Fork/Join 版本用了 22 毫秒,而流版本用了 46 毫秒。

这些数据应该谨慎对待。比如,如果在 32 位 JVM 上运行测试,结果很可能有较大的差别。然而有趣的是,使用 Java 8 中的 Streams API 这种不同的编程风格,为在场景背后执行一些优化打开了一扇门,而这在严格的命令式风格中是不可能的;相对于使用 Fork/Join 框架,这种风格也更为直接。

关于作者

Raoul-Gabriel Urma 20 岁开始在剑桥大学攻读计算机科学博士学位。他的研究主要关注的是编程语言与软件工程。他以一等荣誉生的成绩获得了伦敦帝国理工学院的计算机科学工程硕士学位,并赢得一些技术创新奖项。他曾经为 Google、eBay、Oracle 和 Goldman Sachs 等很多大公司工作过,也参与过不少创业项目。此外,他还经常在 Java 开发者会议上发表讲话,也是一位 Java 课程讲师。他的 Twitter: @raoulUK 网站

Mario Fusco 是 Red Hat 的一位高级软件工程师,主要从事 Drools 核心和 JBoss 规则引擎方面的开发工作。作为 Java 开发者,他有着丰富的经验,参与了从媒体公司到金融部门等行业的很多企业级项目,而且经常作为项目的领导者。他的兴趣包括函数式编程和领域特定语言(Domain Specific Language,DSL)。凭借在这两个方面的激情,他创建了开源类库 lambdaj,意在在 Java 中为操作集合提供一种内部 Java DSL,并支持一点函数式编程。他的 Twitter 是 @mariofusco

查看英文原文: From Imperative Programming to Fork/Join to Parallel Streams in Java 8

2014-03-20 00:148500
用户头像
臧秀涛 略懂技术的运营同学。

发布了 300 篇内容, 共 140.8 次阅读, 收获喜欢 35 次。

关注

评论

发布
暂无评论
发现更多内容

7分钟带你细致解析4个Java算法必刷题

好程序员IT教育

Java

Unload data from Databend | 新手篇(4)

Databend

databend

中国银河证券:缺少DevOps,企业数字化转型就是带着脚镣跳舞

嘉为蓝鲸

DevOps 数字化转型 金融 证券

打造用户喜爱的产品,离不开需求助推器|影响地图Impact Mapping

老彦

敏捷开发 软件工程 设计思维 用户故事地图 影响地图

从保险系统升级谈微服务架构的弊端

勇士

外包 微服务 系统架构 保险 运维开发

高标准企业级安全性,华为云会议为线上沟通保驾护航

秃头也爱科技

Meetup预告:SeaTunnel在天翼云数据集成平台的探索实践

Apache SeaTunnel

大数据 技术分享 数据同步 数据集成 Apache SeaTunnel

第二章STP应用配置

初学者

网络 11月月更

深入浅出学习透析Nginx服务器的基本原理和配置指南「Https安全控制篇」

码界西柚

nginx https ssl 11月日更 SSL/TLS 协议

前端培训后的学习方法有哪些

小谷哥

前端培训班学习真的靠谱吗?

小谷哥

在 Spring 生态中玩转 RocketMQ

Apache RocketMQ

RocketMQ spring could

华为云弹性负载均衡ELB,如何保障服务器不瘫痪?

清欢科技

边旅游边赚钱!数字游民离不开远程控制软件

RayLink远程工具

远程控制软件 远程办公软件 远控软件 RayLink 远程控制连接

“PKS+生态(智方舟)加速营”圆满结束,九科信息与中电智方舟达成战略合作

九科Ninetech

前端食堂技术周刊第 60 期:TypeScript 4.9、Ant Design 5.0、用 vanilla-extract 编写高性能的 CSS、Node.js 安全最佳实践

童欧巴

零基础转行前端培训学习好吗?

小谷哥

低代码平台中的“模型驱动”与“表单驱动”有何区别?

优秀

领域驱动模型DDD 中台架构 表单设计 低代码平台

前端培训入行35岁程序员有什么奔头

小谷哥

大数据培训的就业前景怎么样

小谷哥

唯一获奖容器厂商!灵雀云斩获2022信创“大比武”通信赛道大奖

York

容器 云原生 数字化转型 国产化 信创云

netty系列之:在netty中使用proxy protocol

程序那些事

Java 架构 Netty 程序那些事

深度解读隐语密态计算设备SPU

隐语SecretFlow

机器学习 开源 隐私计算 开源框架 隐语

第一章三层交换应用

初学者

网络 11月月更

MACH架构的质量工程指南

俞凡

架构 微服务 云原生

华为云智能云接入ICA,助力企业轻松上云

科技怪授

出海有“云”!华为云全球加速助力跨国企业提升网络体验

科技怪授

Doris Summit 2022 正式启航,演讲议题开启征集

SelectDB

开源 Doris 峰会 summit SelectDB

数字化转型,要把功夫炼到任督二脉

脑极体

数据仓库实战教程

kingcall

数据仓库 数据湖 数据安全 数仓 数仓建模

从命令式编程到Fork/Join再到Java 8中的并行Streams_Java_Raoul-Gabriel Urma_InfoQ精选文章