最新发布《数智时代的AI人才粮仓模型解读白皮书(2024版)》,立即领取! 了解详情
写点什么

跟着示例学 Oozie

  • 2011-08-26
  • 本文字数:8273 字

    阅读完需:约 27 分钟

在前一篇文章《 Oozie 简介》中,我们已经描述了 Oozie 工作流服务器,并且展示了一个非常简单的工作流示例。我们还描述了针对 Oozie 的工作流的部署和配置,以及用来启动、停止和监控 Oozie 工作流的工具。

在本文中,我们会描述一个更加复杂的例子,通过它我们可以讨论更多 Oozie 特性,并演示如何来使用它们。

定义过程

我们在此描述的工作流会实现汽车 GPS 探测数据的获取过程。我们每个小时都会以文件的形式把探测数据传递到指定的 HDFS 目录中 [1] ,其中包含有这个小时之内的所有探测数据。探测数据的获取是每天针对一天内所有的 24 个文件完成的。如果文件的数量是 24,那么获取过程就会启动。否则:

  • 当天什么都不做
  • 对前一天——最多到 7 天,发送剩下的内容到探测数据提供程序
  • 如果目录的存在时间已达到 7 天,那么就获取所有可用的探测数据文件。

过程的总体实现请见图 1

(点击可以查看大图)。

图1: 过程图

在此,主流程(数据获取流程)首先会为今天以及之前的六天计算出目录的名称,然后启动(fork)七个目录的子过程(子流程)。待所有子过程的状态都变成终止之后,join 步骤就会把控制权交给end 状态。

子过程启动时,首先会获得关于目录的信息——它的日期以及文件数量。基于这条信息,它会决定是获取数据还是把数据归档,或者发送剩下的邮件,或者不做任何工作。

Directory 子过程实现

以下代码负责实现的是 directory 子过程(代码 1)。

复制代码
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDir'>
<start to='getDirInfo' />
<!-- STEP ONE -->
<action name='getDirInfo'>
<!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
otherwise returns age of dir in days -->
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.GetDirInfo</main-class>
<arg>${inputDir}</arg>
<capture-output />
</java>
<ok to="makeIngestDecision" />
<error to="fail" />
</action>
<!-- STEP TWO -->
<decision name="makeIngestDecision">
<switch>
<!-- empty or doesn't exist -->
<case to="end">
${wf:actionData('getDirInfo')['dir.num-files'] lt 0 ||
(wf:actionData('getDirInfo')['dir.age'] lt 1 and
wf:actionData('getDirInfo')['dir.num-files'] lt 24)}
</case>
<!-- # of files >= 24 -->
<case to="ingest">
${wf:actionData('getDirInfo')['dir.num-files'] gt 23 ||
wf:actionData('getDirInfo')['dir.age'] gt 6}
</case>
<default to="sendEmail"/>
</switch>
</decision>
<!--EMAIL-->
<action name="sendEmail">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.StandaloneMailer</main-class>
<arg>probedata2@navteq.com</arg>
<arg>gregory.titievsky@navteq.com</arg>
<arg>${inputDir}</arg>
<arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg>
<arg>${wf:actionData('getDirInfo')['dir.age']}</arg>
</java>
<ok to="end" />
<error to="fail" />
</action>
<!--INGESTION -->
<action name="ingest">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}" />
</prepare>
<configuration>
<property>
<name>mapred.reduce.tasks</name>
<value>300</value>
</property>
</configuration>
<main-class>com.navteq.probedata.drivers.ProbeIngest</main-class>
<arg>-conf</arg>
<arg>action.xml</arg>
<arg>${inputDir}</arg>
<arg>${outputDir}</arg>
</java>
<ok to=" archive-data" />
<error to="ingest-fail" />
</action>
<!—Archive Data -->
<action name="archive-data">
<fs>
<move source='${inputDir}' target='/probe/backup/${dirName}' />
<delete path = '${inputDir}' />
</fs>
<ok to="end" />
<error to="ingest-fail" />
</action>
<kill name="ingest-fail">
<message>Ingestion failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="fail">
<message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>

