写点什么

扩展 Oozie

2011 年 10 月 20 日

在前面的两篇文章中 [ 1 2 ],我们描述了 Oozie 工作流服务器,并且展示了几个工作流的示例。我们还描述了针对 Oozie 的工作流的部署和配置,以及用来启动、停止和监控 Oozie 工作流的工具。

在本文中,我们会向你展示 Oozie 的可扩展性,并说明它是如何支持我们实现自定义的、协同工作的语言扩展。

为什么需要自定义节点(Custom Node)?

正如我们在文章 [1] 中所说明的,Oozie 之所以与众不同,是因为它提供了一种“最小化”的工作流语言,其中只包含少数几种控制和动作节点。尽管其中的一种动作节点是 java 动作节点,它让我们可以从 Oozie 工作流调用任意一个带有 main 方法的 java 类,但这种方法并非总是最佳的。原因之一就在于,java 动作是在 Hadoop 簇集中作为 map-reduce 作业执行的,并且只带有唯一的 Mapper 任务。一方面,这带来了很多好处:

  • 它拥有内建的可伸缩性以及对 map/reduce 框架的灾难恢复支持,这样我们就不必这些特性构建到 Oozie 中。
  • 外部执行机制,这让 Oozie 引擎变得更轻量级,从而可以支持更多并发运行的过程。

另一方面,这种方法也有一些缺点:

  • 它把每个 java 节点都作为 mapper 任务启动,这会导致在 Hadoop 簇集中启动新的 JVM 而产生额外的开销。
  • 在外部执行 java 类导致了额外的网络传输,用于与 Oozie 服务器同步这些执行结果。
  • 从 java 节点传递参数成了非常耗费资源的操作。

尽管如此,在运行时间相对较长(几分钟甚至几小时)的 map/reduce 或者 Pig 作业中,好处会大大超过负载的缺点,但是,在简单的 java 节点中(参见 [2]),我们就需要注意外部执行所导致的开销了。所以,使用自定义动作节点的原因之一,就是为了支持在 Oozie 的执行上下文中直接执行轻量级的 java 类 [1]

使用自定义动作的另一个原因是为了提高工作流的语义和可读性。由于 Oozie 是一种支持基于 Hadoop 处理组件的工作流引擎,所以它的语法完全是以 Hadoop 执行为中心的——Hadoop 文件系统、map/reduce、Pig 等等。这种语法能够很好地符合 Hadoop 开发者的习惯,但是并没有涉及到太多关于给定动作的功能信息。我们可以为动作本身制定与业务相关的命名转换规则,但这只是特别用来解决问题的——对动作的命名只反映了给定过程的语法,既不是总体上的主题领域,也不能解决动作参数的问题,那些问题仍然只能由开发者来解决。

幸运的是,Oozie 支持非常棒的扩展机制——自定义动作节点 [3],它让我们可以很容易地解决这两个问题。自定义的动作节点让我们可以使用附加的动作(动词)来扩展 Oozie 的语言。Oozie 的动作节点可以是同步的,也可以是异步的。

  • 同步节点——它在 Oozie 内部执行,在继续执行之前会等待这些节点完成动作。这些节点是为轻量级的任务所用的,像自定义计算,文件系统的移动、创建目录、删除等等。
  • 异步节点——它是由 Oozie 启动的,但是在 Oozie 引擎的外部执行,它会监控正在执行的动作,直到完成。这是通过动作的回调或者 Oozie 针对动作状态的 polling 操作完成的。

实现 Oozie 自定义动作处理程序

在这个例子中,我们会对独立的邮件程序进行转换,并展现到自定义的 email 动作中 [2](代码 1)。

