扩展 Oozie

  • 2011-10-19
  • 本文字数:8915 字

    阅读完需:约 29 分钟

在前面的两篇文章中 [ 1 2 ],我们描述了 Oozie 工作流服务器,并且展示了几个工作流的示例。我们还描述了针对 Oozie 的工作流的部署和配置,以及用来启动、停止和监控 Oozie 工作流的工具。

在本文中,我们会向你展示 Oozie 的可扩展性,并说明它是如何支持我们实现自定义的、协同工作的语言扩展。

为什么需要自定义节点(Custom Node)?

正如我们在文章 [1] 中所说明的,Oozie 之所以与众不同,是因为它提供了一种“最小化”的工作流语言,其中只包含少数几种控制和动作节点。尽管其中的一种动作节点是 java 动作节点,它让我们可以从 Oozie 工作流调用任意一个带有 main 方法的 java 类,但这种方法并非总是最佳的。原因之一就在于,java 动作是在 Hadoop 簇集中作为 map-reduce 作业执行的,并且只带有唯一的 Mapper 任务。一方面,这带来了很多好处:

  • 它拥有内建的可伸缩性以及对 map/reduce 框架的灾难恢复支持,这样我们就不必这些特性构建到 Oozie 中。
  • 外部执行机制,这让 Oozie 引擎变得更轻量级,从而可以支持更多并发运行的过程。

另一方面,这种方法也有一些缺点:

  • 它把每个 java 节点都作为 mapper 任务启动,这会导致在 Hadoop 簇集中启动新的 JVM 而产生额外的开销。
  • 在外部执行 java 类导致了额外的网络传输,用于与 Oozie 服务器同步这些执行结果。
  • 从 java 节点传递参数成了非常耗费资源的操作。

尽管如此,在运行时间相对较长(几分钟甚至几小时)的 map/reduce 或者 Pig 作业中,好处会大大超过负载的缺点,但是,在简单的 java 节点中(参见 [2]),我们就需要注意外部执行所导致的开销了。所以,使用自定义动作节点的原因之一,就是为了支持在 Oozie 的执行上下文中直接执行轻量级的 java 类 [1]

使用自定义动作的另一个原因是为了提高工作流的语义和可读性。由于 Oozie 是一种支持基于 Hadoop 处理组件的工作流引擎,所以它的语法完全是以 Hadoop 执行为中心的——Hadoop 文件系统、map/reduce、Pig 等等。这种语法能够很好地符合 Hadoop 开发者的习惯,但是并没有涉及到太多关于给定动作的功能信息。我们可以为动作本身制定与业务相关的命名转换规则,但这只是特别用来解决问题的——对动作的命名只反映了给定过程的语法,既不是总体上的主题领域,也不能解决动作参数的问题,那些问题仍然只能由开发者来解决。

幸运的是,Oozie 支持非常棒的扩展机制——自定义动作节点 [3],它让我们可以很容易地解决这两个问题。自定义的动作节点让我们可以使用附加的动作(动词)来扩展 Oozie 的语言。Oozie 的动作节点可以是同步的,也可以是异步的。

  • 同步节点——它在 Oozie 内部执行,在继续执行之前会等待这些节点完成动作。这些节点是为轻量级的任务所用的,像自定义计算,文件系统的移动、创建目录、删除等等。
  • 异步节点——它是由 Oozie 启动的,但是在 Oozie 引擎的外部执行,它会监控正在执行的动作,直到完成。这是通过动作的回调或者 Oozie 针对动作状态的 polling 操作完成的。

实现 Oozie 自定义动作处理程序

在这个例子中,我们会对独立的邮件程序进行转换,并展现到自定义的 email 动作中 [2](代码 1)。


package com.navteq.assetmgmt.oozie.custom;

import java.util.Properties;
import java.util.StringTokenizer;

import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.ActionExecutorException.ErrorType;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;

public class EmailActionExecutor extends ActionExecutor {

        private static final String NODENAME = "eMail";

        private static final String SUCCEEDED = "OK";
 private static final String FAILED = "FAIL";
 private static final String KILLED = "KILLED";

        private static final String DEFAULMAILSERVER = "imailchi.navtech.com";
        private static final String EMAILSERVER = "emailServer";
        private static final String SUBJECT = "emailSubject";
        private static final String MESSAGE = "emailBody";
        private static final String FROM = "emailFrom";
        private static final String TO = "emailTo";

 public EmailActionExecutor() {
   super(NODENAME);
}

        @Override
        public void check(Context context, WorkflowAction action) throws ActionExecutorException {

                 // Should not be called for synch operation
                 throw new UnsupportedOperationException();
        }