代码 1: Directory 子过程

这个子过程的 start 节点会触发自定义的 java 节点,这个节点会获得目录信息(代码 2)。

复制代码
package com.navteq.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.GregorianCalendar;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class GetDirInfo {
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
String dirPath = args[0];
String propKey0 = "dir.num-files";
String propVal0 = "-1";
String propKey1 = "dir.age";
String propVal1 = "-1";
System.out.println("Directory path: '"+dirPath+"'");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path hadoopDir = new Path(dirPath);
if (fs.exists(hadoopDir)){
FileStatus[] files = FileSystem.get(conf).listStatus(hadoopDir);
int numFilesInDir = files.length;
propVal0 = Integer.toString(numFilesInDir);
long timePassed, daysPassedLong;
int daysPassed;
String dirName = hadoopDir.getName();
String[] dirNameArray = dirName.split("-");
if (dirNameArray.length == 3) {
int year = Integer.valueOf(dirNameArray[0]);
int month = Integer.valueOf(dirNameArray[1]) - 1; //months are 0 based
int date = Integer.valueOf(dirNameArray[2]);
GregorianCalendar dirCreationDate = new GregorianCalendar(year,
month, date);
timePassed = (new GregorianCalendar()).getTimeInMillis()
- dirCreationDate.getTimeInMillis();
daysPassed = (int) = timePassed / 1000 / 60 / 60 / 24;;
propVal1 = Integer.toString(daysPassed);
}
}
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
props.setProperty(propKey0, propVal0);
props.setProperty(propKey1, propVal1);
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}

代码 2: 获得目录信息的节点

这个类会获得目录名作为输入的参数,并首先检查该目录是否存在。如果目录不存在,那么存在时间(age)和文件数量都会返回 -1,否则,这两个值就会返回给子过程。

子过程的下一步是一个 switch(决定)声明,它会决定如何处理目录。如果目录不存在(文件数 < 0),或者是当前日期(存在时间 < 1)并且文件数量少于 24(文件数 < 24),那么子过程就会直接转换到终止状态。如果所有文件都位于子目录中(文件数 > 23)或者目录是在至少七天前创建的(存在时间 > 6),那么就会有如下操作:

  • 使用现存的 Map/reduce 程序 [2] 获取数据
  • 目录会备份在数据归档中,然后删除

对 action 节点的其它配置

获取动作向你展示了另外一些 Oozie 配置参数,包括: - Prepare——如果出现了 prepare 参数,就意味着在启动作业(job)之前会删除路径列表。这应该专门用于清理目录。删除操作会在 fs.default.name 文件系统中执行。

  • Configuration——如果出现了 configuration 元素,它其中就会包含针对 Map/Reduce 作业的 JobConf 属性。它不仅可以用于 map/reduce 动作, 而且还可以用于启动 map/reduce 作业的 java 动作。

如果不是以上两种情况,那么子过程就会发送剩余的邮件,然后退出。邮件是作为另一个 java 主类实现的(代码 3)。

复制代码
package com.navteq.oozie;
import java.util.Properties;
import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
public class StandaloneMailer {
private static String _mServer = "imailchi.navtech.com";
private static Properties _props = null;
private StandaloneMailer(){}
public static void init(String mServer){
_mServer = mServer;
_props = new Properties();
_props.setProperty("mail.smtp.host", _mServer);
}
public static void SendMail(String subject, String message, String from, String to) throws Exception {
// create some properties and get the default Session
Session session = Session.getDefaultInstance(_props, null);
// create a message
Message msg = new MimeMessage(session);
// set the from and to address
InternetAddress addressFrom = new InternetAddress(from);
msg.setFrom(addressFrom);
String [] recipients = new String[] {to};
InternetAddress[] addressTo = new InternetAddress[recipients.length];
for (int i = 0; i < recipients.length; i++){
addressTo[i] = new InternetAddress(recipients[i]);
}
msg.setRecipients(Message.RecipientType.TO, addressTo);
// Setting the Subject and Content Type
msg.setSubject(subject);
msg.setContent(message, "text/plain");
Transport.send(msg);
}
public static void main (String[] args) throws Exception {
if (args.length ==5){
init(_mServer);
StringBuilder subject = new StringBuilder();
StringBuilder body = new StringBuilder();
subject.append("Directory ").append(args[2]).append(" contains").append(args[3]).append(" files.");
body.append("Directory ").append(args[2]).append(" is ").append(args[4]).
append(" days old and contains only ").append(args[3]).append(" files instead of 24.");
SendMail(subject.toString(), body.toString(), args[0], args[1]);
}
else throw new Exception("Invalid number of parameters provided for email");
}
}

