【ArchSummit架构师峰会】探讨数据与人工智能相互驱动的关系>>> 了解详情
写点什么

Apache Crunch:用于简化 MapReduce 编程的 Java 库

  • 2013-03-18
  • 本文字数:2454 字

    阅读完需:约 8 分钟

Apache Crunch(孵化器项目)是基于 Google 的 FlumeJava 库编写的 Java 库,用于创建 MapReduce 流水线。与其他用来创建 MapReduce 作业的高层工具(如 Apache Hive、Apache Pig 和 Cascading 等)类似,Crunch 提供了用于实现如连接数据、执行聚合和排序记录等常见任务的模式库。而与其他工具不同的是,Crunch 并不强制所有输入遵循同一数据类型。相反,Crunch 使用了一种定制的类型系统,非常灵活,能够直接处理复杂数据类型,如时间序列、HDF5 文件、Apache HBase 表和序列化对象(像 protocol buffer 或 Avro 记录)等。

Crunch 并不想阻止开发者以 MapReduce 方式思考,而是尝试使之简化。尽管 MapReduce 有诸多优点,但对很多问题而言,并非正确的抽象级别:大部分有意思的计算都是由多个 MapReduce 作业组成的,情况往往是这样——出于性能考虑,我们需要将逻辑上独立的操作(如数据过滤、数据投影和数据变换)组合为一个物理上的 MapReduce 作业。

本质上,Crunch 设计为 MapReduce 之上的一个薄层,希望在不牺牲 MapReduce 力量(或者说不影响开发者使用 MapReduce API)的前提下,更容易在正确的抽象级别解决手头问题。

尽管 Crunch 会让人想起历史悠久的 Cascading API,但是它们各自的数据模型有很大不同:按照常识简单总结一下,可以认为把问题看做数据流的人会偏爱 Crunch 和 Pig,而考虑 SQL 风格连接的人会偏爱 Cascading 和 Hive。

Crunch 的理念

PCollection 和 PTable<K, V> 是 Crunch 的核心抽象,前者代表一个分布式、不可变的对象集合,后者是 Pcollection 的一个子接口,其中包含了处理键值对的额外方法。这两个核心类支持如下四个基本操作:

  1. parallelDo:将用户定义函数应用于给定 PCollection,返回一个新的 PCollection 作为结果。
  2. groupByKey:将一个 PTable 中的元素按照键值排序并分组(等同于 MapReduce 作业中的 shuffle 阶段)
  3. combineValues:执行一个关联操作来聚合来自 groupByKey 操作的值。
  4. union:将两个或多个 Pcollection 看做一个虚拟的 PCollection。

Crunch 的所有高阶操作(joins、cogroups 和 set operations 等)都是通过这些基本原语实现的。Crunch 的作业计划器(job planner)接收流水线开发者定义的操作图,将操作分解为一系列相关的 MapReduce 作业,然后在 Hadoop 集群上执行。Crunch 也支持内存执行引擎,可用于本地数据上流水线的测试与调试。

有些问题可以从能够操作定制数据类型的大量用户定义函数受益,而 Crunch 就是为这种问题设计的。Crunch 中的用户定义函数设计为轻量级的,为满足应用程序的需要,仍然提供了完整的访问底层 MapReduce API 的功能。Crunch 开发者也可以使用 Crunch 原语来定义 API,为客户提供涉及一系列复杂 MapReduce 作业的高级 ETL、机器学习和科学计算功能。

Crunch 起步

可以从 Crunch 的网站下载最新版本的源代码或二进制文件,或者使用在 Maven Central 发布的 dependencies

源代码中有很多示例应用。下面是 Crunch 中 WordCount 应用的源代码:

复制代码
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.type.writable.Writables;
public class WordCount {
public static void main(String[] args) throws Exception {
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(WordCount.class);
// Reference a given text file as a collection of Strings.
PCollection<String> lines = pipeline.readTextFile(args[0]);
// Define a function that splits each line in a PCollection of Strings into a
// PCollection made up of the individual words in the file.
PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
public void process(String line, Emitter<String> emitter) {
for (String word : line.split("\\s+")) {
  emitter.emit(word);
}
}
}, Writables.strings()); // Indicates the serialization format
// The count method applies a series of Crunch primitives and returns
// a map of the top 20 unique words in the input PCollection to their counts.
// We then read the results of the MapReduce jobs that performed the
// computations into the client and write them to stdout.
for (Pair<String, Long> wordCount : words.count().top(20).materialize()) {
System.out.println(wordCount);
}
}
}

Crunch 优化方案

Crunch 优化器的目标是尽可能减少运行的 MapReduce 作业数。大多数 MapReduce 作业都是 IO 密集型的,因此访问数据的次数越少越好。公平地说,每种优化器(Hive、Pig、Cascading 和 Crunch)的工作方式本质上是相同的。但与其他框架不同的是,Crunch 把优化器原语暴露给了客户开发人员,对于像构造 ETL 流水线或构建并评估一组随机森林模型这样的任务而言,构造可复用的高阶操作更容易。