        @Override
        public void end(Context context, WorkflowAction action)throws ActionExecutorException {

                 String externalStatus = action.getExternalStatus();
                 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ?
                        WorkflowAction.Status.OK : WorkflowAction.Status.ERROR;
                 context.setEndData(status, getActionSignal(status));
        }

        @Override
        public boolean isCompleted(String arg0) {

                 return true;
        }

        @Override
        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {

context.setExternalStatus(KILLED);
context.setExecutionData(KILLED, null);
        }

        @Override
        public void start(Context context, WorkflowAction action) throws ActionExecutorException {

                 // Get parameters from Node configuration
                 try{
                          Element actionXml = XmlUtils.parseXml(action.getConf());
                          Namespace ns = Namespace.getNamespace("uri:custom:email-action:0.1");

                          String server = actionXml.getChildTextTrim(EMAILSERVER, ns);
                          String subject = actionXml.getChildTextTrim(SUBJECT, ns);
                          String message = actionXml.getChildTextTrim(MESSAGE, ns);
                          String from = actionXml.getChildTextTrim(FROM, ns);
                          String to = actionXml.getChildTextTrim(TO, ns);

                          // Check if all parameters are there
                          if(server == null)
                                   server = DEFAULMAILSERVER;
                          if((message == null) || (from == null) || (to == null))
                                   throw new ActionExecutorException(ErrorType.FAILED,
  ErrorCode.E0000.toString(), "Not all parameters are defined");

                          // Execute action synchronously
                          SendMail(server, subject, message, from, to);
                          context.setExecutionData(SUCCEEDED, null);
               }
               catch(Exception e){
    context.setExecutionData(FAILED, null);
                      throw new ActionExecutorException(ErrorType.FAILED,
  ErrorCode.E0000.toString(), e.getMessage());
               }
    }

    // Sending an email
    public void SendMail(String server, String subject, String message,
                          String from, String to) throws Exception {

               // create some properties and get the default Session
               Properties props = new Properties();
               props.setProperty("mail.smtp.host", server);
               Session session = Session.getDefaultInstance(props, null);

               // create a message
               Message msg = new MimeMessage(session);

               // set the from and to address
               InternetAddress addressFrom = new InternetAddress(from);
               msg.setFrom(addressFrom);

               // To is a comma separated list
               StringTokenizer st = new StringTokenizer(to, ",");
               String [] recipients = new String[st.countTokens()];
               int rc = 0;
               while(st.hasMoreTokens())
                       recipients[rc++] = st.nextToken();
               InternetAddress[] addressTo = new InternetAddress[recipients.length];
               for (int i = 0; i < recipients.length; i++){
                       addressTo[i] = new InternetAddress(recipients[i]);
               }
               msg.setRecipients(Message.RecipientType.TO, addressTo);

               // Setting the Subject and Content Type
               msg.setSubject(subject);
               msg.setContent(message, "text/plain");
               Transport.send(msg);
    }
}

代码 1: Email 自定义动作

这个实现对 ActionExecutor [2] 类(由 Oozie 提供)进行了扩展,并重写了一些必要的方法。因为邮件的发送过程是一种非常快速的操作,所以我们决定将其实现为同步的动作处理程序,那意味着它会在 Oozie 的执行上下文中执行。

我们的实现(代码 1)遵循了 Oozie 文档并实现了所有必需的方法:

  • 对于所有自定义动作处理程序都需要没有参数的构造函数。这个构造函数会注册动作处理程序的名称(使用动作名称来调用父类),我们会在工作流 XML 中使用它。
  • 我们可以使用 InitActionType [3] 方法来注册执行动作时可能发生的异常,以及它们的类型和错误信息,并为执行程序本身执行初始化操作。
  • Start 方法是用来启动动作的执行操作的。因为我们实现的是同步动作,所以整个动作都会在此执行。这个方法是由 Oozie 调用的,它有两个参数:Context 和 WorkflowAction。Context 参数提供了对 Oozie 工作流执行上下文的访问,其中主要包含了工作流的变量,并提供了对其进行操作的非常简单的 API(set、get) [4] 。而 WorkflowAction 则提供了 Oozie 对当前动作的定义。
  • 还有 Check 方法,Oozie 会使用它来检查动作的状态。它不能作为同步动作来调用。
  • Kill 方法,可以用来中断运行的作业或者动作。
  • End 方法,可以用于所有清理动作,或者用于可能在完成动作之后所要做的处理。它还需要设置执行的结果。

部署并使用 Oozie 自定义动作处理程序

实现了自定义的动作执行方式之后,我们需要为新的 email 动作定义 XML 模式 [5] (代码 2)。