列表 3: 发送提醒邮件

这是使用了 javax.mail API 的简单实现,用于发送邮件。

主过程的实现

我们已经实现了子过程,然后,对主过程的实现就变得非常简单了(列表 4) [3]

复制代码
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDirsWF'>
<start to='getDirs2Process' />
<!-- STEP ONE -->
<action name='getDirs2Process'>
<!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
otherwise returns age of dir in days -->
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.navteq.oozie.GenerateLookupDirs</main-class>
<capture-output />
</java>
<ok to="forkSubWorkflows" />
<error to="fail" />
</action>
<fork name="forkSubWorkflows">
<path start="processDir0"/>
<path start="processDir1"/>
<path start="processDir2"/>
<path start="processDir3"/>
<path start="processDir4"/>
<path start="processDir5"/>
<path start="processDir6"/>
<path start="processDir7"/>
</fork>
<action name="processDir0">
<sub-workflow>
<app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
<configuration>
<property>
<name>inputDir</name>
<value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
<property>
<name>outputDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>activeDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value>
</property>
<property>
<name>dirName</name>
<value>${wf:actionData('getDirs2Process')['dir0']}</value>
</property>
</configuration>
</sub-workflow>
<ok to="joining"/>
<error to="fail"/>
</action>
….
<action name="processDir7">
<sub-workflow>
<app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
<configuration>
<property>
<name>inputDir</name>
<value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
<property>
<name>outputDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
<property>
<name>dirName</name>
<value>${wf:actionData('getDirs2Process')['dir7']}</value>
</property>
</configuration>
</sub-workflow>
<ok to="joining"/>
<error to="fail"/>
</action>
<join name="joining" to="end"/>
<kill name="fail">
<message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>

代码 4: 数据获取主过程

这个过程首先会触发 java 节点,计算需要处理的目录列表(列表 5),然后对每个目录执行子过程,从而处理给定的目录。

