写点什么

Oozie 简介

  • 2011-08-18
  • 本文字数:8060 字

    阅读完需:约 26 分钟

在 Hadoop 中执行的任务有时候需要把多个 Map/Reduce 作业连接到一起,这样才能够达到目的。[1] 在 Hadoop 生态圈中,有一种相对比较新的组件叫做 Oozie[2],它让我们可以把多个 Map/Reduce 作业组合到一个逻辑工作单元中,从而完成更大型的任务。本文中,我们会向你介绍 Oozie 以及使用它的一些方式。

什么是 Oozie?

Oozie 是一种 Java Web 应用程序,它运行在 Java servlet 容器——即 Tomcat——中,并使用数据库来存储以下内容:

  • 工作流定义
  • 当前运行的工作流实例,包括实例的状态和变量

Oozie 工作流是放置在控制依赖 DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop 的 Map/Reduce 作业、Pig 作业等),其中指定了动作执行的顺序。我们会使用 hPDL(一种 XML 流程定义语言)来描述这个图。

hPDL 是一种很简洁的语言,只会使用少数流程控制和动作节点。控制节点会定义执行的流程,并包含工作流的起点和终点(start、end 和 fail 节点)以及控制工作流执行路径的机制(decision、fork 和 join 节点)。动作节点是一些机制,通过它们工作流会触发执行计算或者处理任务。Oozie 为以下类型的动作提供支持: Hadoop map-reduce、Hadoop 文件系统、Pig、Java 和 Oozie 的子工作流(SSH 动作已经从 Oozie schema 0.2 之后的版本中移除了)。

所有由动作节点触发的计算和处理任务都不在 Oozie 之中——它们是由 Hadoop 的 Map/Reduce 框架执行的。这种方法让 Oozie 可以支持现存的 Hadoop 用于负载平衡、灾难恢复的机制。这些任务主要是异步执行的(只有文件系统动作例外,它是同步处理的)。这意味着对于大多数工作流动作触发的计算或处理任务的类型来说,在工作流操作转换到工作流的下一个节点之前都需要等待,直到计算或处理任务结束了之后才能够继续。Oozie 可以通过两种不同的方式来检测计算或处理任务是否完成,也就是回调和轮询。当 Oozie 启动了计算或处理任务的时候,它会为任务提供唯一的回调 URL,然后任务会在完成的时候发送通知给特定的 URL。在任务无法触发回调 URL 的情况下(可能是因为任何原因,比方说网络闪断),或者当任务的类型无法在完成时触发回调 URL 的时候,Oozie 有一种机制,可以对计算或处理任务进行轮询,从而保证能够完成任务。

Oozie 工作流可以参数化(在工作流定义中使用像 ${inputDir}之类的变量)。在提交工作流操作的时候,我们必须提供参数值。如果经过合适地参数化(比方说,使用不同的输出目录),那么多个同样的工作流操作可以并发。

一些工作流是根据需要触发的,但是大多数情况下,我们有必要基于一定的时间段和(或)数据可用性和(或)外部事件来运行它们。Oozie 协调系统(Coordinator system)让用户可以基于这些参数来定义工作流执行计划。Oozie 协调程序让我们可以以谓词的方式对工作流执行触发器进行建模,那可以指向数据、事件和(或)外部事件。工作流作业会在谓词得到满足的时候启动。

经常我们还需要连接定时运行、但时间间隔不同的工作流操作。多个随后运行的工作流的输出会成为下一个工作流的输入。把这些工作流连接在一起,会让系统把它作为数据应用的管道来引用。Oozie 协调程序支持创建这样的数据应用管道。

安装 Oozie

我们可以把 Oozie 安装在现存的 Hadoop 系统中,安装方式包括 tarball、RPM 和 Debian 包等。我们的 Hadoop 部署是 Cloudera 的 CDH3,其中已经包含了 Oozie。因此,我们只是使用 yum 把它拉下来,然后在 edge 节点 [1] 上执行安装操作。在 Oozie 的发布包中有两个组件——Oozie-client 和 Oozie-server。根据簇集的规模,你可以让这两个组件安装在同一台 edge 服务器上,也可能安装在不同的计算机上。Oozie 服务器中包含了用于触发和控制作业的组件,而客户端中包含了让用户可以触发 Oozie 操作并与 Oozie 服务器通信的组件。