复制代码
package com.navteq.assetmgmt.oozie.custom;
import java.util.Properties;
import java.util.StringTokenizer;
import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.ActionExecutorException.ErrorType;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
public class EmailActionExecutor extends ActionExecutor {
private static final String NODENAME = "eMail";
private static final String SUCCEEDED = "OK";
private static final String FAILED = "FAIL";
private static final String KILLED = "KILLED";
private static final String DEFAULMAILSERVER = "imailchi.navtech.com";
private static final String EMAILSERVER = "emailServer";
private static final String SUBJECT = "emailSubject";
private static final String MESSAGE = "emailBody";
private static final String FROM = "emailFrom";
private static final String TO = "emailTo";
public EmailActionExecutor() {
super(NODENAME);
}
@Override
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
// Should not be called for synch operation
throw new UnsupportedOperationException();
}
@Override
public void end(Context context, WorkflowAction action)throws ActionExecutorException {
String externalStatus = action.getExternalStatus();
WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ?
WorkflowAction.Status.OK : WorkflowAction.Status.ERROR;
context.setEndData(status, getActionSignal(status));
}
@Override
public boolean isCompleted(String arg0) {
return true;
}
@Override
public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
context.setExternalStatus(KILLED);
context.setExecutionData(KILLED, null);
}
@Override
public void start(Context context, WorkflowAction action) throws ActionExecutorException {
// Get parameters from Node configuration
try{
Element actionXml = XmlUtils.parseXml(action.getConf());
Namespace ns = Namespace.getNamespace("uri:custom:email-action:0.1");
String server = actionXml.getChildTextTrim(EMAILSERVER, ns);
String subject = actionXml.getChildTextTrim(SUBJECT, ns);
String message = actionXml.getChildTextTrim(MESSAGE, ns);
String from = actionXml.getChildTextTrim(FROM, ns);
String to = actionXml.getChildTextTrim(TO, ns);
// Check if all parameters are there
if(server == null)
server = DEFAULMAILSERVER;
if((message == null) || (from == null) || (to == null))
throw new ActionExecutorException(ErrorType.FAILED,
ErrorCode.E0000.toString(), "Not all parameters are defined");
// Execute action synchronously
SendMail(server, subject, message, from, to);
context.setExecutionData(SUCCEEDED, null);
}
catch(Exception e){
context.setExecutionData(FAILED, null);
throw new ActionExecutorException(ErrorType.FAILED,
ErrorCode.E0000.toString(), e.getMessage());
}
}
// Sending an email
public void SendMail(String server, String subject, String message,
String from, String to) throws Exception {
// create some properties and get the default Session
Properties props = new Properties();
props.setProperty("mail.smtp.host", server);
Session session = Session.getDefaultInstance(props, null);
// create a message
Message msg = new MimeMessage(session);
// set the from and to address
InternetAddress addressFrom = new InternetAddress(from);
msg.setFrom(addressFrom);
// To is a comma separated list
StringTokenizer st = new StringTokenizer(to, ",");
String [] recipients = new String[st.countTokens()];
int rc = 0;
while(st.hasMoreTokens())
recipients[rc++] = st.nextToken();
InternetAddress[] addressTo = new InternetAddress[recipients.length];
for (int i = 0; i < recipients.length; i++){
addressTo[i] = new InternetAddress(recipients[i]);
}
msg.setRecipients(Message.RecipientType.TO, addressTo);
// Setting the Subject and Content Type
msg.setSubject(subject);
msg.setContent(message, "text/plain");
Transport.send(msg);
}
}

代码 1: Email 自定义动作

这个实现对 ActionExecutor [2] 类(由 Oozie 提供)进行了扩展,并重写了一些必要的方法。因为邮件的发送过程是一种非常快速的操作,所以我们决定将其实现为同步的动作处理程序,那意味着它会在 Oozie 的执行上下文中执行。