结论

Crunch 目前仍处于 Apache 的孵化器阶段,我们非常欢迎社区贡献(参见项目主页)让这个库更好。特别的是,我们正在寻求更高效的MapReduce 编译思想(包括基于成本考虑的优化)、新的MapReduce 设计模式,还希望支持更多的数据源和目标,如HCatalog、Solr 和ElasticSearch 等。还有很多把Crunch 带向如 Scala Clojure 等其他 JVM 语言的项目,也有很多使用 Crunch以R 语言来创建MapReduce 流水线的工具。

关于作者

Josh Wills 是 Cloudera 的数据科学主管,主要负责与客户和工程师一起基于 Hadoop 为不同行业开发解决方案。他从杜克大学获得数学专业学士学位,又从得克萨斯大学奥斯汀分校获得运筹学专业硕士学位。

查看英文原文: Apache Crunch: A Java Library for Easier MapReduce Programming

2013-03-18 17:068349
用户头像
臧秀涛 略懂技术的运营同学。

发布了 300 篇内容, 共 130.1 次阅读, 收获喜欢 34 次。

关注

评论

发布
暂无评论
发现更多内容

使用验证码拦截爬虫和机器人实践分享

宙哈哈

php html 爬虫 机器人

量化交易场景下日增 144 万条数据,使用 MySQL 和 TDengine 分别如何建模?

TDengine

大数据 tdengine 物联网 时序数据库

ZRTC高并发策略在专属音视频中台场景的应用

中关村科金

RTC 中关村科金 音视频中台 高并发策略 对话式AI

MIAOYUN与OpenCloudOS、TencentOS Server 3完成产品兼容互认证

MIAOYUN

容器云 云平台 产品兼容性互认 互认证 兼容性互认证

低代码有哪些缺点?4千字深入解析

优秀

低代码 低代码缺点

Flink Table Store 独立孵化启动 , Apache Paimon 诞生

Openlab_cosmoplat

大数据 开源

聚焦用户精细化运营场景,极客邦科技与火山引擎数智平台达成合作

字节跳动数据平台

用户增长 数字化 用户运营 数字化案例 企业号 4 月 PK 榜

「刷起来」Go必看的进阶面试题详解

王中阳Go

golang 高效工作 学习方法 面试题 Go 语言

阿里云弹性计算资深技术专家徐海弘:云上自动化运维成熟度模型

云布道师

弹性计算

学了这么久的高并发编程,连Java中的并发原子类都不知道?

华为云开发者联盟

Java 开发 华为云 华为云开发者联盟 企业号 4 月 PK 榜

DataLeap数据资产实战:如何实现存储优化?

字节跳动数据平台

MySQL 数据库 大数据 数据治理 数据存储

助力企业数字化转型,「一体化」或是最佳实践路径

ToB行业头条

FastAPI 开发中数据校验利器 Pydantic 介绍与集成使用

宇宙之一粟

Python 后端 FastApi Pydantic

流媒体数字版权应用实践

Marvin Ma

数字版权 流媒体

尚硅谷新版Git视频教程发布

小谷哥

基于二代征信报告的信用评估模型实践

中关村科金

金融 征信 风控 对话式AI

数据库原理及MySQL应用 | 事件

TiAmo

MySQL 数据库 事件

软件测试/测试开发丨录制你的第一个web 自动化测试用例

测试人

软件测试 自动化测试 测试开发 Web自动化测试

软件测试/测试开发丨测试步骤代码修改,用 Yaml实现数据驱动

测试人

软件测试 自动化测试 yaml 测试开发 UI自动化测试

上网买个东西,居然需要那么多业务系统支撑!

产品海豚湾

SaaS 电商 供应链 电商平台 wms

景顺长城基于 Apache APISIX 在金融云原生的生产实践

API7.ai 技术团队

api 网关 APISIX 金融业务

KgCaptcha接入汇总

宙哈哈

Java php Python C# html

[验证码] KgCaptcha风险监测方法

宙哈哈

php html

从零学习SDK(1)什么是SDK,为什么要使用它

MobTech袤博科技

大模型打开了一层技术天花板,催生新场景变革老场景

中关村科金

人工智能 企业服务 大模型 对话式AI

如何在 Web 实现支持虚拟背景的视频会议

声网

Web 视频会议 RTE 虚拟背景

传输体积下降 85%,融云 HTTP 压缩算法解析

融云 RongCloud

算法 音视频 传输 融云 通讯

sync.WaitGroup:掌握并发编程中的重要工具

Jack

通过 NFTScan 追踪 NFT 钻石手持仓

NFT Research

NFT NFTScan

MobPush推送查询API

MobTech袤博科技

WebAssembly 助力云原生:APISIX 如何借助 Wasm 插件实现扩展功能?

API7.ai 技术团队

api 网关 APISIX Wasm

Apache Crunch:用于简化MapReduce编程的Java库_语言 & 开发_Josh Wills_InfoQ精选文章