想要了解更多关于安装过程的信息,请使用 Cloudera 发布包,并访问 Cloudera 站点 [2]

注: 除了包括安装过程的内容之外,它还建议把下面的 shell 变量 OOZIE_URL 根据需要添加到.login、.kshrc 或者 shell 的启动文件中:

复制代码
(export OOZIE_URL=http://localhost:11000/oozie)

简单示例

为了向你展示 Oozie 的使用方法,让我们创建一个简单的示例。我们拥有两个 Map/Reduce 作业 [3] ——一个会获取最初的数据,另一个会合并指定类型的数据。实际的获取操作需要执行最初的获取操作,然后把两种类型的数据——Lidar 和 Multicam——合并。为了让这个过程自动化,我们需要创建一个简单的 Oozie 工作流(代码 1)。

复制代码
<!--
Copyright (c) 2011 NAVTEQ! Inc. All rights reserved.
NGMB IPS ingestor Oozie Script
-->
<workflow-app xmlns='uri:oozie:workflow:0.1' name='NGMB-IPS-ingestion'>
<start to='ingestor'/>
<action name='ingestor'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>com.navteq.assetmgmt.MapReduce.ips.IPSLoader</main-class>
<java-opts>-Xmx2048m</java-opts>
<arg>${driveID}</arg>
</java>
<ok to="merging"/>
<error to="fail"/>
</action>
<fork name="merging">
<path start="mergeLidar"/>
<path start="mergeSignage"/>
</fork>
<action name='mergeLidar'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>com.navteq.assetmgmt.hdfs.merge.MergerLoader</main-class>
<java-opts>-Xmx2048m</java-opts>
<arg>-drive</arg>
<arg>${driveID}</arg>
<arg>-type</arg>
<arg>Lidar</arg>
<arg>-chunk</arg>
<arg>${lidarChunk}</arg>
</java>
<ok to="completed"/>
<error to="fail"/>
</action>
<action name='mergeSignage'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>com.navteq.assetmgmt.hdfs.merge.MergerLoader</main-class>
<java-opts>-Xmx2048m</java-opts>
<arg>-drive</arg>
<arg>${driveID}</arg>
<arg>-type</arg>
<arg>MultiCam</arg>
<arg>-chunk</arg>
<arg>${signageChunk}</arg>
</java>
<ok to="completed"/>
<error to="fail"/>
</action>
<join name="completed" to="end"/>
<kill name="fail">
<message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end'/>
</workflow-app>

代码 1: 简单的 Oozie 工作流

这个工作流定义了三个动作:ingestor、mergeLidar 和 mergeSignage。并把每个动作都实现为 Map/Reduce [4] 作业。这个工作流从 start 节点开始,然后把控制权交给 Ingestor 动作。一旦 ingestor 步骤完成,就会触发 fork 控制节点 [4],它会并行地开始执行 mergeLidar 和 mergeSignage [5] 。这两个动作完成之后,就会触发 join 控制节点 [6] 。join 节点成功完成之后,控制权就会传递给 end 节点,它会结束这个过程。

创建工作流之后,我们需要正确地对其进行部署。典型的 Oozie 部署是一个 HDFS 目录,其中包含 workflow.xml(代码 1)、config-default.xml 和 lib 子目录,其中包含有工作流操作所要使用的类的 jar 文件。

(点击可以查看大图)

图1: Oozie 部署

config-default.xml 文件是可选的,通常其中会包含对于所有工作流实例通用的工作流参数。代码 2 中显示的是 config-default.xml 的简单示例。

复制代码
<configuration>
<property>
<name>jobTracker</name>
<value>sachicn003:2010</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://sachicn001:8020</value>
</property>
<property>
<name>queueName</name>
<value>default</value>
</property>
</configuration>

代码 2: Config-default.xml

完成了工作流的部署之后,我们可以使用 Oozie 提供的命令行工具 [5],它可以用于提交、启动和操作工作流。这个工具一般会运行在 Hadoop 簇集 [7] 的 edge 节点上,并需要一个作业属性文件(参见配置工作流属性),见代码 3。

复制代码
oozie.wf.application.path=<span color="#0000ff"><u>hdfs</u>://sachicn001:8020/user/<u>blublins</u>/<u>workflows</u>/IPSIngestion</span>
jobTracker=<span color="#0000ff">sachicn003:2010</span>
nameNode=<span color="#0000ff"><u>hdfs</u>://sachicn001:8020</span>

代码 3: 作业属性文件

有了作业属性,我们就可以使用代码 4 中的命令来运行 Oozie 工作流。

复制代码
oozie job –oozie http://sachidn002.hq.navteq.com:11000/oozie/ -D driveID=729-pp00002-2011-02-08-09-59-34 -D lidarChunk=4 -D signageChunk=20 -config job.properties –run

列表 4: 运行工作流命令

配置工作流属性

在 config-default.xml、作业属性文件和作业参数中有一些重叠,它们可以作为命令行调用的一部分传递给 Oozie。尽管文档中没有清晰地指出何时使用哪个,但总体上的建议如下:

  • 使用 config-default.xml 定义对于指定工作流从未改变过的参数。
  • 对于给定的工作流部署通用的参数,建议使用作业属性。
  • 对于指定的工作流调用特定的参数使用命令行参数。

Oozie 处理这三种参数的方式如下:

  • 使用所有命令行调用的参数
  • 如果那里有任何无法解析的参数,那么就是用作业配置来解析
  • 一旦所有其它方式都无法处理,那么就试着使用 config-default.xm。

我们可以使用 Oozie 控制台(图 2)来观察工作流执行的进程和结果。

(点击可以查看大图)

图2: Oozie 控制台

我们还可以使用Oozie 控制台来获得操作执行的细节,比方说作业的日志 [8] (图 3)。

(点击可以查看大图)

图 3: Oozie 控制台——作业日志

编程方式的工作流调用

尽管上面所述的命令行界面能够很好地用于手动调用 Oozie,但有时使用编程的方式调用 Oozie 更具有优势。当 Oozie 工作流是特定的应用程序或者大型企业过程的一部分,这就会很有用。我们可以使用 Oozie Web Services APIs [6] 或者 Oozie Java client APIs [7] 来实现这种编程方式的调用。代码 5 中展现的就是很简单的 Oozie Java 客户端的例子,它会触发上面描述的过程。

复制代码
package com.navteq.assetmgmt.oozie;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowJob.Status;
public class WorkflowClient {
private static String OOZIE_URL = "http://sachidn002.hq.navteq.com:11000/oozie/";
private static String JOB_PATH = "hdfs://sachicn001:8020/user/blublins/workflows/IPSIngestion";
private static String JOB_Tracker = "sachicn003:2010";
private static String NAMENode = "hdfs://sachicn001:8020";
OozieClient wc = null;
public WorkflowClient(String url){
wc = new OozieClient(url);
}
public String startJob(String wfDefinition, List<WorkflowParameter> wfParameters)
throws OozieClientException{
// create a workflow job configuration and set the workflow application path
Properties conf = wc.createConfiguration();
conf.setProperty(OozieClient.APP_PATH, wfDefinition);
// setting workflow parameters
conf.setProperty("jobTracker", JOB_Tracker);
conf.setProperty("nameNode", NAMENode);
if((wfParameters != null) && (wfParameters.size() > 0)){
for(WorkflowParameter parameter : wfParameters)
conf.setProperty(parameter.getName(), parameter.getValue());
}
// submit and start the workflow job
return wc.run(conf);
}
public Status getJobStatus(String jobID) throws OozieClientException{
WorkflowJob job = wc.getJobInfo(jobID);
return job.getStatus();
}
public static void main(String[] args) throws OozieClientException, InterruptedException{
// Create client
WorkflowClient client = new WorkflowClient(OOZIE_URL);
// Create parameters
List<WorkflowParameter> wfParameters = new LinkedList<WorkflowParameter>();
WorkflowParameter drive = new WorkflowParameter("driveID","729-pp00004-2010-09-01-09-46");
WorkflowParameter lidar = new WorkflowParameter("lidarChunk","4");
WorkflowParameter signage = new WorkflowParameter("signageChunk","4");
wfParameters.add(drive);
wfParameters.add(lidar);
wfParameters.add(signage);
// Start Oozing
String jobId = client.startJob(JOB_PATH, wfParameters);
Status status = client.getJobStatus(jobId);
if(status == Status.RUNNING)
System.out.println("Workflow job running");
else
System.out.println("Problem starting Workflow job");
}
}

代码 5: 简单的 Oozie Java 客户端

在此,我们首先使用 Oozie 服务器 URL 对工作流客户端进行初始化。初始化过程完成之后,我们就可以使用客户端提交并启动作业(startJob 方法),获得正在运行的作业的状态(getStatus 方法),以及进行其他操作。

构建 java 动作,向工作流传递参数

在之前的示例中,我们已经展示了如何使用标签向 Java 节点传递参数。由于 Java 节点是向 Oozie 引入自定义计算的主要方法,因此能够从 Java 节点向 Oozie 传递数据也同样重要。

根据 Java 节点的文档 [3],我们可以使用“capture-output””元素把 Java 节点生成的值传递回给 Oozie 上下文。然后,工作流的其它步骤可以通过 EL-functions 访问这些值。返回值需要以 Java 属性格式文件写出来。我们可以通过“JavaMainMapper.OOZIE_JAVA_MAIN_CAPTURE_OUTPUT_FILE”常量从 System 属性中获得这些属性文件的名称。代码 6 是一个简单示例,演示了如何完成这项操作。

复制代码
package com.navteq.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Properties;
public class GenerateLookupDirs {
/**
* @param args
*/
public static final long dayMillis = 1000 * 60 * 60 * 24;
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
Calendar curDate = new GregorianCalendar();
int year, month, date;
String propKey, propVal;
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
for (int i = 0; I < 8; ++i) {
year = curDate.get(Calendar.YEAR);
month = curDate.get(Calendar.MONTH) + 1;
date = curDate.get(Calendar.DATE);
propKey = "dir"+i;
propVal = year + "-" +
(month < 10 ? "0" + month : month) + "-" +
(date < 10 ? "0" + date : date);
props.setProperty(propKey, propVal);
curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
}
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}

代码 6: 向 Oozie 传递参数

在这个示例中,我们假设在 HDFS 中有针对每个日期的目录。这样,这个类首先会获得当前日期,然后再获得离现在最近的 7 个日期(包括今天),然后把目录名称传递回给 Oozie。

结论

在本文我们介绍了 Oozie,它是针对 Hadoop 的工作流引擎,并且提供了使用它的简单示例。在下一篇文章中,我们会看到更复杂的例子,让我们可以更进一步讨论 Oozie 的特性。

致谢

非常感谢我们在 Navteq 的同事 Gregory Titievsky,他为我们提供了一些例子。

关于作者

Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA 以及实现各种 NAVTEQ 的项目定义架构的愿景。 他还是 InfoQ 的 SOA 编辑,以及 OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。

Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。 Michael 已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael 拥有俄亥俄州立大学的软件工程学位。


[1] edge 节点是安装有 Hadoop 库的计算机,但不是真正簇集中的一部分。它是为能够连接到簇集中的应用程序所用的,并且会部署辅助服务以及能够直接访问簇集的最终用户应用程序。

[2] 请参看Oozie 安装的链接。

[3] 这些作业的细节和本文无关,所以在其中没有描述。

[4] Map/Reduce 作业能够以两种不同的方式在 Oozie 中实现——第一种是作为真正的 Map/Reduce 动作 [2],其中你会指定 Mapper 和 Reducer 类以及它们的配置信息;第二种是作为 Java 动作 [3],其中你会使用 Hadoop API 来指定启动 Map/Reduce 作业的类。因为我们所有的 Java 主函数都是使用 Hadoop API,并且还实现了一些额外的功能,所以我们选择了第二种方法。

[5] Oozie 确保两个动作会并行地提交给作业跟踪程序。在执行过程中实际的并行机制并不在 Oozie 的控制之内,并且依赖于作业的需求、簇集的能力以及 Map/Reduce 部署所使用的调度程序。

[6] join 动作的功能是要同步 fork 动作启动的多个并行执行的线程。如果 fork 启动的所有执行的线程都能够成功完成,那么 join 动作就会等待它们全部完成。如果有至少一个线程执行失败,kill 节点会“杀掉”剩余运行的线程。

[7] 这个节点不需要是安装了 Oozie 的计算机。

[8] Oozie 的作业日志会包含工作流执行的细节,想要查看动作执行的细节,我们需要切换到 Hadoop 的 Map/Reduce 管理页面。

查看英文原文: Introduction to Oozie


给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

2011-08-18 00:0053964
用户头像

发布了 340 篇内容, 共 129.7 次阅读, 收获喜欢 13 次。

关注

评论

发布
暂无评论
发现更多内容

Ingress Nginx 接连披露高危安全漏洞,是否有更好的选择?

阿里巴巴云原生

阿里云 Kubernetes 云原生 ingress

Go-Excelize API源码阅读(三)——OpenReader()

Regan Yue

Go 开源 源码分析 8月日更 8月月更

关于架构的认知

yuexin_tech

架构

leetcode 232. Implement Queue using Stacks 用栈实现队列(简单)

okokabcd

LeetCode 数据结构与算法 栈和队列

深入浅出边缘云 | 6. 监控与遥测

俞凡

架构 边缘计算 网络 深入浅出边缘云

OpenHarmony像素单位

坚果

开源 OpenHarmony 8月月更

【LeetCode】算术三元组的数目Java题解

Albert

LeetCode 8月月更

详解中断系统

timerring

8月月更

SpringBoot实战:国际化组件MessageSource的执行逻辑与源码

看山

源码 spring源码 MessageSource Spring原理 SpringBoot实战

MPLS网络向SRv6网络演进

穿过生命散发芬芳

8月月更 SRv6

《编程的原则》读书笔记(一):编程的前提和准则

Chares

软件工程 软件开发 程序开发 编程原理

低成本、大容量、高交互…Polkadot 引领 GameFi 实现新突破

One Block Community

区块链

rocketmq整合SpringCloudStream

急需上岸的小谢

8月月更

测试也应该具备的项目管理能力

老张

项目管理 质量保障

基于消息中间件开发的优点

阿泽🧸

消息中间件 8月月更

Python 教程之输入输出(7)—— 如何在 Python 中不使用换行符进行打印?

海拥(haiyong.site)

Python 8月月更

模块九(电商秒杀系统)

Geek_701557

毕业总结

Geek_701557

3 款非常实用的 Node.js 版本管理工具

Geek_z9ygea

JavaScript node.js 前端

Spring(四、配置数据源)

开源 MySQ Druid 8月月更

Java+EasyExcel实现文件导入导出

Bug终结者

Java 8月月更

深入了解 Spring篇之BeanDefinition结构

邱学喆

对象初始化 BeanDefinition 对象创建 属性注入 对象检索

即将开幕!阿里云飞天技术峰会邀您一同探秘云原生最佳实践

阿里巴巴云原生

阿里云 云原生 阿里云飞天技术峰会

云原生时代下,微服务体系与 Serverless 架构的发展、治理与融合

阿里巴巴云原生

阿里云 Serverless 微服务 云原生

MySQL 指令

武师叔

8月月更

一文带你搞懂OAuth2.0

闫同学

Go 后端 OAuth 2.0

企业文化如何治好“企业内耗”?

涛哥 数字产品和业务架构

企业文化 企业架构

Python爬虫eval混淆,爬虫进阶实战系列

梦想橡皮擦

Python 爬虫 8月月更

C++对象模型和this指针实例分析(二)

CtrlX

c++ 后端 面向对象思想 热门活动 8月月更

开源一夏 | 对于jQuery选择器和动画效果停止动画的实战心得【前端jQuery框架】

恒山其若陋兮

开源 8月月更

Sass.vs.Less | 简介

Jason199

SaaS 8月月更

Oozie简介_Java_Boris Lublinsky_InfoQ精选文章