【锁定直播】字节、华为云、阿里云等技术专家讨论如何将大模型接入 AIOps 解决实际问题,戳>>> 了解详情
写点什么

Oozie 简介

  • 2011-08-18
  • 本文字数:8060 字

    阅读完需:约 26 分钟

在 Hadoop 中执行的任务有时候需要把多个 Map/Reduce 作业连接到一起,这样才能够达到目的。[1] 在 Hadoop 生态圈中,有一种相对比较新的组件叫做 Oozie[2],它让我们可以把多个 Map/Reduce 作业组合到一个逻辑工作单元中,从而完成更大型的任务。本文中,我们会向你介绍 Oozie 以及使用它的一些方式。

什么是 Oozie?

Oozie 是一种 Java Web 应用程序,它运行在 Java servlet 容器——即 Tomcat——中,并使用数据库来存储以下内容:

  • 工作流定义
  • 当前运行的工作流实例,包括实例的状态和变量

Oozie 工作流是放置在控制依赖 DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop 的 Map/Reduce 作业、Pig 作业等),其中指定了动作执行的顺序。我们会使用 hPDL(一种 XML 流程定义语言)来描述这个图。

hPDL 是一种很简洁的语言,只会使用少数流程控制和动作节点。控制节点会定义执行的流程,并包含工作流的起点和终点(start、end 和 fail 节点)以及控制工作流执行路径的机制(decision、fork 和 join 节点)。动作节点是一些机制,通过它们工作流会触发执行计算或者处理任务。Oozie 为以下类型的动作提供支持: Hadoop map-reduce、Hadoop 文件系统、Pig、Java 和 Oozie 的子工作流(SSH 动作已经从 Oozie schema 0.2 之后的版本中移除了)。

所有由动作节点触发的计算和处理任务都不在 Oozie 之中——它们是由 Hadoop 的 Map/Reduce 框架执行的。这种方法让 Oozie 可以支持现存的 Hadoop 用于负载平衡、灾难恢复的机制。这些任务主要是异步执行的(只有文件系统动作例外,它是同步处理的)。这意味着对于大多数工作流动作触发的计算或处理任务的类型来说,在工作流操作转换到工作流的下一个节点之前都需要等待,直到计算或处理任务结束了之后才能够继续。Oozie 可以通过两种不同的方式来检测计算或处理任务是否完成,也就是回调和轮询。当 Oozie 启动了计算或处理任务的时候,它会为任务提供唯一的回调 URL,然后任务会在完成的时候发送通知给特定的 URL。在任务无法触发回调 URL 的情况下(可能是因为任何原因,比方说网络闪断),或者当任务的类型无法在完成时触发回调 URL 的时候,Oozie 有一种机制,可以对计算或处理任务进行轮询,从而保证能够完成任务。

Oozie 工作流可以参数化(在工作流定义中使用像 ${inputDir}之类的变量)。在提交工作流操作的时候,我们必须提供参数值。如果经过合适地参数化(比方说,使用不同的输出目录),那么多个同样的工作流操作可以并发。

一些工作流是根据需要触发的,但是大多数情况下,我们有必要基于一定的时间段和(或)数据可用性和(或)外部事件来运行它们。Oozie 协调系统(Coordinator system)让用户可以基于这些参数来定义工作流执行计划。Oozie 协调程序让我们可以以谓词的方式对工作流执行触发器进行建模,那可以指向数据、事件和(或)外部事件。工作流作业会在谓词得到满足的时候启动。

经常我们还需要连接定时运行、但时间间隔不同的工作流操作。多个随后运行的工作流的输出会成为下一个工作流的输入。把这些工作流连接在一起,会让系统把它作为数据应用的管道来引用。Oozie 协调程序支持创建这样的数据应用管道。

安装 Oozie

我们可以把 Oozie 安装在现存的 Hadoop 系统中,安装方式包括 tarball、RPM 和 Debian 包等。我们的 Hadoop 部署是 Cloudera 的 CDH3,其中已经包含了 Oozie。因此,我们只是使用 yum 把它拉下来,然后在 edge 节点 [1] 上执行安装操作。在 Oozie 的发布包中有两个组件——Oozie-client 和 Oozie-server。根据簇集的规模,你可以让这两个组件安装在同一台 edge 服务器上,也可能安装在不同的计算机上。Oozie 服务器中包含了用于触发和控制作业的组件,而客户端中包含了让用户可以触发 Oozie 操作并与 Oozie 服务器通信的组件。

