【AICon】探索八个行业创新案例,教你在教育、金融、医疗、法律等领域实践大模型技术! >>> 了解详情
写点什么

迁移工具 Air2phin 宣布开源,2 步迁移 Airflow 至 Dolphinscheduler

  • 2023-02-25
    北京
  • 本文字数:5709 字

    阅读完需:约 19 分钟

迁移工具 Air2phin 宣布开源,2 步迁移 Airflow 至 Dolphinscheduler

近日,调度系统迁移工具 Air2phin 宣布开源。借助 Air2phin,用户可 2 步将调度系统从 Airflow 迁移至 Apache DolphinScheduler,为有调度系统迁移需要的用户带来极大便利。

Air2phin 是什么?


Air2phin 是一个最近宣布开源的调度系统迁移工具,旨在将 Apache Airflow DAGs 文件转换成 Apache DolphinScheduler Python SDK 定义文件,从而实现用户将调度系统(Workflow orchestration)从 Airflow 迁移到 DolphinScheduler 的目的。它是一个基于多规则的 AST 转换器,使用 LibCST 来解析和转换 Airflow 的 DAG 代码,其全部规则使用 Yaml 文件定义,并提供了一定的自定义规则扩展能力。


近期,Air2phin 已经发布了 0.0.12 版本,提供了丰富的功能,可以更好地帮助用户完成 Airflow 到 Apache DolphinScheduler 的迁移。


注释:AST 是 Abstract Syntax Tree(抽象语法树)的缩写,它是一种以树状结构表示代码语法结构的数据结构。在编译器中,AST 是由词法分析器和语法分析器生成的。词法分析器将源代码转换成标记流(token stream),语法分析器将标记流转换成抽象语法树。AST 是一种树状结构,它由一系列节点组成,每个节点表示代码中的一个语法结构(如表达式、语句、函数、类等),节点之间的关系表示语法结构之间的嵌套关系。

为什么开源 Air2phin?


可能有人会问,为什么我需要一个迁移工具?这是因为随着业务的发展,企业或组织原来使用的工作流编排系统已经无法满足当前的需求,需要将工作流编排系统迁移到新的平台或者更新到新的版本。经过调研,很多用户有了将调度系统从开源工作流编排系统 Airflow 迁移到 Apache DolphinScheduler 上来的需求。


在迁移过程中,由于数据处理任务可能涉及多个系统之间的依赖关系,迁移过程需要确保在不影响业务运行的前提下完成。此时,调度系统迁移工具就可以发挥重要作用,它能减少人工干预,尽量自动化地完成两个调度系统间的迁移工作,并且能兼容多个系统间的多个版本,几乎可以做到用户无干预完成迁移。

为此,白鲸开源专门研发了开源迁移工具 Air2phin,可以让用户 2 步将调度系统从 Airflow 迁移至 Apache DolphinScheduler,为用户带来极大的便利。


为了让大家更好地理解 Air2phin 的重要性,我们先从调度系统的相关背景知识开始,了解将调度系统从 Airflow 迁移至 Apache DolphinScheduler 的好处。


为什么要从 Airflow 迁移至 DolphinScheduler?

什么是工作流编排系统?


工作流编排系统,是以尊重编排规则和业务逻辑的方式管理数据流。工作流编排工具让用户可以将多个有关联的任务转换为可以安排、运行和观测的工作流,帮助企业更好地管理和控制业务流程,从而提高业务效率。工作流编排是数据处理流程中不可或缺的组件之一,负责根据预先定义的规则和逻辑执行数据处理任务,确保数据处理流程按照预期顺利执行,常见工作流编排系统包括 Apache DolphinScheduler、Apache Airflow、Apache Oozie, Azkaban 等。

Airflow 是什么?