我们的实现(代码 1)遵循了 Oozie 文档并实现了所有必需的方法:

  • 对于所有自定义动作处理程序都需要没有参数的构造函数。这个构造函数会注册动作处理程序的名称(使用动作名称来调用父类),我们会在工作流 XML 中使用它。
  • 我们可以使用 InitActionType [3] 方法来注册执行动作时可能发生的异常,以及它们的类型和错误信息,并为执行程序本身执行初始化操作。
  • Start 方法是用来启动动作的执行操作的。因为我们实现的是同步动作,所以整个动作都会在此执行。这个方法是由 Oozie 调用的,它有两个参数:Context 和 WorkflowAction。Context 参数提供了对 Oozie 工作流执行上下文的访问,其中主要包含了工作流的变量,并提供了对其进行操作的非常简单的 API(set、get) [4] 。而 WorkflowAction 则提供了 Oozie 对当前动作的定义。
  • 还有 Check 方法,Oozie 会使用它来检查动作的状态。它不能作为同步动作来调用。
  • Kill 方法,可以用来中断运行的作业或者动作。
  • End 方法,可以用于所有清理动作,或者用于可能在完成动作之后所要做的处理。它还需要设置执行的结果。

部署并使用 Oozie 自定义动作处理程序

实现了自定义的动作执行方式之后,我们需要为新的 email 动作定义 XML 模式 [5] (代码 2)。

复制代码
<?<span color="#008080">xml</span> <span color="#800080">version</span>=<i><span color="#0000ff">"1.0"</span></i> <span color="#800080">encoding</span>=<i><span color="#0000ff">"UTF-8"</span></i>?>
<<span color="#008080">xs:schema</span> <span color="#800080">xmlns:xs</span>=<span color="#0000ff"><i>"http://www.w3.org/2001/XMLSchema"</i> </span>
                 <span color="#800080">xmlns:email</span>=<i><span color="#0000ff">"uri:custom:email-action:0.1"</span></i>
     <span color="#800080">elementFormDefault</span>=<i><span color="#0000ff">"qualified"</span></i>
     <span color="#800080">targetNamespace</span>=<i><span color="#0000ff">"uri:custom:email-action:0.1"</span></i>>
<<span color="#008080">xs:complexType</span> <span color="#800080">name</span>=<i><span color="#0000ff">"EMAIL"</span></i>>
       <<span color="#008080">xs:sequence</span>>
                 <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailServer"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> <span color="#800080">minOccurs</span>=<i><span color="#0000ff">"0"</span></i> <span color="#800080">maxOccurs</span>=<i><span color="#0000ff">"1"</span></i> />
                 <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailSubject"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> />
                 <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailFrom"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> />
                 <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailTo"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> />
                 <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailBody"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> />
       </<span color="#008080">xs:sequence</span>>
</<span color="#008080">xs:complexType</span>>
<p><<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"eMail"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"email:EMAIL"</span></i>></<span color="#008080">xs:element</span>><br></br></<span color="#008080">xs:schema</span>></p>

代码 2: 为 email 组件所用的 XML schema

自定义动作节点和 XML schema 文件都需要打包在单独的 jar 文件中,比方说 emailAction.Jar。我们可以使用 Oozie 的 oozie-setup.sh 脚本执行下面的命令,从而把这个(以及其他所有)jar 文件添加到 Oozie 的 war 文件中。

复制代码
$ bin/oozie-setup.sh -jars emailAction.jar:mail.jar (See Adding Jars to Oozie)

代码 3: 部署命令

向 Oozie 添加 Jar 文件

你要知道,Cloudera 推荐的 oozie-setup.sh 命令行会重新构建你的 war 文件,并且, 如果你使用网页来监控作业,那么就会丢失 java 的脚本扩展。在测试方面,我们难以同时包含 -extjs 和 -jars 选项。作为权宜之计,我们会把 jar 文件复制到 ${CATALINA_BASE}/webapps/oozie/WEB-INF/lib 中,其中 ${CATALINA_BASE}代表的是 /var/lib/oozie/oozie-server。请注意,这里存在一定的风险,如果其他人重新构建了 war 文件,那么你就会丢失这些扩展,并且他们以后需要手动添加。对于测试来说,我们建议复制 jar 文件,然而,对于生产环境,我们建议把 jar 添加到 war 文件中。

现在我们需要把关于自定义执行器的信息注册到 Oozie 的运行时中。这是通过扩展 oozie-site.xml 完成的 [6] 。我们可以通过在 Oozie 配置文件 oozie-site.xml 中添加或者修改“oozie.service.ActionService.executor.ext.classes” [7] 来注册自定义动作本身(代码 4)。