<?<span color="#008080">xml</span> <span color="#800080">version</span>=<i><span color="#0000ff">"1.0"</span></i> <span color="#800080">encoding</span>=<i><span color="#0000ff">"UTF-8"</span></i>?>
<<span color="#008080">xs:schema</span> <span color="#800080">xmlns:xs</span>=<span color="#0000ff"><i>"http://www.w3.org/2001/XMLSchema"</i> </span>

                 <span color="#800080">xmlns:email</span>=<i><span color="#0000ff">"uri:custom:email-action:0.1"</span></i>
     <span color="#800080">elementFormDefault</span>=<i><span color="#0000ff">"qualified"</span></i>
     <span color="#800080">targetNamespace</span>=<i><span color="#0000ff">"uri:custom:email-action:0.1"</span></i>>
<<span color="#008080">xs:complexType</span> <span color="#800080">name</span>=<i><span color="#0000ff">"EMAIL"</span></i>>

       <<span color="#008080">xs:sequence</span>>
                 <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailServer"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> <span color="#800080">minOccurs</span>=<i><span color="#0000ff">"0"</span></i> <span color="#800080">maxOccurs</span>=<i><span color="#0000ff">"1"</span></i> />

                 <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailSubject"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> />
                 <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailFrom"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> />

                 <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailTo"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> />
                 <<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"emailBody"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"xs:string"</span></i> />

       </<span color="#008080">xs:sequence</span>>
</<span color="#008080">xs:complexType</span>>
<p><<span color="#008080">xs:element</span> <span color="#800080">name</span>=<i><span color="#0000ff">"eMail"</span></i> <span color="#800080">type</span>=<i><span color="#0000ff">"email:EMAIL"</span></i>></<span color="#008080">xs:element</span>><br></br></<span color="#008080">xs:schema</span>></p>

代码 2: 为 email 组件所用的 XML schema

自定义动作节点和 XML schema 文件都需要打包在单独的 jar 文件中,比方说 emailAction.Jar。我们可以使用 Oozie 的 oozie-setup.sh 脚本执行下面的命令,从而把这个(以及其他所有)jar 文件添加到 Oozie 的 war 文件中。


$ bin/oozie-setup.sh -jars emailAction.jar:mail.jar (See Adding Jars to Oozie) 

代码 3: 部署命令

向 Oozie 添加 Jar 文件

你要知道,Cloudera 推荐的 oozie-setup.sh 命令行会重新构建你的 war 文件,并且, 如果你使用网页来监控作业,那么就会丢失 java 的脚本扩展。在测试方面,我们难以同时包含 -extjs 和 -jars 选项。作为权宜之计,我们会把 jar 文件复制到 ${CATALINA_BASE}/webapps/oozie/WEB-INF/lib 中,其中 ${CATALINA_BASE}代表的是 /var/lib/oozie/oozie-server。请注意,这里存在一定的风险,如果其他人重新构建了 war 文件,那么你就会丢失这些扩展,并且他们以后需要手动添加。对于测试来说,我们建议复制 jar 文件,然而,对于生产环境,我们建议把 jar 添加到 war 文件中。

现在我们需要把关于自定义执行器的信息注册到 Oozie 的运行时中。这是通过扩展 oozie-site.xml 完成的 [6] 。我们可以通过在 Oozie 配置文件 oozie-site.xml 中添加或者修改“oozie.service.ActionService.executor.ext.classes” [7] 来注册自定义动作本身(代码 4)。


……………………………………
<property>
       <name>oozie.service.ActionService.executor.ext.classes</name>
       <value>com.navteq.assetmgmt.oozie.custom. EmailActionExecutor </value>
</property>
……………………………………

代码 4: 自定义执行配置

为新动作(代码 2)所用的 XML schema 应该添加到 oozie-site.xml 中,位于属性“oozie.service.WorkflowSchemaService.ext.schemas” [8] 之下(代码 5)。


………………………………………
<property>
       <name>oozie.service.SchemaService.wf.ext.schemas</name>
       <value> emailAction.xsd</value>
</property>
…………………………………

代码 5: 自定义模式配置

最后,Tomcat 启动之后,我们就可以在工作流中使用自定义的动作节点了。

为了测试我们的实现,我们创建了简单的工作流(代码 6),它会使用我们的执行器来发送 email。