其中,Apache Airflow 是一个开源的工作流编排系统,它可以帮助用户创建、调度和监控复杂的工作流程。Airflow 最初由 Airbnb 开发,并于 2016 年开源,现在由 Apache 软件基金会维护。Airflow 使用 Python 语言编写,具有高度的可扩展性和灵活性,支持多种任务类型,如计算、数据处理、通知、交互等。Airflow 的工作流程是通过编写 Python 脚本来定义的,可以使用 Airflow 提供的操作符和钩子,以及自定义操作符和钩子来扩展其功能。但其有着不可忽视的缺陷,比如需要需要深度二次开发,脱离社区版本,升级成本高;Python 技术栈维护迭代成本高;scheduler loop 扫描 Dag folder 延迟降低性能的问题;以及在生产环境中使用稳定性差等。



在新数据时代业务需求下诞生的 Apache DolphinScheduler 是一个开源的分布式工作流调度系统,弥补了以往调度系统的弱势,旨在为企业用户提供一种可靠、高效、易于使用的工作流调度平台,支持多种任务类型,如计算、数据处理、ETL 等。


与 Airflow 相比,DolphinScheduler 采用了分布式架构,提供了多种任务类型,用户可以定义任务之间的依赖关系,设置任务的优先级和调度策略等,其使用可视化的界面来创建和管理工作流程的特性更是与 Airflow 形成鲜明对比,变得更加易于操作,对非编程人员来说更加友好。



经过调研对比,对于很多用户来说,将调度系统迁移至 Apache DolphinScheduler 是一个降本增效的更优选择。

Air2phin 如何安装和使用

Air2phin 是一个 python 的包,可以通过 Python 的包安装工具 pip 完成安装,详见 air2phin getting start。


python -m pip install --upgrade air2phin
复制代码


一个简单的例子

我们通过一个简单的例子,来说明如何使用 Air2phin 的。我们截取了 airflow tutorial.py 中的部分代码作为 Air2phin 转化的例子,来说明 Air2phin 如何逐步完成转化成 dolphinscheduler python sdk。

图 1:airflow tutorial.py 中的部分代码

图 2:Air2phin 如何逐步完成转化成 dolphinscheduler python sdk


假设将 airflow tutorial.py 部分内容保存至文件 tutorial_part.py,想要将其转化成 dolphinscheduler python sdk 定义,只需要一行命令就能完成。结果如图 2 所示,因为命令增加了 --inplace 参数,所以 Air2phin 会直接将原文件覆盖,如果不需要覆盖原问题,可以不使用 --inplace 参数,Air2phin 会新增一个 tutorial_part-air2phin.py 文件来保存转化后的内容。


air2phin migrate --inplace tutorial_part.py
复制代码

通过观察,我们发现这次转化分别触发了多条转化规则,包括

  • 将 airflow.DAG 转换成 pydolphinscheduler.core.process_definition.ProcessDefinition,这个规则在第三行(import 语句)以及第六行 DAG context

  • 将 airflow.operators.bash.BashOperator 转换成 pydolphinscheduler.tasks.shell.Shell,这个规则在任务 t1,t2 中都被使用

  • 除了对应的类转化之外,我们需要将类的属性进行转化,如将 airflow.DAG.schedule_interval 转换成了 ProcessDefinition.schedule,同时修改了部分值的内容,如将 timedelta(days=1) 转成 '0 0 0 * * ? *'

