互联网行业每天都有大量的日志生成,需要在固定时间段对数据进行 ETL 工作。用户常规的做法是启动一组长期运行的 EMR 集群,配置远程提交任务的服务器,结合自身的任务调度系统定期提交任务,但集群执行完成任务之后会闲置,造成不必要的开销。另一种方法是在需要执行任务的时候启动集群,任务完成之后关闭集群,但因为每次启动集群后,主节点与核心节点的 IP 都会发生分变化,导致每次都需要重新配置提交任务的服务器,造成额外的工作负担。本文介绍了一种通过 Apache Airflow 任务调度系统动态启停 Amazon EMR 集群的方法,并通过 EMR 内置的 Livy 远程提交作业,这样可以节省大量的成本并且无需进行过多的额外配置。
1. 相关技术介绍
在开始之前,请先对以下技术进行简单了解。
1.1 Apache Airflow
Apache Airflow 是一款开源的任务调度系统,用户通过创建 DAG(有向无环图)来定义任务的流程,图中的每个节点就是需要执行的任务,不同 DAG 之间的任务可以相互依赖。通过 Airflow 我们可以定时执行脚本,并且它提供了 web 界面供用户可视化地查看任务执行情况。
1.2 Apache Livy
Apache Livy 是 Hadoop 生态圈中提供远程提交任务功能的应用程序。它以 Rest API 的方式提供了 Session 与 Batches 两种集群执行任务的方法。Session 指的是将集群需要执行的代码写在对 Livy 请求中,目前支持 spark、pyspark、sparkr 与 sql 等四种方式与集群交互。Bathches 指的是将代码存放在指定位置,在请求中提供路径,让集群执行代码。例如将 jar 包存放在 S3 上,在请求 Livy 的时候提供 jar 包的路径,从而让集群直接执行 jar 包,好处是无需在集群上配置执行代码所需的依赖。
2. 演练
通过本文示例,我将向您展示如何实现以下方案:
基于开源调度工具 Airflow 编排提交 Spark Jobs 到 EMR 做批处理,Job 开始之前启动 EMR 集群,对集群节点采用 Spot 实例,所有 Job 结束后关闭 EMR 集群。
2.1 流程架构图与过程简介
(1)在一台 EC2 上配置 Airflow;
(2)定义 Airflow 工作流,其中包含创建集群,Spark 任务步骤与终止集群等任务;
(3)向 Livy 提交任务;
(4)EMR 从 S3 中读取数据,对数据进行处理完成之后重新写入 S3;
(5)工作完成,终止集群。
2.2 前提条件
(1)本文示例所使用的区域 us-east-1;
(2)在该区域创建一台 EC2,并确保与 EC2 绑定的 IAM Role 有 EMR 集群的 Full Access;
(3)拥有创建 EMR 集群所需的默认角色:EMR_DefaultRole 与 EMR_EC2_DefaultRole;
(4)创建 S3 桶,下载 jar 包 spark-examples_2.11-2.4.4 和数据集 emrdata.txt,并上传至 s3。
2.3 实现过程
2.3.1 在 EC2 上配置 Airflow
(1)登陆 EC2,安装 Airflow 已经相关依赖
Python
sudo yum update -y
sudo yum install -y python-pip gcc mysql-devel python-devel mysql
sudo pip install mysql-python
sudo yum install -y python3
sudo pip3 install boto3
sudo pip3 install requests
# 安装Airflow
sudo pip install apache-airflow
sudo pip install 'apache-airflow[celery]'
airflow initdb
(2)创建 RDS for Mysql 数据库供 Airflow 使用,对数据库性能要求不高,因此使用默认配置即可
(3)更改 airflow.cfg 配置文件,并测试是否能打开 Airflow 的 web 页面
Python
cd airflow
vim airflow.cfg
# 找到sql_alchemy_conn等参数所在位置,替换为创建的数据库信息
sql_alchemy_conn = mysql://admin:12345678@database-for-airflow.cdtwa5j4xten.us-east-1.rds.amazonaws.com/airflowdb
# Exit vim, Update Airflow Database
airflow initdb
# 配置celery相关参数
vim airflow.cfg
# 找到executor位置,将执行器设置为celery,可保证不相互依赖的任务可以并行执行
executor = CeleryExecutor
# 找到broker_url与result_backend参数的位置
broker_url = sqla+mysql://admin:12345678@database-for-airflow.cdtwa5j4xten.us-east-1.rds.amazonaws.com:3306/airflowdb
result_backend = db+mysql://admin:12345678@database-for-airflow.cdtwa5j4xten.us-east-1.rds.amazonaws.com:3306/airflowdb
# 开启airflow的webserver,在网页上输入EC2的DNS,查看是否能打开网页(注意打开安全组,并且如果本地连上的是公司的vpn,可能会出现无法打开网页的情况)
airflow webserver -p 8080 &
# 启动worker
airflow worker &
# 启动flower,可对worker中的任务进行可视化(要看到网页注意打开5555端口)
airflow flower &
2.3.2 定义工作流
现定义如下两个 Airflow 的 DAG:
dag_transform_calpi
(1)create_emr_cluster:创建 EMR 集群;
Python
# -*- coding: UTF-8 -*-
import boto3
import time
emr_client = boto3.client('emr', region_name='us-east-1')
# 定义集群名称,集群名称不要与当前运行的集群重名
name = 'emr-cluster'
# 定义instance,可自定义实例的数量与类型
intances = {
'InstanceGroups': [
{
'Market': 'SPOT',
'InstanceRole': 'MASTER',
'InstanceType': 'm4.xlarge',
'InstanceCount': 1,
},
{
'Market': 'SPOT',
'InstanceRole': 'CORE',
'InstanceType': 'm4.xlarge',
'InstanceCount': 2,
}
],
'KeepJobFlowAliveWhenNoSteps': True
}
# 定义集群中的应用
applications = [
{
'Name': 'Hadoop'
},
{
'Name': 'Pig'
},
{
'Name': 'Livy'
},
{
'Name': 'Hive'
},
{
'Name': 'Hue'
},
{
'Name': 'Spark'
}
]
if __name__ == '__main__':
# 创建emr集群
emr_client.run_job_flow(
Name=name, ReleaseLabel='emr-5.12.0', Instances=intances, Applications=applications,
JobFlowRole='EMR_EC2_DefaultRole', ServiceRole='EMR_DefaultRole')
# 持续发送请求,直到创建的集群状态处于Waiting为止
flag = True
while flag:
time.sleep(20)
r = emr_client.list_clusters(ClusterStates=['WAITING'])
for i in r['Clusters']:
if i['Name'] == name:
flag = False
(2)create_livy_session:创建 Livy 会话;
Python
# -*- coding: UTF-8 -*-
import requests
import json
import pprint
import boto3
# 获取集群的DNS,其中name为你的集群名称
name = 'emr-cluster'
emr_client = boto3.client('emr', region_name='us-east-1')
r = emr_client.list_clusters(ClusterStates=['WAITING'])
for i in r['Clusters']:
if i['Name'] == name:
cluster_id = i['Id']
r = emr_client.describe_cluster(ClusterId=cluster_id)
emr_dns = r['Cluster']['MasterPublicDnsName']
# livy_host为配置在emr集群上livy的url,无需修改代码
livy_host = 'http://' + emr_dns + ':8998'
data = {'kind': 'pyspark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(livy_host + '/sessions',
data=json.dumps(data), headers=headers)
pprint.pprint(r.json())
(3)sleep:等待会话创建完成;
(4)calpi:以 batches 的方式执行 spark 任务计算 pi 值;
Python
# -*- coding: UTF-8 -*-
import requests
import json
import textwrap
import pprint
import boto3
# 获取执行jar包任务的livy batch的url,其中name为你的集群名称
name = 'emr-cluster'
emr_client = boto3.client('emr', region_name='us-east-1')
r = emr_client.list_clusters(ClusterStates=['WAITING'])
for i in r['Clusters']:
if i['Name'] == name:
cluster_id = i['Id']
r = emr_client.describe_cluster(ClusterId=cluster_id)
emr_dns = r['Cluster']['MasterPublicDnsName']
batch_url = 'http://' + emr_dns + ':8998/batches'
headers = {'Content-Type': 'application/json'}
# 提交任务
data = {"file": "s3://xiaoyj/emr/spark-examples_2.11-2.4.4.jar",
"className": "org.apache.spark.examples.SparkPi"}
r = requests.post(batch_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())
(5)query_completed:外部任务,依赖于第二个 DAG(dag_query),即等待查询完成之后,执行下一个任务;
(6)终止集群。
Python
# -*- coding: UTF-8 -*-
import boto3
import time
# 终止集群,其中name为你的集群名称
name = 'emr-cluster'
emr_client = boto3.client('emr', region_name='us-east-1')
flag = True
while flag:
time.sleep(120)
r = emr_client.list_clusters(ClusterStates=['WAITING'])
for i in r['Clusters']:
if i['Name'] == name:
emr_client.terminate_job_flows(JobFlowIds=[i['Id']])
flag = False
dag_query
(1)sleep_completed:外部任务,依赖于第一个 DAG(dag_transform_calpi),即等待 Livy 会话执行下一个任务;
(2)transform:对之前上传到 S3 上的文本文件进行聚合、转换;
Python
# -*- coding: UTF-8 -*-
import requests
import json
import textwrap
import pprint
import boto3
# 获取提交任务的livy_url,其中name为你的集群名称
name = 'emr-cluster'
emr_client = boto3.client('emr', region_name='us-east-1')
r = emr_client.list_clusters(ClusterStates=['WAITING'])
for i in r['Clusters']:
if i['Name'] == name:
cluster_id = i['Id']
r = emr_client.describe_cluster(ClusterId=cluster_id)
emr_dns = r['Cluster']['MasterPublicDnsName']
livy_url = 'http://' + emr_dns + ':8998/sessions/0/statements'
headers = {'Content-Type': 'application/json'}
# 提交任务,data中的code为在emr中执行的代码,对s3中的文件进行转化操作,完成后将结果存放回s3作为中间结果
data = {
'code': textwrap.dedent("""
import json
sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3-us-east-2.amazonaws.com')
text_file = sc.textFile("s3a://xiaoyj/emr/emrdata.txt")
text_file = text_file.map(lambda x: x.split('::'))
text_file = text_file.map(lambda x: (int(x[0]), x[1:]))
text_file = text_file.groupByKey().map(lambda x: (x[0], list(x[1])))
text_file = text_file.sortByKey()
text_file = text_file.map(lambda x: {x[0]: x[1]})
text_file = text_file.map(lambda x: json.dumps(x))
text_file.coalesce(1).saveAsTextFile("s3a://xiaoyj/emr/middle_result")
print("Transform Complete!")
""")
}
r = requests.post(livy_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())
(3)check_s3:检查 S3 中是否有上一步生成的中间结果;
Python
# -*- coding: UTF-8 -*-
import boto3
import time
# 轮询s3,确认transform任务是否执行完成(即s3中是否有middle_result文件生成),name为你的s3桶名称
name = 'xiaoyj'
s3_client = boto3.client('s3', region_name='us-east-1')
flag = True
while flag:
time.sleep(60)
r = s3_client.list_objects(Bucket=name)
for i in r['Contents']:
if i['Key'] == 'emr/middle_result/part-00000':
flag = False
(4)query:对上一步生成的中间结果进行查询。
Python
# -*- coding: UTF-8 -*-
import requests
import json
import textwrap
import pprint
import boto3
# 获取提交任务的livy_url,其中name为你的集群名称
name = 'emr-cluster'
emr_client = boto3.client('emr', region_name='us-east-1')
r = emr_client.list_clusters(ClusterStates=['WAITING'])
for i in r['Clusters']:
if i['Name'] == name:
cluster_id = i['Id']
r = emr_client.describe_cluster(ClusterId=cluster_id)
emr_dns = r['Cluster']['MasterPublicDnsName']
livy_url = 'http://' + emr_dns + ':8998/sessions/0/statements'
headers = {'Content-Type': 'application/json'}
# 提交任务,data中的code为在emr中执行的代码,对s3中的文件进行转化操作,完成后将结果存放回s3作为中间结果
data = {
'code': textwrap.dedent("""
import json
from pyspark.sql import HiveContext, Row
hiveCtx = HiveContext(sc)
input = hiveCtx.read.json("s3a://xiaoyj/emr/middle_result/part-00000")
input.registe rTempTable("tbn")
result = hiveCtx.sql("SELECT size(`9`) from tbn")
result = result.rdd.map(lambda row: row[0])
result.coalesce(1).saveAsTextFile("s3a://xiaoyj/emr/result")
print("Search Complete!")
""")
}
r = requests.post(livy_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())
2.3.3 创建 Airflow 工作流
(1)在 airflow 文件夹中创建 dags 文件夹,并进入到文件夹中;
(2)定义工作流(注意开头的 # — coding: UTF-8 –不要省略,并且 bash_command 需替换为自己任务所在的路径);
Python
vim dag_transform_calpi.py
# -*- coding: UTF-8 -*-
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime.now().replace(microsecond=0),
'email': ['756044579@qq.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('dag_transform_calpi', default_args=default_args,
schedule_interval=timedelta(days=1))
# 创建emr集群
t0 = BashOperator(
task_id='create_emr_cluster',
bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/create_emr_cluster.py',
dag=dag)
# 创建livy的会话
t1 = BashOperator(
task_id='create_livy_session',
bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/create_session.py',
dag=dag)
# 等待会话创建完成
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 20',
dag=dag)
# 计算pi值
t3 = BashOperator(
task_id='calpi',
bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/calpi.py',
dag=dag)
# 终止emr集群
t4 = BashOperator(
task_id='terminate_cluster',
bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/terminate_cluster.py',
dag=dag)
# dag_query中的spark sql任务
external_task = ExternalTaskSensor(
external_task_id='query', task_id='query_completed', external_dag_id='dag_query', dag=dag)
# 定义airflow的有向无环图
t0 >> t1
t1 >> t2
t2 >> t3
external_task >> t4
Python
vim dag_query.py
# -*- coding: UTF-8 -*-
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime.now().replace(microsecond=0),
'email': ['756044579@qq.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('dag_query', default_args=default_args,
schedule_interval=timedelta(days=1))
# 对s3上的文本文件进行转化操作
t0 = BashOperator(
task_id='transform',
bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/transform.py',
dag=dag)
# 轮询s3,查看中间结果是否生成
t1 = BashOperator(
task_id='check_s3',
bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/check_s3.py',
dag=dag)
# spark sql任务
t2 = BashOperator(
task_id='query',
bash_command='python3 /Users/xiaoyj/Desktop/emr_poc/query.py',
dag=dag)
# dag_transform_calpi中的sleep任务
external_task = ExternalTaskSensor(
external_task_id='sleep', task_id='sleep_completed', external_dag_id='dag_transform_calpi', dag=dag)
external_task >> t0
t0 >> t1
t1 >> t2
(3)重制Airflow数据库;
Python
airflow resetdb
(4)启动Airflow,-s为当前日期,-e是结束日期,均设置为当日的日期(若工作流执行失败并想重头开始执行工作,需要先执行airflow resetdb)
Python
airflow backfill dag_transform_calpi -s 2019-12-02 -e 2019-12-02 & airflow backfill dag_query -s 2019-12-02 -e 2019-12-02
## 3. 展示
(1)打开AWS EMR控制台,可以观察到集群正在创建;
[](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy5.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy5.jpg)
(2)待集群创建完成后,获取主节点DNS,并打开网页;
[](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy6.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy6.jpg)
(3)观察到Livy上并行提交了两个任务分别是spark对文本的tansform操作和jar包计算pi值的任务;
[](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy7.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy7.jpg)
(4)pi值计算完成;
[](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy8.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy8.jpg)
(5)待Transform任务完成,Spark SQL任务开始执行;
[](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy9.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy9.jpg)
(6)执行完成后可以在s3上可以看到Transform任务生成的middle result和Spark SQL任务生成的最终结果;
[](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy10.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy10.jpg)
(7)下载middle_result中的文件,可以看到聚合结果;
[](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy11.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy11.jpg)
(8)下载result中的文件,可以查看到最终结果(统计编号为9的列表中包含53组数据,-1表示其他json文件没有编号为9的;
[](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy12.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy12.jpg)
(9)任务执行完毕,发现集群自动终止;
[](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy13.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy13.jpg)
(10)再查看远程服务器上Airflow的web界面,发现两个dag已经执行完毕。
[](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy14.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy14.jpg)
## 4. 总结
本文展现了如何使用Airflow启动EMR集群,并通过Livy远程提交任务,在任务完成后终止集群。成本节省主要体现在两个方面:1)每天在需要执行ETL工作时启动集群,任务执行完成后终止集群,因此不会出现空闲的集群;2)EMR可以配合Spot实例使用,从而节省更多的成本。另一个好处是使用Livy无需额外配置远程提交任务的服务器,并且EMR集成了Livy的一键安装,造成了极大的方便。
## 本篇作者
<footer>
![](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/Author/xiaoyj.jpg)
### [](https://amazonaws-china.com/cn/blogs/china/tag/%E8%82%96%E5%85%83%E5%90%9B/)
AWS解决方案架构师,负责基于AWS云计算方案的架构咨询和设计实现,同时致力于数据分析与AI的研究与应用。
</footer>
**作者介绍:**
翟羽翔,AWS解决方案架构师,负责基于AWS云计算方案的架构咨询和设计实现,同时致力于数据湖的应用和推广。
**本文转载自AWS技术博客。**
**原文链接:**https://amazonaws-china.com/cn/blogs/china/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy/
更多内容推荐
Python 太慢了吗?
“语言本身可能不是瓶颈,而是外部系统的限制。”
大数据基础:Spark 工作原理及基础概念
Apache Spark 是专为大规模数据处理而设计的快速通用计算引擎,在数据挖掘和机器学习领域有着广泛的应用,现在也已形成一个高速发展、应用广泛的生态系统。本文将为大家详细介绍 Spark 的核心技术原理。
40|命令式和声明式,谁才是驱动云原生的“引擎”?
这节课我们来聊聊命令式和声明式。
2023-03-10
新的 Dataproc 可选组件支持 Apache Flink 和 Docker
集群。Flink。是一种广泛使用的容器技术。集群的每个节点。集群交互。中创建容器化的服务。集群。可移植性的更多信息。
Spark 任务等待与运行策略
前面我们提到了Spark的资源分配策略,资源配置有静态和动态两种模式,不同模式在任务提交后会有不同的内存占用行为,但是由于队列资源是有限的,因此会出现任务因为资源不够导致等待的情况。本节来详细分析一下任务提交后在的等待与运行影响因素。
2021-04-19
如何用 Spark 计算引擎执行 FATE 联邦学习任务?
FATE 1.5 LTS 版本支持使用 Spark 作为底层的计算引擎,本文将对其实现细节以及使用进行简单介绍,方便用户在实际的使用过程中进行调优或者排查错误。
PyFlink 开发环境利器:Zeppelin Notebook
在 Zeppelin notebook 里利用 Conda 来创建 Python env 自动部署到 Yarn 集群中。
2021-08-25
MRS 2.0 性能再度爆破,更加简单易用
MapReduce服务上线以来,我们在用户的意见和建议中不断地进行改进和优化。
AWS 推出 Apache Airflow 全托管工作流 MWAA
最近,AWS推出了亚马逊Apache Airflow托管工作流(MWAA),这是一项全托管的服务,简化了在AWS上运行开源版Apache Airflow和构建工作流来执行ETL作业和数据管道的工作。
21|应用定义:如何使用 Helm 定义应用?
这节课,我们还是以示例应用为例子,把它从原始的 Kubernetes Manifest 改造成 Helm 应用。
2023-01-25
22|如何使用 ArgoCD 快速打造生产可用的 GitOps 工作流?
这节课,我们以示例应用为例,使用 GitHub Action 和 Helm 分别作为自动构建镜像和应用定义的工具,并通过 ArgoCD 来构建一个完整的 GitOps 工作流。
2023-01-27
看 CarbonData 如何用四招助力 Apache Spark
摘要:CarbonData 在 Apache Spark 和存储系统之间起到中介服务的作用,为 Spark 提供的4个重要功能。
2021-06-30
31|项目实战与部署:如何实现接口部署与访问?
在企业应用当中,把项目部署到服务器上,不但能让前端访问接口,也能供更多用户使用我们的平台。
2023-07-03
2. ZooKeeper 集群的 shell 操作命令
2023-09-08
Python——字符串查找 / 替换 / 分割
字符串查找、替换、分割
2021-06-10
没看过这篇文章,别说你会用 Airflow
经过几轮的迭代改进,目前Airflow 集群可以支持多条ETL pipeline,能自适应处理300多G的数据量,并稳定运行超过2年
39|GitOps 最佳实践,ArgoCD 凭什么脱颖而出?
ArgoCD 能在众多 CD 工具中脱颖而出,除了 GitOps 大背景的推动以外,其自身也具备非常多优秀的特性。
2023-03-08
Komodo Health 公司如何在 EKS 与 EMR 6 上使用多租户 Notebook 平台建立自助服务分析方案
本文介绍Komodo Health 公司如何在 EKS 与 EMR 6 上使用多租户 Notebook 平台建立自助服务分析方案。
[Python] 第一章 (建议收藏)
这篇文章带你打开Python的大门
2022-02-23
成功从 Hadoop 迁移到 Lakehouse 架构的 5 个关键步骤
从 Hadoop 迁移到基于云的现代架构(比如 Lakehouse 架构)的决定是业务决策,而非技术决策。我们在之前的文章中探讨了每一个组织都必须重新评估他们与 Hadoop 的关系的原因。本文中,我们将特别关注实际的迁移过程本身。你将学习成功迁移的关键步骤,以及 Lakehouse 架构在激发下一轮数据驱动创新中所扮演的角色。
推荐阅读
软件测试 | 测试开发 | 从跨专业手工测试转岗外包,再到 Python 测试开发,跳槽涨薪 85%!
2022-09-13
2. 集群联邦
2023-09-27
24|提示语工程(六):超越智能,让你的 AI 系统成为全知超人
2023-10-13
40|连接器:如何以 MQ 为核心搭建数据集成架构?
2023-09-20
计算中间件 Apache Linkis 正式毕业成为 Apache 顶级项目
大规模运行 Apache Airflow 的经验和教训
Python 银行取款系统
2023-02-25
电子书
大厂实战PPT下载
换一换 魏刚 | OPPO 机器学习部预估组负责人
常扬 | 合合信息 智能创新事业部研发总监
高振泽 | 美团 前端技术专家
评论 1 条评论