复制代码
……………………………………
<property>
       <name>oozie.service.ActionService.executor.ext.classes</name>
       <value>com.navteq.assetmgmt.oozie.custom. EmailActionExecutor </value>
</property>
……………………………………

代码 4: 自定义执行配置

为新动作(代码 2)所用的 XML schema 应该添加到 oozie-site.xml 中,位于属性“oozie.service.WorkflowSchemaService.ext.schemas” [8] 之下(代码 5)。

复制代码
………………………………………
<property>
       <name>oozie.service.SchemaService.wf.ext.schemas</name>
       <value> emailAction.xsd</value>
</property>
…………………………………

代码 5: 自定义模式配置

最后,Tomcat 启动之后,我们就可以在工作流中使用自定义的动作节点了。

为了测试我们的实现,我们创建了简单的工作流(代码 6),它会使用我们的执行器来发送 email。

复制代码
<!--
Copyright (c) 2011 NAVTEQ! Inc. All rights reserved.
Test email <u>Oozie</u> Script
-->
<span color="#008080"><workflow-app <span color="#800080">xmlns</span><span color="#000000">=</span><i><span color="#0000ff">'uri:oozie:workflow:0.1'</span></i> <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">'emailTester'</span></i>></span>
 <span color="#008080"><start <span color="#800080">to</span><span color="#000000">=</span><i><span color="#0000ff">'simpleEmail'</span></i>/></span>
 <span color="#008080"><action <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">'simpleEmail'</span></i>></span>
   <span color="#008080"><eMail <span color="#000000">xlmns</span>=“ uri:custom:email-action:0.1”></span>
     <span color="#008080"><emailSubject><span color="#000000">test</span></emailSubject></span>
     <span color="#008080"><emailFrom><span color="#000000">mike.segel<u>@<mycompany>.com</u></span></emailFrom></span>
     <span color="#008080"><emailTo><span color="#000000">boris.lublinsky<u>@<mycompany>.com</u></span></emailTo></span>
     <span color="#008080"><emailMessage><span color="#000000">This is a test message, if you can see this, Mikey did something right! :)</span></emailMessage></span>
   <span color="#008080"></eMail></span>
  <span color="#008080"><error <span color="#800080">to</span><span color="#000000">=</span><i><span color="#0000ff">"fail"</span></i>/></span>
  <span color="#008080"><ok <span color="#800080">to</span><span color="#000000">=</span><i><span color="#0000ff">"end"</span></i>/></span>
 <span color="#008080"></action></span>
 <span color="#008080"><kill <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">"fail"</span></i>></span>
   <span color="#008080"><message><span color="#000000"><u>Workflow</u> failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</span></message></span>
 <span color="#008080"></kill></span>
 <span color="#008080"><end <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">'end'</span></i>/></span>
<span color="#008080"></workflow-app></span>

代码 6: 使用 email 自定义动作执行器的简单工作流

结论

在这篇文章中,我们展示了如何通过创建自定义的动作执行器来扩展 Oozie。这样做让我们可以定义和实现部门或者企业专用的 Oozie 语言(领域专用语言),其中具有部门或者企业的功能。这样的领域专用语言能够简化特定部门或者企业的构建过程,并提高代码的可读性。

尽管 Oozie 相对还不够成熟,但是它已经为处理包含多个 map/reduce 作业、并且能够向总体的业务过程添加非 map-reduce 作业的过程提供了基本的框架。随着越来越多的用户使用 Oozie 并提供反馈,我们相信它有足够的潜力,可以成为 Hadoop 环境中强大的集成部分。

对于 Oozie 来说,还有很多我们没有在这三篇文章中包含的内容。我们只是期望它们可以引起你对 Oozie 工作流引擎的足够兴趣,并且能够成为进一步研究 Oozie 的不错的起点。

参考信息

  1. Boris Lublinsky, Mike Segel. Introduction to Oozie.
  2. Boris Lublinsky, Mike Segel. Oozie by Example
  3. Oozie custom Action Nodes
  4. Oozie source code

