生成式AI领域的最新成果都在这里!抢 QCon 展区门票 了解详情
写点什么

迁移工具 Air2phin 宣布开源,2 步迁移 Airflow 至 Dolphinscheduler

  • 2023-02-25
    北京
  • 本文字数:5709 字

    阅读完需:约 19 分钟

迁移工具 Air2phin 宣布开源,2 步迁移 Airflow 至 Dolphinscheduler

近日,调度系统迁移工具 Air2phin 宣布开源。借助 Air2phin,用户可 2 步将调度系统从 Airflow 迁移至 Apache DolphinScheduler,为有调度系统迁移需要的用户带来极大便利。

Air2phin 是什么?


Air2phin 是一个最近宣布开源的调度系统迁移工具,旨在将 Apache Airflow DAGs 文件转换成 Apache DolphinScheduler Python SDK 定义文件,从而实现用户将调度系统(Workflow orchestration)从 Airflow 迁移到 DolphinScheduler 的目的。它是一个基于多规则的 AST 转换器,使用 LibCST 来解析和转换 Airflow 的 DAG 代码,其全部规则使用 Yaml 文件定义,并提供了一定的自定义规则扩展能力。


近期,Air2phin 已经发布了 0.0.12 版本,提供了丰富的功能,可以更好地帮助用户完成 Airflow 到 Apache DolphinScheduler 的迁移。


注释:AST 是 Abstract Syntax Tree(抽象语法树)的缩写,它是一种以树状结构表示代码语法结构的数据结构。在编译器中,AST 是由词法分析器和语法分析器生成的。词法分析器将源代码转换成标记流(token stream),语法分析器将标记流转换成抽象语法树。AST 是一种树状结构,它由一系列节点组成,每个节点表示代码中的一个语法结构(如表达式、语句、函数、类等),节点之间的关系表示语法结构之间的嵌套关系。

为什么开源 Air2phin?


可能有人会问,为什么我需要一个迁移工具?这是因为随着业务的发展,企业或组织原来使用的工作流编排系统已经无法满足当前的需求,需要将工作流编排系统迁移到新的平台或者更新到新的版本。经过调研,很多用户有了将调度系统从开源工作流编排系统 Airflow 迁移到 Apache DolphinScheduler 上来的需求。


在迁移过程中,由于数据处理任务可能涉及多个系统之间的依赖关系,迁移过程需要确保在不影响业务运行的前提下完成。此时,调度系统迁移工具就可以发挥重要作用,它能减少人工干预,尽量自动化地完成两个调度系统间的迁移工作,并且能兼容多个系统间的多个版本,几乎可以做到用户无干预完成迁移。

为此,白鲸开源专门研发了开源迁移工具 Air2phin,可以让用户 2 步将调度系统从 Airflow 迁移至 Apache DolphinScheduler,为用户带来极大的便利。


为了让大家更好地理解 Air2phin 的重要性,我们先从调度系统的相关背景知识开始,了解将调度系统从 Airflow 迁移至 Apache DolphinScheduler 的好处。


为什么要从 Airflow 迁移至 DolphinScheduler?

什么是工作流编排系统?


工作流编排系统,是以尊重编排规则和业务逻辑的方式管理数据流。工作流编排工具让用户可以将多个有关联的任务转换为可以安排、运行和观测的工作流,帮助企业更好地管理和控制业务流程,从而提高业务效率。工作流编排是数据处理流程中不可或缺的组件之一,负责根据预先定义的规则和逻辑执行数据处理任务,确保数据处理流程按照预期顺利执行,常见工作流编排系统包括 Apache DolphinScheduler、Apache Airflow、Apache Oozie, Azkaban 等。

Airflow 是什么?


其中,Apache Airflow 是一个开源的工作流编排系统,它可以帮助用户创建、调度和监控复杂的工作流程。Airflow 最初由 Airbnb 开发,并于 2016 年开源,现在由 Apache 软件基金会维护。Airflow 使用 Python 语言编写,具有高度的可扩展性和灵活性,支持多种任务类型,如计算、数据处理、通知、交互等。Airflow 的工作流程是通过编写 Python 脚本来定义的,可以使用 Airflow 提供的操作符和钩子,以及自定义操作符和钩子来扩展其功能。但其有着不可忽视的缺陷,比如需要需要深度二次开发,脱离社区版本,升级成本高;Python 技术栈维护迭代成本高;scheduler loop 扫描 Dag folder 延迟降低性能的问题;以及在生产环境中使用稳定性差等。



在新数据时代业务需求下诞生的 Apache DolphinScheduler 是一个开源的分布式工作流调度系统,弥补了以往调度系统的弱势,旨在为企业用户提供一种可靠、高效、易于使用的工作流调度平台,支持多种任务类型,如计算、数据处理、ETL 等。


