写点什么

使用 Step Functions 协调 Amazon EMR 工作负载

  • 2019-11-27
  • 本文字数:6071 字

    阅读完需:约 20 分钟

使用 Step Functions 协调 Amazon EMR 工作负载

使用 AWS Step Functions,您可以为应用程序添加无服务器工作流自动化。您的工作流的步骤可以在任何位置运行,包括在 AWS Lambda 函数中、在 Amazon Elastic Compute Cloud (EC2) 上或在本地运行。为简化构建工作流,Step Functions 直与多项 AWS 服务集成:Amazon ECSAWS FargateAmazon DynamoDBAmazon Simple Notification Service (SNS)Amazon Simple Queue Service (SQS)AWS BatchAWS GlueAmazon SageMaker 以及(为运行嵌套工作流Step Functions 本身


从今天开始,Step Functions 将连接到 Amazon EMR,使您能够以最少的代码创建数据处理和分析工作流,节省时间,并优化集群利用率。例如,为机器学习构建数据处理管道不仅耗时,而且棘手。借助这一全新集成功能,您可以轻松协调工作流功能,包括上一步结果中的并行执行和依赖关系,并在运行数据处理作业时处理故障和异常情况。


具体来说,Step Functions 状态机现在可以:


  • 创建终止 EMR 集群,包括可更改集群终止保护。这样,您可将现有 EMR 集群重复用于工作流,也可在执行工作流期间按需创建一个集群。

  • 为您的集群添加取消 EMR 步骤。 每个 EMR 步骤都是一个作业单位,其中包含数据操作说明,以便通过安装在集群上的软件进行处理,例如 Apache SparkHivePresto

  • 修改 EMR 集群实例队列大小,使您可以根据工作流每个步骤的要求以编程方式管理扩展。例如,您可以在添加计算密集型步骤之前增大实例组的大小,并在完成后立即减小其大小。


在您创建或终止集群或向集群添加 EMR 步骤时,仅当相应的活动已在 EMR 集群上完成时,您才能使用同步集成移至工作流的下一个步骤。


读取您的 EMR 集群的配置或状态不属于 Step Functions 服务集成。如果您需要,可以使用 Lambda 函数作为任务访问 EMR List*Describe* API。


**使用 EMR 和 Step Functions 构建工作流


**我将在 Step Functions 控制台上创建一个新的状态机。控制台可非常直观地呈现创建步骤,以便易于理解:



为创建状态机,我通过 Amazon States Language (ASL) 使用以下定义:


Json


{  "StartAt": "Should_Create_Cluster",  "States": {    "Should_Create_Cluster": {      "Type": "Choice",      "Choices": [        {          "Variable": "$.CreateCluster",          "BooleanEquals": true,          "Next": "Create_A_Cluster"        },        {          "Variable": "$.CreateCluster",          "BooleanEquals": false,          "Next": "Enable_Termination_Protection"        }      ],      "Default": "Create_A_Cluster"    },    "Create_A_Cluster": {      "Type": "Task",      "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",      "Parameters": {        "Name": "WorkflowCluster",        "VisibleToAllUsers": true,        "ReleaseLabel": "emr-5.28.0",        "Applications": [{ "Name": "Hive" }],        "ServiceRole": "EMR_DefaultRole",        "JobFlowRole": "EMR_EC2_DefaultRole",        "LogUri": "s3://aws-logs-123412341234-eu-west-1/elasticmapreduce/",        "Instances": {          "KeepJobFlowAliveWhenNoSteps": true,          "InstanceFleets": [            {              "InstanceFleetType": "MASTER",              "TargetOnDemandCapacity": 1,              "InstanceTypeConfigs": [                {                  "InstanceType": "m4.xlarge"                }              ]            },            {              "InstanceFleetType": "CORE",              "TargetOnDemandCapacity": 1,              "InstanceTypeConfigs": [                {                  "InstanceType": "m4.xlarge"                }              ]            }          ]        }      },      "ResultPath": "$.CreateClusterResult",      "Next": "Merge_Results"    },    "Merge_Results": {      "Type": "Pass",      "Parameters": {        "CreateCluster.$": "$.CreateCluster",        "TerminateCluster.$": "$.TerminateCluster",        "ClusterId.$": "$.CreateClusterResult.ClusterId"      },      "Next": "Enable_Termination_Protection"    },    "Enable_Termination_Protection": {      "Type": "Task",      "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",      "Parameters": {        "ClusterId.$": "$.ClusterId",        "TerminationProtected": true      },      "ResultPath": null,      "Next": "Add_Steps_Parallel"    },    "Add_Steps_Parallel": {      "Type": "Parallel",      "Branches": [        {          "StartAt": "Step_One",          "States": {            "Step_One": {              "Type": "Task",              "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",              "Parameters": {                "ClusterId.$": "$.ClusterId",                "Step": {                  "Name": "The first step",                  "ActionOnFailure": "CONTINUE",                  "HadoopJarStep": {                    "Jar": "command-runner.jar",                    "Args": [                      "hive-script",                      "--run-hive-script",                      "--args",                      "-f",                      "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",                      "-d",                      "INPUT=s3://eu-west-1.elasticmapreduce.samples",                      "-d",                      "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"                    ]                  }                }              },              "End": true            }          }        },        {          "StartAt": "Wait_10_Seconds",          "States": {            "Wait_10_Seconds": {              "Type": "Wait",              "Seconds": 10,              "Next": "Step_Two (async)"            },            "Step_Two (async)": {              "Type": "Task",              "Resource": "arn:aws:states:::elasticmapreduce:addStep",              "Parameters": {                "ClusterId.$": "$.ClusterId",                "Step": {                  "Name": "The second step",                  "ActionOnFailure": "CONTINUE",                  "HadoopJarStep": {                    "Jar": "command-runner.jar",                    "Args": [                      "hive-script",                      "--run-hive-script",                      "--args",                      "-f",                      "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",                      "-d",                      "INPUT=s3://eu-west-1.elasticmapreduce.samples",                      "-d",                      "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"                    ]                  }                }              },              "ResultPath": "$.AddStepsResult",              "Next": "Wait_Another_10_Seconds"            },            "Wait_Another_10_Seconds": {              "Type": "Wait",              "Seconds": 10,              "Next": "Cancel_Step_Two"            },            "Cancel_Step_Two": {              "Type": "Task",              "Resource": "arn:aws:states:::elasticmapreduce:cancelStep",              "Parameters": {                "ClusterId.$": "$.ClusterId",                "StepId.$": "$.AddStepsResult.StepId"              },              "End": true            }          }        }      ],      "ResultPath": null,      "Next": "Step_Three"    },    "Step_Three": {      "Type": "Task",      "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",      "Parameters": {        "ClusterId.$": "$.ClusterId",        "Step": {          "Name": "The third step",          "ActionOnFailure": "CONTINUE",          "HadoopJarStep": {            "Jar": "command-runner.jar",            "Args": [              "hive-script",              "--run-hive-script",              "--args",              "-f",              "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",              "-d",              "INPUT=s3://eu-west-1.elasticmapreduce.samples",              "-d",              "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"            ]          }        }      },      "ResultPath": null,      "Next": "Disable_Termination_Protection"    },    "Disable_Termination_Protection": {      "Type": "Task",      "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",      "Parameters": {        "ClusterId.$": "$.ClusterId",        "TerminationProtected": false      },      "ResultPath": null,      "Next": "Should_Terminate_Cluster"    },    "Should_Terminate_Cluster": {      "Type": "Choice",      "Choices": [        {          "Variable": "$.TerminateCluster",          "BooleanEquals": true,          "Next": "Terminate_Cluster"        },        {          "Variable": "$.TerminateCluster",          "BooleanEquals": false,          "Next": "Wrapping_Up"        }      ],      "Default": "Wrapping_Up"    },    "Terminate_Cluster": {      "Type": "Task",      "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",      "Parameters": {        "ClusterId.$": "$.ClusterId"      },      "Next": "Wrapping_Up"    },    "Wrapping_Up": {      "Type": "Pass",      "End": true    }  }}
复制代码


我让 Step Functions 控制台为执行此状态机创建一个新的 AWS Identity and Access Management (IAM) 角色。该角色自动包含访问 EMR 所需的所有权限。


此状态机可以使用现有的 EMR 集群,也可创建一个新集群。我可以使用以下输入创建在工作流结束时终止的新集群:


{


"CreateCluster": true,


"TerminateCluster": true


}


要使用现有集群,我需要使用以下句法在集群 ID 中提供输入:


{


"CreateCluster": false,


"TerminateCluster": false,


"ClusterId": "j-..."


}


我们来看看它的工作原理。工作流开始时,Should_Create_Cluster [](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-choice-state.html) 状态将查看输入以确定它是否应进入 Create_A_Cluster 状态。此时,我使用同步调用 (elasticmapreduce:createCluster.sync) 等待新的 EMR 集群到达 WAITING 状态,然后再继续进入下一个工作流状态。AWS Step Functions 控制台会显示使用到 EMR 控制台的链接创建的资源:



之后,Merge_Results [](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-pass-state.html) 状态将输入状态与新创建集群的集群 ID 合并,以将其传递至工作流中的下一个步骤。


在开始处理任何数据之前,我使用 Enable_Termination_Protection 状态 (elasticmapreduce:setClusterTerminationProtection) 帮助确保我的 EMR 集群中的 EC2 实例不会因出现意外或错误而关闭。


现在,我已准备好使用 EMR 集群执行某些操作。我的工作流中有三个 EMR 步骤。为简单起见,这些步骤都基于此 Hive 教程。对于每个步骤,我使用 Hive 的类似于 SQL 的界面对一些示例 CloudFront 日志运行查询,并将结果写入 Amazon Simple Storage Service (S3)。在生产使用案例中,您可能有多个 EMR 工具并行处理和分析您的数据(两个或多个步骤同时运行)或具有某些依赖关系(一个步骤的输出是另一个步骤的必要因素)。我们来尝试执行一些类似的操作。


首先我在[](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-parallel-state.html)状态下执行 Step_OneStep_Two


  • Step_One 作为作业 (elasticmapreduce:addStep.sync) 同步运行 EMR 步骤。这意味着在继续执行工作流中的下一个步骤之前,执行会等待 EMR 步骤完成(或取消)。您可以有选择地添加超时,以监控 EMR 步骤的执行是否在预期时间范围内。

  • Step_Two 正在异步添加 EMR 步骤 (elasticmapreduce:addStep)。在这种情况下,只要 EMR 回复请求已收到,工作流就会移至下一个步骤。几秒钟后,为了尝试其他集成,我取消了 Step_Two (elasticmapreduce:cancelStep)。这种集成在生产使用案例中可能会非常有用。例如,如果您从并行运行的另一个步骤中收到错误消息,使得继续执行某个 EMR 步骤变得毫无用处,则可取消该步骤。


在这两个步骤都已完成并生成结果之后,我将以作业形式执行 Step_Three,与我为 Step_One 执行的操作类似。当 Step_Three 完成后,我将进入 Disable_Termination_Protection 步骤,因为我已使用集群完成此工作流。


根据输入状态,Should_Terminate_Cluster Choice 状态将进入 Terminate_Cluster 状态 (elasticmapreduce:terminateCluster.sync) 并等待 EMR 集群终止,或直接进入 Wrapping_Up 状态并离开正在运行的集群。


最后,我还要处理 Wrapping_Up 状态。实际上,我在此最终状态下所做工作不多,但我不能从 Choice 状态结束工作流。


在 EMR 控制台中,我看到我的集群和 EMR 步骤的状态:



使用 AWS 命令行界面 (CLI),我可以在配置为 EMR 步骤输出的 S3 存储桶中找到我的查询结果:


aws s3 ls s3://MY-BUCKET/MyHiveQueryResults/ ...


根据我的输入,EMR 集群在此工作流执行结束时仍在运行。我单击 Create_A_Cluster 步骤中的资源链接转至 EMR 控制台并将其终止。如果您也同时打开此演示,请注意不要让 EMR 集群在您不需要时运行。


**现已推出


**Step Functions 与 EMR 的集成已在所有区域全面推出。除常规 Step Functions 和 EMR 定价之外,使用此功能不会产生额外费用。


现在,您可以使用 Step Functions 来快速构建复杂的工作流,以便执行 EMR 作业。工作流可以包括并行执行、依赖关系和异常处理。 Step Functions 可以在作业失败后轻松重试,并在出现严重错误后终止工作流,因为您可以指明出错时所发生的情况。欢迎与我分享您将如何使用此功能!


2019-11-27 08:00867

评论

发布
暂无评论
发现更多内容

2023 IoTDB 用户大会成功举办,深入洞察工业互联网数据价值

Apache IoTDB

如何使用京东商品详情 API 获取用户评价最多的商品详情?

技术冰糖葫芦

API 开发

2024年API安全趋势预测

互联网工科生

API API 安全

如何理解点到点传输,如果加速点到点传输速度

镭速

点对点传输

再也不怕面试官问Redis持久化了

程序员花卷

缓存 后端 写时复制 redis 底层原理

两道题浅析PHP反序列化逃逸

不在线第一只蜗牛

php 面试 PHP开发

别让错误的SQL变更毁了你的数据!那该如何审核变更SQL?

NineData

sql 数据 开发 变更 NineData

灵活易用的即时通讯组件设计思路和最佳实践

融云 RongCloud

ios 设计 即时通讯 API Global IM UIkit

Go语言很难吗?为什么 Go 岗位这么少?

伤感汤姆布利柏

Go 后端 低代码 Go 面试题 面经 后端 大厂

Docker镜像构建:技术深度解析与实践

树上有只程序猿

Docker 镜像

高效微调大模型的新方法

百度开发者中心

nlp 大模型 #人工智能

​HTML代码混淆技术:原理、应用和实现方法详解

【写作训练营打卡|01】

写作

软件测试/人工智能|人工智能与智能化测试Workshop

霍格沃兹测试开发学社

万界星空科技MES系统在设备管理中的多个应用场景

万界星空科技

数字化转型 mes 制造业生产管理系统 云mes 万界星空科技mes

万界星空科技智能工厂的主要建设模式

万界星空科技

数字化转型 智能工厂 智能工厂解决方案 #人工智能 万界星空科技mes

Lazada商品详情接口在电商行业中的重要性及实时数据获取实现

Noah

OpenAI成长史,凭什么快速崛起?特殊股权设计带来哪些影响?

博文视点Broadview

c++类 | AI工程化部署

AIWeker

c AI工程化部署

实例讲解Python 解析JSON实现主机管理

华为云开发者联盟

Python json 开发 华为云 华为云开发者联盟

IDC 中国数字化转型盛典:兴业银行「基于悦数图数据库」的「智能大数据云平台」获奖

悦数图数据库

大数据 云平台 图数据库 智能大数据云平台

从 Logstash 到 TDengine 数据接入功能,原来有这些“不一样”

TDengine

tdengine 时序数据库

分布式基础概念-分布式缓存[3]

派大星

分布式 Java 面试题

融云 CEO 董晗获评甲子光年「2023 中国数字经济创新人物」

融云 RongCloud

互联网 通信 数字经济 wicc 光年20

大模型训练的得力助手

百度开发者中心

大模型 #人工智能 LLM

使用 Step Functions 协调 Amazon EMR 工作负载_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章