<!--
Copyright (c) 2011 NAVTEQ! Inc. All rights reserved.
Test email <u>Oozie</u> Script
-->
<span color="#008080"><workflow-app <span color="#800080">xmlns</span><span color="#000000">=</span><i><span color="#0000ff">'uri:oozie:workflow:0.1'</span></i> <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">'emailTester'</span></i>></span>
 <span color="#008080"><start <span color="#800080">to</span><span color="#000000">=</span><i><span color="#0000ff">'simpleEmail'</span></i>/></span>

 <span color="#008080"><action <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">'simpleEmail'</span></i>></span>
   <span color="#008080"><eMail <span color="#000000">xlmns</span>=“ uri:custom:email-action:0.1”></span>
     <span color="#008080"><emailSubject><span color="#000000">test</span></emailSubject></span>
     <span color="#008080"><emailFrom><span color="#000000">mike.segel<u>@<mycompany>.com</u></span></emailFrom></span>

     <span color="#008080"><emailTo><span color="#000000">boris.lublinsky<u>@<mycompany>.com</u></span></emailTo></span>
     <span color="#008080"><emailMessage><span color="#000000">This is a test message, if you can see this, Mikey did something right! :)</span></emailMessage></span>
   <span color="#008080"></eMail></span>
  <span color="#008080"><error <span color="#800080">to</span><span color="#000000">=</span><i><span color="#0000ff">"fail"</span></i>/></span>

  <span color="#008080"><ok <span color="#800080">to</span><span color="#000000">=</span><i><span color="#0000ff">"end"</span></i>/></span>
 <span color="#008080"></action></span>
 <span color="#008080"><kill <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">"fail"</span></i>></span>
   <span color="#008080"><message><span color="#000000"><u>Workflow</u> failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</span></message></span>

 <span color="#008080"></kill></span>
 <span color="#008080"><end <span color="#800080">name</span><span color="#000000">=</span><i><span color="#0000ff">'end'</span></i>/></span>
<span color="#008080"></workflow-app></span>

代码 6: 使用 email 自定义动作执行器的简单工作流

结论

在这篇文章中,我们展示了如何通过创建自定义的动作执行器来扩展 Oozie。这样做让我们可以定义和实现部门或者企业专用的 Oozie 语言(领域专用语言),其中具有部门或者企业的功能。这样的领域专用语言能够简化特定部门或者企业的构建过程,并提高代码的可读性。

尽管 Oozie 相对还不够成熟,但是它已经为处理包含多个 map/reduce 作业、并且能够向总体的业务过程添加非 map-reduce 作业的过程提供了基本的框架。随着越来越多的用户使用 Oozie 并提供反馈,我们相信它有足够的潜力,可以成为 Hadoop 环境中强大的集成部分。

对于 Oozie 来说,还有很多我们没有在这三篇文章中包含的内容。我们只是期望它们可以引起你对 Oozie 工作流引擎的足够兴趣,并且能够成为进一步研究 Oozie 的不错的起点。

参考信息

  1. Boris Lublinsky, Mike Segel. Introduction to Oozie.
  2. Boris Lublinsky, Mike Segel. Oozie by Example
  3. Oozie custom Action Nodes
  4. Oozie source code

关于作者

Boris Lublinsky是 NAVTEQ 公司的首席架构师,在这家公司中他的工作是为大型数据管理和处理、SOA 以及实现各种 NAVTEQ 的项目定义架构的愿景。他还是 InfoQ 的 SOA 编辑,以及 OASIS 的 SOA RA 工作组的参与者。Boris 是一位作者,还经常发表演讲,他最新的一本书是《Applied SOA》。

Michael Segel在过去二十多年间一直与客户写作,识别并解决他们的业务问题。Michael 已经作为多种角色、在多个行业中工作过。他是一位独立顾问,总是期望能够解决所有有挑战的问题。Michael 拥有俄亥俄州立大学的软件工程学位。


[1] 这种类的例子可能是各种计数器操作、简单的计算等等。

[2] 所有 Oozie 动作执行器都是 Oozie 分发程序的一部分,它们都是通过扩展这个类实现的。

[3] 在我们的实现中,我们使用的是默认的实现,这样就是为什么没有在代码中展现的原因。你可以查看一下 Oozie 的源代码 [4],就可以知道在现存的 Oozie 动作处理程序中是如何实现这个方法的。

[4] 配置自定义执行器有两种方式——工作流变量和 / 或动作配置。在我们的例子中展示的是后者,但是在实际情况中,实际上总是二者的组合。

[5] 确保不仅要定义复杂的类型,还要对元素进行定义。那是 Oozie 所期望的。

[6] 通常在 Oozie 的分发包中叫做 oozie-default.xml。

[7] 对于多个执行器,类名应该以逗号分隔。

[8] 对于多次执行,我们可以为多种模式使用逗号分隔的列表。

查看英文原文: Extending Oozie


给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。