关于作者

Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA 以及实现各种 NAVTEQ 的项目定义架构的愿景。他还是 InfoQ 的 SOA 编辑,以及 OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。

Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。Michael 已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael 拥有俄亥俄州立大学的软件工程学位。


[1] 这种类的例子可能是各种计数器操作、简单的计算等等。

[2] 所有 Oozie 动作执行器都是 Oozie 分发程序的一部分,它们都是通过扩展这个类实现的。

[3] 在我们的实现中,我们使用的是默认的实现,这样就是为什么没有在代码中展现的原因。你可以查看一下 Oozie 的源代码 [4],就可以知道在现存的 Oozie 动作处理程序中是如何实现这个方法的。

[4] 配置自定义执行器有两种方式——工作流变量和 / 或动作配置。在我们的例子中展示的是后者,但是在实际情况中,实际上总是二者的组合。

[5] 确保不仅要定义复杂的类型,还要对元素进行定义。那是 Oozie 所期望的。

[6] 通常在 Oozie 的分发包中叫做 oozie-default.xml。

[7] 对于多个执行器,类名应该以逗号分隔。

[8] 对于多次执行,我们可以为多种模式使用逗号分隔的列表。

查看英文原文: Extending Oozie


给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

2011 年 10 月 20 日 00:005495
用户头像

发布了 340 篇内容, 共 113.6 次阅读, 收获喜欢 4 次。

关注

评论

发布
暂无评论
发现更多内容

《Web应用安全权威指南》.pdf

田维常

【JAVA】TreeSet, LinkedHashSet和HashSet差异对比

莫问

第八周课后练习

knight

第八周总结

alpha

极客大学架构师训练营

高交会:高新企业源中瑞在此出展区块链BAAS技术

13530558032

分分钟玩转SpringBoot自定义注解

比伯

Java 大数据 编程 架构 编程语言

作业-第4周

arcyao

第八章作业

alpha

极客大学架构师训练营

python+requests进行get、post方法接口测试

测试人生路

Python 接口测试

腾讯强推Redis成长手册!原理+应用+集群+拓展+源码五飞

小Q

Java redis 学习 架构 面试

《身边的金钱心理学》

石云升

高交会:高新企业源中瑞在此出展区块链BAAS技术

WX13823153201

架构师训练营 第四周作业

文江

作业-第4周总结

arcyao

第四周作业

Jack

区块链治理的真实价值在哪里

CECBC区块链专委会

区块链 治理 治理机制

架构师训练营第 8 周作业

netspecial

极客大学架构师训练营

极客大学架构师训练营第一期第八周总结

睡不着摇一摇

架构师一期

JVM真香系列:图解垃圾回收器

田维常

JVM 垃圾回收

“懂行”的价值循环与蝴蝶风暴

脑极体

训练营第四周总结

大脸猫

极客大学架构师训练营

网络时间协议介绍以及服务器同步网络时间

MySQL从删库到跑路

ntp 时间同步

“区块链+营销”:科技力量助力行业前行

CECBC区块链专委会

市场营销

架构师训练营第 8 周学习总结

netspecial

极客大学架构师训练营

架构师训练营第 1 期 - 第八周总结

Todd-Lee

极客大学架构师训练营

成为架构师 - 架构师训练营第 04周

陈永龙Vincent

【可下载】2020年底收官!为大家整理了物联网行业全面研究报告、行业洞察、白皮书……

IoT云工坊

人工智能 大数据 5G 物联网 智能家居

家谱链亮相高交会,点亮“区块链+文化”融合发展之路

13530558032

训练营第四周作业

大脸猫

极客大学架构师训练营

【Mycat】Mycat核心开发者带你轻松掌握Mycat路由转发!!

冰河

分布式 微服务 分库分表 中间件 mycat

家谱链亮相高交会,点亮“区块链+文化”融合发展之路

WX13823153201

家谱链亮相高交会

「中国技术开放日·长沙站」现场直播

「中国技术开放日·长沙站」现场直播

扩展Oozie-InfoQ