与 Airflow 相比,DolphinScheduler 采用了分布式架构,提供了多种任务类型,用户可以定义任务之间的依赖关系,设置任务的优先级和调度策略等,其使用可视化的界面来创建和管理工作流程的特性更是与 Airflow 形成鲜明对比,变得更加易于操作,对非编程人员来说更加友好。



经过调研对比,对于很多用户来说,将调度系统迁移至 Apache DolphinScheduler 是一个降本增效的更优选择。

Air2phin 如何安装和使用

Air2phin 是一个 python 的包,可以通过 Python 的包安装工具 pip 完成安装,详见 air2phin getting start。


python -m pip install --upgrade air2phin
复制代码


一个简单的例子

我们通过一个简单的例子,来说明如何使用 Air2phin 的。我们截取了 airflow tutorial.py 中的部分代码作为 Air2phin 转化的例子,来说明 Air2phin 如何逐步完成转化成 dolphinscheduler python sdk。

图 1:airflow tutorial.py 中的部分代码

图 2:Air2phin 如何逐步完成转化成 dolphinscheduler python sdk


假设将 airflow tutorial.py 部分内容保存至文件 tutorial_part.py,想要将其转化成 dolphinscheduler python sdk 定义,只需要一行命令就能完成。结果如图 2 所示,因为命令增加了 --inplace 参数,所以 Air2phin 会直接将原文件覆盖,如果不需要覆盖原问题,可以不使用 --inplace 参数,Air2phin 会新增一个 tutorial_part-air2phin.py 文件来保存转化后的内容。


air2phin migrate --inplace tutorial_part.py
复制代码

通过观察,我们发现这次转化分别触发了多条转化规则,包括

  • 将 airflow.DAG 转换成 pydolphinscheduler.core.process_definition.ProcessDefinition,这个规则在第三行(import 语句)以及第六行 DAG context

  • 将 airflow.operators.bash.BashOperator 转换成 pydolphinscheduler.tasks.shell.Shell,这个规则在任务 t1,t2 中都被使用

  • 除了对应的类转化之外,我们需要将类的属性进行转化,如将 airflow.DAG.schedule_interval 转换成了 ProcessDefinition.schedule,同时修改了部分值的内容,如将 timedelta(days=1) 转成 '0 0 0 * * ? *'