想要了解更多关于安装过程的信息,请使用 Cloudera 发布包,并访问 Cloudera 站点 [2]

注: 除了包括安装过程的内容之外,它还建议把下面的 shell 变量 OOZIE_URL 根据需要添加到.login、.kshrc 或者 shell 的启动文件中:

复制代码
(export OOZIE_URL=http://localhost:11000/oozie)

简单示例

为了向你展示 Oozie 的使用方法,让我们创建一个简单的示例。我们拥有两个 Map/Reduce 作业 [3] ——一个会获取最初的数据,另一个会合并指定类型的数据。实际的获取操作需要执行最初的获取操作,然后把两种类型的数据——Lidar 和 Multicam——合并。为了让这个过程自动化,我们需要创建一个简单的 Oozie 工作流(代码 1)。

复制代码
<!--
Copyright (c) 2011 NAVTEQ! Inc. All rights reserved.
NGMB IPS ingestor Oozie Script
-->
<workflow-app xmlns='uri:oozie:workflow:0.1' name='NGMB-IPS-ingestion'>
<start to='ingestor'/>
<action name='ingestor'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>com.navteq.assetmgmt.MapReduce.ips.IPSLoader</main-class>
<java-opts>-Xmx2048m</java-opts>
<arg>${driveID}</arg>
</java>
<ok to="merging"/>
<error to="fail"/>
</action>
<fork name="merging">
<path start="mergeLidar"/>
<path start="mergeSignage"/>
</fork>
<action name='mergeLidar'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>com.navteq.assetmgmt.hdfs.merge.MergerLoader</main-class>
<java-opts>-Xmx2048m</java-opts>
<arg>-drive</arg>
<arg>${driveID}</arg>
<arg>-type</arg>
<arg>Lidar</arg>
<arg>-chunk</arg>
<arg>${lidarChunk}</arg>
</java>
<ok to="completed"/>
<error to="fail"/>
</action>
<action name='mergeSignage'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>com.navteq.assetmgmt.hdfs.merge.MergerLoader</main-class>
<java-opts>-Xmx2048m</java-opts>
<arg>-drive</arg>
<arg>${driveID}</arg>
<arg>-type</arg>
<arg>MultiCam</arg>
<arg>-chunk</arg>
<arg>${signageChunk}</arg>
</java>
<ok to="completed"/>
<error to="fail"/>
</action>
<join name="completed" to="end"/>
<kill name="fail">
<message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end'/>
</workflow-app>

代码 1: 简单的 Oozie 工作流

这个工作流定义了三个动作:ingestor、mergeLidar 和 mergeSignage。并把每个动作都实现为 Map/Reduce [4] 作业。这个工作流从 start 节点开始,然后把控制权交给 Ingestor 动作。一旦 ingestor 步骤完成,就会触发 fork 控制节点 [4],它会并行地开始执行 mergeLidar 和 mergeSignage [5] 。这两个动作完成之后,就会触发 join 控制节点 [6] 。join 节点成功完成之后,控制权就会传递给 end 节点,它会结束这个过程。

创建工作流之后,我们需要正确地对其进行部署。典型的 Oozie 部署是一个 HDFS 目录,其中包含 workflow.xml(代码 1)、config-default.xml 和 lib 子目录,其中包含有工作流操作所要使用的类的 jar 文件。

(点击可以查看大图)

图1: Oozie 部署

config-default.xml 文件是可选的,通常其中会包含对于所有工作流实例通用的工作流参数。代码 2 中显示的是 config-default.xml 的简单示例。

复制代码
<configuration>
<property>
<name>jobTracker</name>
<value>sachicn003:2010</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://sachicn001:8020</value>
</property>
<property>
<name>queueName</name>
<value>default</value>
</property>
</configuration>

代码 2: Config-default.xml

完成了工作流的部署之后,我们可以使用 Oozie 提供的命令行工具 [5],它可以用于提交、启动和操作工作流。这个工具一般会运行在 Hadoop 簇集 [7] 的 edge 节点上,并需要一个作业属性文件(参见配置工作流属性),见代码 3。

复制代码
oozie.wf.application.path=<span color="#0000ff"><u>hdfs</u>://sachicn001:8020/user/<u>blublins</u>/<u>workflows</u>/IPSIngestion</span>
jobTracker=<span color="#0000ff">sachicn003:2010</span>
nameNode=<span color="#0000ff"><u>hdfs</u>://sachicn001:8020</span>

代码 3: 作业属性文件

有了作业属性,我们就可以使用代码 4 中的命令来运行 Oozie 工作流。

复制代码
oozie job –oozie http://sachidn002.hq.navteq.com:11000/oozie/ -D driveID=729-pp00002-2011-02-08-09-59-34 -D lidarChunk=4 -D signageChunk=20 -config job.properties –run

列表 4: 运行工作流命令

配置工作流属性

在 config-default.xml、作业属性文件和作业参数中有一些重叠,它们可以作为命令行调用的一部分传递给 Oozie。尽管文档中没有清晰地指出何时使用哪个,但总体上的建议如下:

  • 使用 config-default.xml 定义对于指定工作流从未改变过的参数。
  • 对于给定的工作流部署通用的参数,建议使用作业属性。
  • 对于指定的工作流调用特定的参数使用命令行参数。

Oozie 处理这三种参数的方式如下:

  • 使用所有命令行调用的参数
  • 如果那里有任何无法解析的参数,那么就是用作业配置来解析
  • 一旦所有其它方式都无法处理,那么就试着使用 config-default.xm。

我们可以使用 Oozie 控制台(图 2)来观察工作流执行的进程和结果。

(点击可以查看大图)

图2: Oozie 控制台

我们还可以使用Oozie 控制台来获得操作执行的细节,比方说作业的日志 [8] (图 3)。

(点击可以查看大图)

图 3: Oozie 控制台——作业日志

编程方式的工作流调用

尽管上面所述的命令行界面能够很好地用于手动调用 Oozie,但有时使用编程的方式调用 Oozie 更具有优势。当 Oozie 工作流是特定的应用程序或者大型企业过程的一部分,这就会很有用。我们可以使用 Oozie Web Services APIs [6] 或者 Oozie Java client APIs [7] 来实现这种编程方式的调用。代码 5 中展现的就是很简单的 Oozie Java 客户端的例子,它会触发上面描述的过程。

复制代码
package com.navteq.assetmgmt.oozie;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowJob.Status;
public class WorkflowClient {
private static String OOZIE_URL = "http://sachidn002.hq.navteq.com:11000/oozie/";
private static String JOB_PATH = "hdfs://sachicn001:8020/user/blublins/workflows/IPSIngestion";
private static String JOB_Tracker = "sachicn003:2010";
private static String NAMENode = "hdfs://sachicn001:8020";
OozieClient wc = null;
public WorkflowClient(String url){
wc = new OozieClient(url);
}
public String startJob(String wfDefinition, List<WorkflowParameter> wfParameters)
throws OozieClientException{
// create a workflow job configuration and set the workflow application path
Properties conf = wc.createConfiguration();
conf.setProperty(OozieClient.APP_PATH, wfDefinition);
// setting workflow parameters
conf.setProperty("jobTracker", JOB_Tracker);
conf.setProperty("nameNode", NAMENode);
if((wfParameters != null) && (wfParameters.size() > 0)){
for(WorkflowParameter parameter : wfParameters)
conf.setProperty(parameter.getName(), parameter.getValue());
}
// submit and start the workflow job
return wc.run(conf);
}
public Status getJobStatus(String jobID) throws OozieClientException{
WorkflowJob job = wc.getJobInfo(jobID);
return job.getStatus();
}
public static void main(String[] args) throws OozieClientException, InterruptedException{
// Create client
WorkflowClient client = new WorkflowClient(OOZIE_URL);
// Create parameters
List<WorkflowParameter> wfParameters = new LinkedList<WorkflowParameter>();
WorkflowParameter drive = new WorkflowParameter("driveID","729-pp00004-2010-09-01-09-46");
WorkflowParameter lidar = new WorkflowParameter("lidarChunk","4");
WorkflowParameter signage = new WorkflowParameter("signageChunk","4");
wfParameters.add(drive);
wfParameters.add(lidar);
wfParameters.add(signage);
// Start Oozing
String jobId = client.startJob(JOB_PATH, wfParameters);
Status status = client.getJobStatus(jobId);
if(status == Status.RUNNING)
System.out.println("Workflow job running");
else
System.out.println("Problem starting Workflow job");
}
}

代码 5: 简单的 Oozie Java 客户端

在此,我们首先使用 Oozie 服务器 URL 对工作流客户端进行初始化。初始化过程完成之后,我们就可以使用客户端提交并启动作业(startJob 方法),获得正在运行的作业的状态(getStatus 方法),以及进行其他操作。

构建 java 动作,向工作流传递参数

在之前的示例中,我们已经展示了如何使用标签向 Java 节点传递参数。由于 Java 节点是向 Oozie 引入自定义计算的主要方法,因此能够从 Java 节点向 Oozie 传递数据也同样重要。

根据 Java 节点的文档 [3],我们可以使用“capture-output””元素把 Java 节点生成的值传递回给 Oozie 上下文。然后,工作流的其它步骤可以通过 EL-functions 访问这些值。返回值需要以 Java 属性格式文件写出来。我们可以通过“JavaMainMapper.OOZIE_JAVA_MAIN_CAPTURE_OUTPUT_FILE”常量从 System 属性中获得这些属性文件的名称。代码 6 是一个简单示例,演示了如何完成这项操作。

复制代码
package com.navteq.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Properties;
public class GenerateLookupDirs {
/**
* @param args
*/
public static final long dayMillis = 1000 * 60 * 60 * 24;
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
Calendar curDate = new GregorianCalendar();
int year, month, date;
String propKey, propVal;
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
for (int i = 0; I < 8; ++i) {
year = curDate.get(Calendar.YEAR);
month = curDate.get(Calendar.MONTH) + 1;
date = curDate.get(Calendar.DATE);
propKey = "dir"+i;
propVal = year + "-" +
(month < 10 ? "0" + month : month) + "-" +
(date < 10 ? "0" + date : date);
props.setProperty(propKey, propVal);
curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
}
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}

代码 6: 向 Oozie 传递参数

在这个示例中,我们假设在 HDFS 中有针对每个日期的目录。这样,这个类首先会获得当前日期,然后再获得离现在最近的 7 个日期(包括今天),然后把目录名称传递回给 Oozie。

结论

在本文我们介绍了 Oozie,它是针对 Hadoop 的工作流引擎,并且提供了使用它的简单示例。在下一篇文章中,我们会看到更复杂的例子,让我们可以更进一步讨论 Oozie 的特性。

致谢

非常感谢我们在 Navteq 的同事 Gregory Titievsky,他为我们提供了一些例子。

关于作者

Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA 以及实现各种 NAVTEQ 的项目定义架构的愿景。 他还是 InfoQ 的 SOA 编辑,以及 OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。

Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。 Michael 已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael 拥有俄亥俄州立大学的软件工程学位。


[1] edge 节点是安装有 Hadoop 库的计算机,但不是真正簇集中的一部分。它是为能够连接到簇集中的应用程序所用的,并且会部署辅助服务以及能够直接访问簇集的最终用户应用程序。

[2] 请参看Oozie 安装的链接。

[3] 这些作业的细节和本文无关,所以在其中没有描述。

[4] Map/Reduce 作业能够以两种不同的方式在 Oozie 中实现——第一种是作为真正的 Map/Reduce 动作 [2],其中你会指定 Mapper 和 Reducer 类以及它们的配置信息;第二种是作为 Java 动作 [3],其中你会使用 Hadoop API 来指定启动 Map/Reduce 作业的类。因为我们所有的 Java 主函数都是使用 Hadoop API,并且还实现了一些额外的功能,所以我们选择了第二种方法。

[5] Oozie 确保两个动作会并行地提交给作业跟踪程序。在执行过程中实际的并行机制并不在 Oozie 的控制之内,并且依赖于作业的需求、簇集的能力以及 Map/Reduce 部署所使用的调度程序。

[6] join 动作的功能是要同步 fork 动作启动的多个并行执行的线程。如果 fork 启动的所有执行的线程都能够成功完成,那么 join 动作就会等待它们全部完成。如果有至少一个线程执行失败,kill 节点会“杀掉”剩余运行的线程。

[7] 这个节点不需要是安装了 Oozie 的计算机。

[8] Oozie 的作业日志会包含工作流执行的细节,想要查看动作执行的细节,我们需要切换到 Hadoop 的 Map/Reduce 管理页面。

查看英文原文: Introduction to Oozie


给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

2011-08-18 00:0053779
用户头像

发布了 340 篇内容, 共 126.0 次阅读, 收获喜欢 13 次。

关注

评论

发布
暂无评论
发现更多内容

Gradle构建多模块SpringBoot应用,mybatis面试常问问题

Java 程序员 后端

HashMap详解,hadoop源码分析完整版

Java 程序员 后端

hive学习笔记之三:内部表和外部表,java面试手写算法

Java 程序员 后端

Intellij IDEA神器那些让人爱不释手的小技巧,java高级程序员面试笔试

Java 程序员 后端

IDEA 一键部署 SpringBoot 项目到远程服务器 Docker 内

Java 程序员 后端

IDEA这样配置,好用到爆炸!(1),java基础入门第二版课后答案

Java 程序员 后端

Fluid 给数据弹性一双隐形的翅膀 -- 自定义弹性伸缩,多线程面试题2021

Java 程序员 后端

GitHub上标星90k+的《Java知识总结,java基础选择题填空

Java 程序员 后端

HashMap源码解析,操作系统原理与实践教程第三版答案

Java 程序员 后端

HashMap(jdk1,Java程序员进大厂面试必备基础技能

Java 程序员 后端

hive学习笔记之八:Sqoop,大厂Offer拿到手软啊

Java 程序员 后端

Elasticsearch的高阶使用方法有哪些?,rabbitmq入门案例

Java 程序员 后端

Github又爆神作,阿里JVM垃圾回收全解小册全网开源!,已开源

Java 程序员 后端

hive学习笔记之九:基础UDF,java入门书籍下载

Java 程序员 后端

IDEA-2021首个大版本发布,Java开发者感动哭了(附新亮点演示

Java 程序员 后端

IDEA这样配置,好用到爆炸!,金九银十怎么从中小企业挤进一线大厂

Java 程序员 后端

Hello Git快速入门,redis常见数据结构以及使用场景分析

Java 程序员 后端

GitHub上访问下载破百万的神仙文档《Java面试神技》看完我呆了

Java 程序员 后端

gRPC学习之三:初试GO版gRPC开发,Java面试题中高级

Java 程序员 后端

HTTP-2做错了什么?刚刚辉煌2年就要被弃用了!,mybatis底层工作原理

Java 程序员 后端

Github点赞接近 70k 的Spring Cloud学习教程+实战项目推荐!牛批

Java 程序员 后端

Helm部署的服务如何修改配置,nginx面试题负载均衡

Java 程序员 后端

hive学习笔记之七:内置函数,mybatis防止sql注入原理

Java 程序员 后端

hive学习笔记之三:内部表和外部表(1),贼好用的Java学习路线集合

Java 程序员 后端

透过表象看REST

Jxin

GateWay 网关服务,java程序员进阶路线

Java 程序员 后端

Github神作!2021Java秋招高级面试指南,吃透至少阿里P6

Java 程序员 后端

GitHub调优热榜,居然是腾讯T9熬肝撰写的594页MySQL优化手册,简直太香

Java 程序员 后端

golang实战之flag包,Redis灵魂14问

Java 程序员 后端

HTML笔记 —— 列表,和快手大佬的技术面谈

Java 程序员 后端

IDEA 一键部署 SpringBoot 项目到远程服务器 Docker 内(1)

Java 程序员 后端

Oozie简介_Java_Boris Lublinsky_InfoQ精选文章