复制代码
package com.navteq.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Properties;
public class GenerateLookupDirs {
public static final long dayMillis = 1000 * 60 * 60 * 24;
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
public static void main(String[] args) throws Exception {
Calendar curDate = new GregorianCalendar();
int year, month, date;
String propKey, propVal;
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
for (int i = 0; i<8; ++i)
{
year = curDate.get(Calendar.YEAR);
month = curDate.get(Calendar.MONTH) + 1;
date = curDate.get(Calendar.DATE);
propKey = "dir"+i;
propVal = year + "-" +
(month < 10 ? "0" + month : month) + "-" +
(date < 10 ? "0" + date : date);
props.setProperty(propKey, propVal);
curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
}
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}

代码 5: 目录计算程序

结论

在这篇文章中,我们向你展示了一个更复杂的完整的工作流示例,它让我们可以演示更多的 Oozie 特性以及对它们的应用。在下一篇文章中,我们会讨论构建可重用的 Oozie 组件库,并使用自定义的节点扩展 Oozie。

致谢

非常感谢我们在 Navteq 的同事 Gregory Titievsky,他为我们实现了大部分代码。

关于作者

Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA 以及实现各种 NAVTEQ 的项目定义架构的愿景。他还是 InfoQ 的 SOA 编辑,以及 OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。

Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。Michael 已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael 拥有俄亥俄州立大学的软件工程学位。

参考信息

1. Boris Lublinsky Mike Segel 《Oozie 简介》


[1] 目录的名称是搜集这条数据的日期。

[2] 这是已经存在的程序,对它的描述与本文无关。

[3] 在此省略了一些重复代码。

查看英文原文: Oozie by Example


给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

公众号推荐:

跳进 AI 的奇妙世界,一起探索未来工作的新风貌!想要深入了解 AI 如何成为产业创新的新引擎?好奇哪些城市正成为 AI 人才的新磁场?《中国生成式 AI 开发者洞察 2024》由 InfoQ 研究中心精心打造,为你深度解锁生成式 AI 领域的最新开发者动态。无论你是资深研发者,还是对生成式 AI 充满好奇的新手,这份报告都是你不可错过的知识宝典。欢迎大家扫码关注「AI前线」公众号,回复「开发者洞察」领取。

2011-08-26 00:0030494
用户头像

发布了 340 篇内容, 共 126.1 次阅读, 收获喜欢 13 次。

关注

评论

发布
暂无评论
发现更多内容

Docker从入门到精通:ubuntu系统安装docker

霍格沃兹测试开发学社

2024九章云极DataCanvas智算操作系统新品发布会震撼来袭!

九章云极DataCanvas

enkins如何请求http接口及乱码问题解决

百度搜索:蓝易云

云计算 Linux 运维 HTTP jenkins

SQLServer如何获取客户端IP

百度搜索:蓝易云

sql 云计算 Linux 运维 IP

Downie 4 :mac电脑视频下载抓取工具

Rose

零基础到精通,Postman安装使用教程(一)

霍格沃兹测试开发学社

2024.04.17 Flink 资料整理

Joseph295

从零基础到精通,抓包神器fiddler保姆级使用教程(一)

霍格沃兹测试开发学社

使用 TypeScript 从零搭建自己的 Web 框架:AOT 编译

RoyLin

typescript

腾讯会议发布腾讯天籁inside3.0,为厂商提供AI音视频算法解决方案

Geek_2d6073

鸿蒙HarmonyOS实战-ArkUI组件(RelativeContainer)

蜀道山

鸿蒙 HarmonyOS 鸿蒙开发 arkui ArkTS

学习 Go 语言,有哪些优质的开源项目

宇宙之一粟

GitHub Go 语言

chrome浏览器插件谷歌访问助手mac版下载及安装

Rose

centos下安装jenkins.war

百度搜索:蓝易云

Java Linux centos 运维 jenkins

AnyGo for Mac中文破解版:路线模拟、批量定位更改、实时位置统计

Rose

深入了解 Docker:革命性的容器化技术

霍格沃兹测试开发学社

小程序技术实现前端热更新的优势

FinFish

小程序容器 小程序技术 小程序热更新 小程序运行能力

腾讯音乐:说说Redis脑裂问题?

王磊

Java 面试

PPTX文件怎么打开?2个技巧助你轻松搞定职场办公!

彭宏豪95

效率工具 PPT 在线白板 PPT模板 办公软件

哨兵模式的悲和喜

算法的秘密

使用 TypeScript 从零搭建自己的 Web 框架:大语言模型与 SSE

RoyLin

typescript

Docker从入门到精通:Docker镜像相关命令学习

霍格沃兹测试开发学社

Logstash同步MySQL数据到ElasticSearch

百度搜索:蓝易云

MySQL elasticsearch Linux 运维 Logstash

Debian下dpkg-query命令怎么用

百度搜索:蓝易云

云计算 Linux 运维 云服务器 Debian

使用 TypeScript 从零搭建自己的 Web 框架:AI 工程化

RoyLin

typescript

哪里有Photoshop 2021中文版资源?如何破解ps2021?

Rose

提高 RAG 应用准确度,时下流行的 Reranker 了解一下?

Zilliz

Zilliz rag reranker

《SQL必知必会(第5版)》PDF

程序员李木子

这些Git事故灾难, 你经历过几个?

前夕

git 面试 前端 后端 版本控制

ChatGPT4.5:能力大提升,全新体验

蓉蓉

openai ChatGPT GPT-4

面试宝典

Joseph295

跟着示例学Oozie_Java_Boris Lublinsky_InfoQ精选文章