最后,我们只需要安装 pydolphinscheduler ,并且将转化后的文件通过 python 运行,就能完成工作流的迁移了,详见 pydolphinscheduler 使用(https://dolphinscheduler.apache.org/python/main/start.html#installing-pydolphinscheduler)

# 安装 apache-dolphinschedulerpython -m pip install apache-dolphinscheduler# 将工作流提交到 dolphinschedulerpython tutorial_part.py
复制代码


在运行 python tutorial_part.py 时,需要保证 dolphinscheduler API 和 python gateway 服务已经启动,并且开放了对应的端口,详见启动 python gateway service。


至此,我们通过一个简单的例子,说明了 Air2phin 是如何完成迁移的。


工作原理

Airflow 和 dolphinscheduler python sdk 如何工作?

在了解 Air2phin 如果工作之前,先了解 Airflow 和 dolphinscheduler python sdk 如何工作是非常重要的前置条件,帮助我们更好地了解 Air2phin 的迁移步骤,当遇到问题的时候也能更加从容地应对。


  • Airflow 如何工作:Airflow 工作流相关的信息都保存在 DAG 文件中,之后将 DAG 文件放置到 Airflow 的指定目录,Airflow 的 Scheduler 会间隔一定时间去扫描和解析 Airflow 的 DAG 文件,所以 DAG 文件是被动被扫描和更新的。

  • dolphinscheduler python sdk: 同 Airflow 类似,将全部工作流相关的信息都通过 Python 文件定义,但是 dolphinscheduler python sdk 是通过人为主动触发的方式,将工作流信息提交,运行命令 python 工作流文件名 即可完成主动任务提交。

Air2phin 工作流程


了解完两者是如何使用,如何提交/发现工作流的,将更加利于我们对 Air2phin 的工作原理的理解。因为 Airflow 的 DAG 文件以及 DolphinScheduler 的 Python sdk 定义文件都是 Python 编写的,所以 Air2phin 的大部分代码都是处理两者间的差异,最后将 Airflow 的代码转化成 dolphinscheduler python sdk 和定义。


Air2phin 使用了 LibCST(https://libcst.readthedocs.io/en/latest/) 来实现 airflow python DAG 代码的抽象语法树解析,然后通过 LibCST 的 Transformer(https://libcst.readthedocs.io/en/latest/tutorial.html#Build-Visitor-or-Transformer)结合转化规则最后转化成 dolphinscheduler python sdk 的定义。


Air2phin 整体工作流程如下:

  • 从标准输入或者文件中获取原本的 Airflow DAG 内容

  • 从 Yaml 文件加载所有转换规则

  • 将 Airflow DAG 内容通过 LibCST 解析成 CST 树

  • 通过 LibCST Transformer 转换 dolphinscheduler python sdk 定义内容

Air2phin 最佳实践

迁移整个文件夹而不是单个文件

当用户想要迁移 Airflow 到 DolphinScheduler 的时候,都是想要整体做迁移而不是单个文件迁移的,Air2phin 提供整体文件夹迁移的能力,只需要将路径从文件路径改成文件夹即可。


# 迁移整个 ~/airflow/dags 文件夹air2phin migrate --inplace ~/airflow/dags
复制代码


增加自定义的规则

部分使用 Airflow 的用户自定义 Hook 或者 Operator,用户自定义的 Operator 无法通过 Air2phin 内置的转化规则完成转化,需要用户增加自定义的规则,并告诉 Air2phin 规则的位置。例如我们有一个叫 MyCustomOperator 的算子是继承 PostgresOperator 的大部分功能, 只是命名不一样,其定义如下:


from airflow.providers.postgres.operators.postgres import PostgresOperatorclass MyCustomOperator(PostgresOperator):    def __init__(        self,        *,        sql: str | Iterable[str],        my_custom_conn_id: str = 'postgres_default',        autocommit: bool = False,        parameters: Iterable | Mapping | None = None,        database: str | None = None,        runtime_parameters: Mapping | None = None,        **kwargs,    ) -> None:        super().__init__(            sql=sql,            postgres_conn_id=my_custom_conn_id,            autocommit=autocommit,            parameters=parameters,            database=database,            runtime_parameters=runtime_parameters,            **kwargs,        )
复制代码

它在 Airflow 的多个 DAG 中被使用,使用的方式如下:

from custom.my_custom_operator import MyCustomOperatorwith DAG(    dag_id='my_custom_dag',    default_args=default_args,    schedule_interval='@once',    start_date=days_ago(2),    tags=['example'],) as dag:    t1 = MyCustomOperator(        task_id='my_custom_task',        sql='select * from table',        my_custom_conn_id='my_custom_conn_id',    )
复制代码

现在需要对这个 Operator 进行转化,我们可以自定义一个转化规则,并将其命名为 MyCustomOperator.yaml,内容如下,最主要的内容是 migration.module 和 migration.parameter 的定义,其确定了转化规则:

name: MyCustomOperatordescription: The configuration for migrating airflow custom operator MyCustomOperator to DolphinScheduler SQL task.migration:  module:    - action: replace      src: custom.my_custom_operator.MyCustomOperator      dest: pydolphinscheduler.tasks.sql.Sql  parameter:    - action: replace      src: task_id      dest: name    - action: replace      src: my_custom_conn_id      dest: datasource_name
复制代码

再使用 --custom-rules 参数指定转化自定义参数,就能应用自定义规则的转化:


# 指定自定义规则路径为 /path/to/MyCustomOperator.yamlair2phin migrate --inplace --custom-rules /path/to/MyCustomOperator.yaml ~/airflow/dags
复制代码


让 Air2phin 运行地更快

Air2phin 默认是一个进程运行 DAG 文件的转化的,当你有许多 DAG 文件时,Air2phin 转化非常耗时,我们提供了一个启动多进程运行 Air2phin 转化的参数 --multiprocess,可以将其指定为用户机器的 CPU 数量来缩短转化时间:


# 指定 air2phin 启动 12 个进程同时进行转化air2phin migrate --inplace --custom-rules /path/to/MyCustomOperator.yaml --multiprocess 12 ~/airflow/dags
复制代码


存在的问题


目前,作为一个转化工具,Air2phin 的使用方式已经算比较完善了,能够满足用户迁移调度系统的基本需求,但还有一些地方有待完善。


内置规则还不够多

转化规则还不够多,目前只有五个,分别是:

  • airflow.DAG

  • airflow.operators.bash.BashOperator

  • airflow.operators.dummy_operator.DummyOperator

  • airflow.operators.python_operator.PythonOperator

  • airflow.operators.spark_sql_operator.SparkSqlOperator


如果有更多的规则,Air2phin 将成为一个更加好用的转化工具,这里欢迎各位随时提交转化规则的 PR(https://github.com/WhaleOps/air2phin/pulls)


部分 Airflow 的用法不能被迁移过来


部分概念仅仅在 Airflow 中有,在 DolphinScheduler 中还没有,如任务的成功、失败、重试、触发 callback,任务的 owner,variable,工作流并发数,tag 等,这部分 Airflow DAG 可以被迁移,但兼容的属性将会丢失,无法迁移到 DolphinScheduler。

Air2phin 常见问题解答


Q:为什么选择解析 Airflow DAG 文件而不是数据库?

A:因为 Airflow DAG 文件中才有完成的工作流信息,Airflow 的数据库中只有工作流基本信息,没有任务定义的信息,也没有任务的关系,我们选择通过解析 Airflow 的 DAG 文件而不是数据库来完成转化。


Q:为什么要通过 dolphinscheduler python sdk 做中转不自己提交到 DolphinScheduler?

A:因为 Airflow DAG 就是 Python 定义的,在 Airflow DAG 中有很多 Python 的特性,我们不想将这部分特性转化成结构化的数据(转化可能存在信息丢失),恰好 DolphinScheduler 已经有了 Python 的 sdk,所以直接通过 LibCST 转化是成本更加低的做法。


Q:为什么使用 LibCST 而不是 python 内置的 AST?

A:因为 LibCST 更加符合我们,Python 内置的 AST 库解析成 AST 的时候会丢失掉 comment 的信息,但是我们呢希望保留着部分信息。且 LibCST 提供更加多 visitor 保证我们更加方便的实现替换。

参考链接:air2phin(https://github.com/WhaleOps/air2phin)

2023-02-25 12:303626

评论

发布
暂无评论
发现更多内容

Elasticsearch 新机型发布,性能提升30%

腾讯云大数据

大数据 elasticsearch Elastic Stack

传统巨头抢占区块链场景高地 医疗、汽车、金融成为热门赛道

CECBC

区块链 金融

建信金科大咖访谈:金融衍生品定价与建行实践

金科优源汇

金融科技 金融创新

大咖直播 | Elasticsearch 应用监控管理平台搭建实战

腾讯云大数据

大数据 elasticsearch Elastic Stack 监控管理平台

自建本地电话告警系统

周楠

运维 物联网 监控告警

这才是你需要的C/C++Linux学习路线!

赖猫

c++ Linux 后台开发 服务器开发

一口气说出四种幂等性解决方案,面试官露出了姨母笑~

不才陈某

Java 分布式 接口

云南区块链电子发票全面推广啦!

CECBC

区块链 纳税人

记一次MapReduce的内存溢出

AI乔治

Java mapreduce 架构 内存溢出

静态代码分析工具评估指标及方法

maijun

告别消费主义的双12,是华为云12.12会员节真正的意义

脑极体

Java中多线程安全问题实例分析

叫练

Java 多线程 什么是多线程 多线程与高并发

第13周作业

饭桶

Shell脚本命令常用技巧

MySQL从删库到跑路

shell脚本编写

太赞了!滴滴开源了一套分布式ID的生成系统...

Java架构师迁哥

住建部等六部门:广泛运用区块链等技术,建设智慧物业管理服务平台

CECBC

物业生活

浅谈JDK并发包下面的分治思想及分治思想在高并发场景的运用

AI乔治

Java 架构 jdk 分布式 多线程与高并发

AWS 助力贝壳VR看房走出国门,升级全球居住服务新体验

亚马逊云科技 (Amazon Web Services)

AWS

Apache顶级项目ShardingSphere — SQL Parser的设计与实现

京东数科风险算法与技术

数据库 开源 中间件

即构实时音视频多中心调度设计

ZEGO即构

EMAS远程日志 - 移动端问题排查利器

移动研发平台EMAS

阿里云 运维 日志 监控告警 应用

附PPT丨AI和云原生时代的数据库进化之路

dbaplus社群

数据库 云原生

云原生架构-可观测性之 Prometheus 服务自动发现

云原生实验室

互联网应用系统常见问题与方案

raox

极客大学架构师训练营

第十三周学习总结

饭桶

如何在数智化时代少走弯路? 这里有100个案例可以借鉴

京东科技开发者

DevOps 云原生

图文回顾丨北京「解构云原生:企业数字化转型新支点」沙龙

Rancher

k8s rancher

信任的传递——为什么我们需要第三方授权?

张凯峰

证书 身份认证

几种常见的研发管理体系,哪种更适合你?

菜根老谭

敏捷开发 研发管理 CMMI IPD

搭建网站/APP最全准备攻略

前嗅大数据

小程序 建站 APP发布

面向全场景模块化设计 京东智联云的服务器部署有多灵活?

京东科技开发者

服务器 云主机

迁移工具 Air2phin 宣布开源,2 步迁移 Airflow 至 Dolphinscheduler_大数据_钟嘉杰_InfoQ精选文章