最后,我们只需要安装 pydolphinscheduler ,并且将转化后的文件通过 python 运行,就能完成工作流的迁移了,详见 pydolphinscheduler 使用(https://dolphinscheduler.apache.org/python/main/start.html#installing-pydolphinscheduler)

# 安装 apache-dolphinschedulerpython -m pip install apache-dolphinscheduler# 将工作流提交到 dolphinschedulerpython tutorial_part.py
复制代码


在运行 python tutorial_part.py 时,需要保证 dolphinscheduler API 和 python gateway 服务已经启动,并且开放了对应的端口,详见启动 python gateway service。


至此,我们通过一个简单的例子,说明了 Air2phin 是如何完成迁移的。


工作原理

Airflow 和 dolphinscheduler python sdk 如何工作?

在了解 Air2phin 如果工作之前,先了解 Airflow 和 dolphinscheduler python sdk 如何工作是非常重要的前置条件,帮助我们更好地了解 Air2phin 的迁移步骤,当遇到问题的时候也能更加从容地应对。


  • Airflow 如何工作:Airflow 工作流相关的信息都保存在 DAG 文件中,之后将 DAG 文件放置到 Airflow 的指定目录,Airflow 的 Scheduler 会间隔一定时间去扫描和解析 Airflow 的 DAG 文件,所以 DAG 文件是被动被扫描和更新的。

  • dolphinscheduler python sdk: 同 Airflow 类似,将全部工作流相关的信息都通过 Python 文件定义,但是 dolphinscheduler python sdk 是通过人为主动触发的方式,将工作流信息提交,运行命令 python 工作流文件名 即可完成主动任务提交。

Air2phin 工作流程


了解完两者是如何使用,如何提交/发现工作流的,将更加利于我们对 Air2phin 的工作原理的理解。因为 Airflow 的 DAG 文件以及 DolphinScheduler 的 Python sdk 定义文件都是 Python 编写的,所以 Air2phin 的大部分代码都是处理两者间的差异,最后将 Airflow 的代码转化成 dolphinscheduler python sdk 和定义。


Air2phin 使用了 LibCST(https://libcst.readthedocs.io/en/latest/) 来实现 airflow python DAG 代码的抽象语法树解析,然后通过 LibCST 的 Transformer(https://libcst.readthedocs.io/en/latest/tutorial.html#Build-Visitor-or-Transformer)结合转化规则最后转化成 dolphinscheduler python sdk 的定义。


Air2phin 整体工作流程如下:

  • 从标准输入或者文件中获取原本的 Airflow DAG 内容

  • 从 Yaml 文件加载所有转换规则

  • 将 Airflow DAG 内容通过 LibCST 解析成 CST 树

  • 通过 LibCST Transformer 转换 dolphinscheduler python sdk 定义内容

Air2phin 最佳实践

迁移整个文件夹而不是单个文件

当用户想要迁移 Airflow 到 DolphinScheduler 的时候,都是想要整体做迁移而不是单个文件迁移的,Air2phin 提供整体文件夹迁移的能力,只需要将路径从文件路径改成文件夹即可。


# 迁移整个 ~/airflow/dags 文件夹air2phin migrate --inplace ~/airflow/dags
复制代码


增加自定义的规则

部分使用 Airflow 的用户自定义 Hook 或者 Operator,用户自定义的 Operator 无法通过 Air2phin 内置的转化规则完成转化,需要用户增加自定义的规则,并告诉 Air2phin 规则的位置。例如我们有一个叫 MyCustomOperator 的算子是继承 PostgresOperator 的大部分功能, 只是命名不一样,其定义如下:


from airflow.providers.postgres.operators.postgres import PostgresOperatorclass MyCustomOperator(PostgresOperator):    def __init__(        self,        *,        sql: str | Iterable[str],        my_custom_conn_id: str = 'postgres_default',        autocommit: bool = False,        parameters: Iterable | Mapping | None = None,        database: str | None = None,        runtime_parameters: Mapping | None = None,        **kwargs,    ) -> None:        super().__init__(            sql=sql,            postgres_conn_id=my_custom_conn_id,            autocommit=autocommit,            parameters=parameters,            database=database,            runtime_parameters=runtime_parameters,            **kwargs,        )
复制代码

它在 Airflow 的多个 DAG 中被使用,使用的方式如下:

from custom.my_custom_operator import MyCustomOperatorwith DAG(    dag_id='my_custom_dag',    default_args=default_args,    schedule_interval='@once',    start_date=days_ago(2),    tags=['example'],) as dag:    t1 = MyCustomOperator(        task_id='my_custom_task',        sql='select * from table',        my_custom_conn_id='my_custom_conn_id',    )
复制代码

现在需要对这个 Operator 进行转化,我们可以自定义一个转化规则,并将其命名为 MyCustomOperator.yaml,内容如下,最主要的内容是 migration.module 和 migration.parameter 的定义,其确定了转化规则:

name: MyCustomOperatordescription: The configuration for migrating airflow custom operator MyCustomOperator to DolphinScheduler SQL task.migration:  module:    - action: replace      src: custom.my_custom_operator.MyCustomOperator      dest: pydolphinscheduler.tasks.sql.Sql  parameter:    - action: replace      src: task_id      dest: name    - action: replace      src: my_custom_conn_id      dest: datasource_name
复制代码

再使用 --custom-rules 参数指定转化自定义参数,就能应用自定义规则的转化:


# 指定自定义规则路径为 /path/to/MyCustomOperator.yamlair2phin migrate --inplace --custom-rules /path/to/MyCustomOperator.yaml ~/airflow/dags
复制代码


让 Air2phin 运行地更快

Air2phin 默认是一个进程运行 DAG 文件的转化的,当你有许多 DAG 文件时,Air2phin 转化非常耗时,我们提供了一个启动多进程运行 Air2phin 转化的参数 --multiprocess,可以将其指定为用户机器的 CPU 数量来缩短转化时间:


# 指定 air2phin 启动 12 个进程同时进行转化air2phin migrate --inplace --custom-rules /path/to/MyCustomOperator.yaml --multiprocess 12 ~/airflow/dags
复制代码


存在的问题


目前,作为一个转化工具,Air2phin 的使用方式已经算比较完善了,能够满足用户迁移调度系统的基本需求,但还有一些地方有待完善。


内置规则还不够多

转化规则还不够多,目前只有五个,分别是:

  • airflow.DAG

  • airflow.operators.bash.BashOperator

  • airflow.operators.dummy_operator.DummyOperator

  • airflow.operators.python_operator.PythonOperator

  • airflow.operators.spark_sql_operator.SparkSqlOperator


如果有更多的规则,Air2phin 将成为一个更加好用的转化工具,这里欢迎各位随时提交转化规则的 PR(https://github.com/WhaleOps/air2phin/pulls)


部分 Airflow 的用法不能被迁移过来


部分概念仅仅在 Airflow 中有,在 DolphinScheduler 中还没有,如任务的成功、失败、重试、触发 callback,任务的 owner,variable,工作流并发数,tag 等,这部分 Airflow DAG 可以被迁移,但兼容的属性将会丢失,无法迁移到 DolphinScheduler。

Air2phin 常见问题解答


Q:为什么选择解析 Airflow DAG 文件而不是数据库?

A:因为 Airflow DAG 文件中才有完成的工作流信息,Airflow 的数据库中只有工作流基本信息,没有任务定义的信息,也没有任务的关系,我们选择通过解析 Airflow 的 DAG 文件而不是数据库来完成转化。


Q:为什么要通过 dolphinscheduler python sdk 做中转不自己提交到 DolphinScheduler?

A:因为 Airflow DAG 就是 Python 定义的,在 Airflow DAG 中有很多 Python 的特性,我们不想将这部分特性转化成结构化的数据(转化可能存在信息丢失),恰好 DolphinScheduler 已经有了 Python 的 sdk,所以直接通过 LibCST 转化是成本更加低的做法。


Q:为什么使用 LibCST 而不是 python 内置的 AST?

A:因为 LibCST 更加符合我们,Python 内置的 AST 库解析成 AST 的时候会丢失掉 comment 的信息,但是我们呢希望保留着部分信息。且 LibCST 提供更加多 visitor 保证我们更加方便的实现替换。

参考链接:air2phin(https://github.com/WhaleOps/air2phin)

2023-02-25 12:303642

评论

发布
暂无评论
发现更多内容

生成式AI:游戏研发的革命者

百度开发者中心

游戏开发 #人工智能 生成式AI 文心一言

隐语小课|私有信息检索(PIR)及其应用场景

隐语SecretFlow

AI 数据安全 隐私计算 开源社区 数据要素

说点大实话丨知名技术博主 Kirito 测评云原生网关

阿里巴巴云原生

阿里云 微服务 云原生

钱包量化多币种质押挖矿系统开发合约源代码详情

V\TG【ch3nguang】

钱包系统开发 质押挖矿

低成本生成式AI:引领未来内容创作新篇章

百度开发者中心

#人工智能 生成式AI 文心一言

九科信息成功签约中咨数据有限公司RPA项目

九科Ninetech

Footprint Analytics 与 GasZero 达成合作,将打造 “0 Gas” 区块链生态系统的未来

Footprint Analytics

区块链 web3

轻量应用服务器和云服务器的区别

天翼云开发者社区

服务器 云服务器

质押挖矿模式系统开发,矿池系统部署搭建

V\TG【ch3nguang】

挖矿矿池系统开发案例 质押挖矿

浪潮信息 KeyarchOS 助力 IT 企业安全管理业务完成 CentOS 迁移替换 | 龙蜥案例

OpenAnolis小助手

开源 操作系统 IT 浪潮信息 龙蜥案例

量化智能机器人开发,炒币机器人功能部署搭建

V\TG【ch3nguang】

量化交易机器人开发 炒币机器人

开源微服务如何选型?Spring Cloud、Dubbo、gRPC、Istio 详细对比

阿里巴巴云原生

阿里云 云原生 dubbo

ARTS 打卡第二周

直须

个人成长 前端 ARTS 打卡计划

HoudahSpot最新中文版+补丁安装教程

胖墩儿不胖y

Mac软件 文件搜索 搜索工具 搜索软件

生成式AI技术原理与应用

百度开发者中心

#人工智能 生成式AI 文心一言

判断LED显示屏的质量指南

Dylan

性能 质量 环境 LED显示屏

5分钟,结合 LangChain 搭建自己的生成式智能问答系统

字节跳动云原生计算

大数据 云搜索

基于云原生网关的流量防护实践

阿里巴巴云原生

阿里云 云原生

使用EF Core更新与修改生产数据库

高端章鱼哥

EF Core .net6

关于工厂数字孪生应用实例的解析

3DCAT实时渲染

数字孪生 实时渲染

10倍性价比,万物新生基于 StarRocks 无缝直替 Trino

StarRocks

数据库 数据仓库 StarRocks trino

生成式AI掀起创意新革命

百度开发者中心

#人工智能 AI作画 生成式AI 文心一言

避坑PCB的常见设计问题

华秋电子

PCB

业财融合背景下,全面预算管理的发展之路

智达方通

业财融合 全面预算管理 全面预算管理系统 企业全面预算管理

和鲸科技为临床医学科研场景打造可供多角色协同的低代码研究平台

ModelWhale

人工智能 低代码 数据科学 在线编程 临床研究

安全可信| 首批!通过两项算力调度能力评估!

天翼云开发者社区

云计算 云服务

购买矿机产出代币模式系统开发搭建

V\TG【ch3nguang】

代币 挖矿矿池系统开发案例

缓存更新的四种策略及选取建议

这我可不懂

缓存 应用程序

基于 Argo CD 与 Argo Workflows 的 GreptimeDB 云端自动化升级实践

Greptime 格睿科技

rust 时序数据库 云原生数据库 国产时序数据库 自动升级

一劳永逸,解决.NET发布云服务器的时区问题

互联网工科生

.net 云服务器 时区

Dropzone 4 for Mac(文件拖拽操作增强工具) 4.6.8中文激活版

mac

苹果mac Windows软件 Dropzone 4 文件管理器

迁移工具 Air2phin 宣布开源,2 步迁移 Airflow 至 Dolphinscheduler_大数据_钟嘉杰_InfoQ精选文章