最新发布《数智时代的AI人才粮仓模型解读白皮书(2024版)》,立即领取! 了解详情
写点什么

实现标记自动化:将海量元数据引入 Data Catalog

Shirley Cohen & Shekhar Bapat

  • 2020-11-03
  • 本文字数:4899 字

    阅读完需:约 16 分钟

实现标记自动化:将海量元数据引入 Data Catalog

Data Catalog 让您能够通过交互式接口摄取和编辑业务元数据。它包括可用于实现常见任务自动化的编程接口。许多企业必须使用 Data Catalog 定义和采集一组元数据,因此,我们将在这里提供一些关于如何从长期角度声明、创建和维护这类元数据的最佳实践。


在以往的文章中,我们介绍了标签模板能够如何通过描述用于对数据资产进行分类的词汇表来促进数据发现、治理和质量控制。在本文中,我们将探讨如何使用标签模板对数据进行标记。标记指创建一个标签模板的实例并为模板字段分配值,以对特定数据资产进行分类。撰写本文时,Data Catalog 支持三种存储后端:BigQuery、Cloud Storage 和 Pub/Sub。在此,我们将着重介绍如何对在这些后端中存储的资产进行标记,例如,表、列、文件和消息主题。


我们将介绍适合在数据湖和数据仓库环境中标记数据的三种使用模式:配置新数据源、处理派生数据以及更新标签和模板。对于每种应用场景,您将了解到我们推荐的用于大规模标记数据的方法。

1. 配置数据源

配置数据源通常涉及几种活动:根据存储后端创建表或者文件、利用一些初始数据填充它们以及对这些资源设置访问权限。我们在此基础至上还多增加了一种活动:在 Data Catalog 中标记新创建的资源。以下是涉及的具体步骤:


标记数据源需要了解拟使用的标签模板的含义以及数据源中的数据语义的领域专家。基于所具有的知识,领域专家会选择附加哪些模板以及从这些模板创建哪类标签。鉴于许多决策依赖于标签的准确性,人的参与至关重要。


基于我们与客户的合作经验,我们观察到两种类型的标签。一种类型称为 static(静态),因为,字段值是预先知道的,并且预计很少变更。另一种类型称为 dynamic(动态),因为字段值会根据基础数据的内容定期变更。静态标签的一个示例是包括 data_domain(数据域)、data confidentiality(数据保密性)和 data_retention(数据保留)的数据治理字段的集合。这些字段的值由组织的数据使用策略决定。它们通常在数据源创建时便已知晓,而且不会频繁变更。动态标签的一个示例是数据质量字段的集合,例如,number_values(数值)、unique_values(唯一值)、min_value(最小值)和 max_value(最大值)。每当运行新的负载或者对数据源进行修改时,这些字段值预计会频繁变更。


除了这些差异,静态标签还有级联属性,表明其字段应当以何种方式传播 —— 从源到派生数据。(在后续部分,我们将进一步详解这一概念。)与此形成对照的是,动态标签有查询表达式和刷新属性,指示应当用于计算字段值的查询以及计算的频率。在第一个代码段显示了一个静态标签的配置示例,第二个代码段显示的是动态标签的示例。


tag_config:    template:     - template_id: dg_template     - project_id: sandbox     - region: us-central1   fields:      - {name: data_domain, value: HR, cascade: true}     - {name: data_confidentiality, value: SENSITIVE, cascade: true}     - {name: data_retention, value: 30_DAYS, cascade: false}   lineage:      - template_id: derived_template     - parents_field: parents   rules:     - included_uri_patterns: bigquery/project/sandbox/dataset/covid/*     - excluded_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_input_*     - included_uri_patterns: pubsub/project/sandbox/subscriptions/employee-RTO

复制代码


基于 YAML 的静态标签配置


tag_config:    template:     - template_id: dg_template     - project_id: sandbox     - region: us-central1   refresh: 1-hour   fields:      - {name: count, query_expression: select count(rto) from $$}     - {name: unique_values, query_expression: select distinct rto from $$}     - {name: null_values, query_expression: select count(*) from $$ where rto is null}   rules:     - included_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_RTO.rto
复制代码


基于 YAML 的动态标签配置


所前所述,当他们为数据源设置标记时,领域专家将为这些配置提供输入。更具体而言,他们首先选择要为数据源附加的模板。其次,他们会选择拟使用的标签类型,即,静态还是动态。接下来,他们会输入各字段的值,如果类型是静态的,还要输入级联设置,如果类型是动态的,要输入查询表达式和刷新设置。这些输入通过 UI 提供,因此,领域专家无需编写原始 YAML 文件。


一旦生成 YAML 文件,工具将基于规范在 Data Catalog 中解析配置并创建实际标签。工具还会根据刷新设置安排动态标签的重新计算。尽管需要领域专家进行初始输入,但实际的标记任务可完全自动化。我们推荐采用以下这种方法,这样,不仅能在发布时对新创建的数据源进行标记,而且无需人工操作即可对标签进行长期维护。

2. 处理派生数据

除了标记数据源,能够对派生数据进行大规模标记同样至关重要。我们将派生数据宽泛地定义为以从一个或多个数据源转换的方式创建的任何数据段。这种类型的数据与数据湖和数据仓库场景尤其具有相关性,在这类环境中,数据产品通常从各种数据源派生。


派生数据的标签应当由原数据源和应用于数据的转换类型组成。原数据源的 URI 被保持在标签中,并且在标签中还存储一个或多个转换类型 —— 例如,聚合、匿名化、归一化等。我们建议将标记创建逻辑融入生成派生数据的管道中。利用 Airflow DAGs 和 Beam,这是可行的。例如,如果一个数据管道连接两个数据源,聚合结果并将其存储到表中,您可以参考两个原数据源以及 aggregation:true,基于结果表创建标签。以下,您可以看到创建此标签的 Beam 管道的代码段:


  with beam.Pipeline(options=pipeline_options) as p:     sql = 'select covid_county, covid_state, sum_new_cases from            views.v_covid_new_cases'      bq_source = beam.io.BigQuerySource(query=sql, use_standard_sql=True)      covid_query_results = p | 'Read from BigQuery' >> beam.io.Read(bq_source)     subscription_name = 'projects/scohen-sandbox/subscriptions/employee-RTO'                           message = p | 'Read message' >> ReadFromPubSub(topic=None,                   subscription=subscription_name, timestamp_attribute=None)     emp_pcoll = message | 'Get Age' >> beam.ParDo(GetAge())                                                                                joined_emp_pcoll = emp_pcoll | 'Join Data' >> beam.ParDo(Join(),                         beam.pvalue.AsList(covid_query_results))     batch_joined_pcoll = joined_emp_pcoll | 'Batch Join' >>                          BatchElements(min_batch_size=10, max_batch_size=20)                                                                                                                                masked_dob_pcoll = batch_joined_pcoll | 'Mask DOB' >> beam.ParDo(MaskDOB())            batch_masked_pcoll = masked_dob_pcoll| 'Batch Mask' >>                           BatchElements(min_batch_size=10, max_batch_size=20)                                                                                                       bucket_age_pcoll = batch_masked_pcoll | 'Bucket Age' >>                         beam.ParDo(BucketAge())       batch_age_pcoll = bucket_age_pcoll | 'Batch Bucket Age' >>                        BatchElements(min_batch_size=4, max_batch_size=5)                                                                                                                     hash_id_pcoll = batch_age_pcoll | 'Hash Id' >> beam.ParDo(HashId())               hash_id_pcoll | 'Write Table' >> WriteToBigQuery(table, schema)     # Tag Employee_RTO table with Derived Data template      template = 'derived_template'       dc_client = datacatalog_v1.DataCatalogClient()     tag.template = dc_client.tag_template_path(project_id, region, template)      tag = datacatalog_v1.types.Tag()          table_resource = '//bigquery.googleapis.com/projects/' + project_id +                       '/datasets/' + dataset + '/tables/' + short_table_name      table_entry = dc_client.lookup_entry(linked_resource=table_resource)     tag.fields['parents'].string_value = 'pubsub/project/sandbox/subscriptions/employee-RTO,bigquery/project/sandbox/dataset/views/v_covid_new_cases'      tag.fields['aggregated_data'].bool_value = False     tag.fields['pseudo_anonymized_data'].bool_value = True     tag.fields['anonymized_data'].bool_value = False     tag.fields['origin_product'].enum_value.display_name = 'DATAFLOW'            long_ts = datetime.now(tz.gettz("America/Chicago")).isoformat()     ts = timestamp_value[0:19] + timestamp_value[26:32]      tag.fields['date_data_processed'].timestamp_value.FromJsonString(ts)       response = dc_client.create_tag(parent=table_entry.name, tag=tag)
复制代码


带标记逻辑的 Beam 管道


一旦利用其原数据源对派生数据进行标记,您可以使用此信息传播附加到原数据源的静态标签。这是 cascade 属性发挥作用的地方,指示哪些字段应当被传播到其派生数据。在以上所示的第一个代码段中显示了 cascade 属性的示例,其中 data_domain 和 data_confidentiality 字段被传播,而 data_retention 字段未被传播。这意味着,BigQuery 中派生的任何表将使用 dg_template 利用 data_domain:HR 和 data_confidentiality:CONFIDENTIAL 进行标记。

3. 处理更新

有几种场景需要针对标签和模板的更新能力。例如,如果业务分析师发现标签中的一个错误,需要对一个或多个值进行更正。如果要采用新的数据使用策略,可能需要为模板添加新的字段并对现有字段重命名或者删除。


我们为标记和模板更新提供配置,如下图所示。标记更新配置指定将变更的每个字段的当前值和新值。工具处理配置并基于规范更新标签中的字段的值。如果更新的标签是静态的,工具还会将变更传播至派生数据的相同标签。


模板更新配置制定变更的字段名、字段类型以及任何枚举值。工具通过首先确定变更的性质来处理更新。撰写本文时,Data Catalog 支持对模板添加和删除字段以及添加枚举值,但尚不支持字段重命名或者类型变更。因此,如果需要简单添加或者删除,工具会对现有模板进行修改。否则,必须重新创建整个模板以及所有从属标签。


  tag_config:  template:   - template_id: dg_template   - project_id: sandbox   - region: us-central1   fields:      - {name: data_confidentiality, current: SENSITIVE, new: SHARED_INTERNALLY, cascade: true}   - {name: data_retention, current: 30_DAYS, new: 60_DAYS, cascade: false}   rules:     - included_uri_patterns: bigquery/project/sandbox/dataset/covid/*   - excluded_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_input_*   - included_uri_patterns: pubsub/project/sandbox/subscriptions/employee-RTO

复制代码


基于 YAML 的标签更新配置


  template_config:  - template_id: dg_template   - project_id: sandbox   - region: us-central1   fields:     - {name: data_confidentiality, type: enum, values: {SENSITIVE, SHARED_INTERNALLY, SHARED_EXTERNALLY, PUBLIC, UNKNOWN}    - {name: data_retention, type: enum, values: {30_DAYS, 60_DAYS, 90_DAYS, 120_DAYS, 1_YEAR, 2_YEARS, 5_YEARS, UNKNOWN}

复制代码


基于 YAML 的模板更新配置


我们已经开始对这些方法进行原型创建,以发布一个开源工具,实现按照我们建议的使用模式在 Data Catalog 中创建和维护标签所涉及的许多任务的自动化。


2020-11-03 13:151173

评论 4 条评论

发布
用户头像
2021-02-27 11:21
回复
赞赞赞
2021-02-27 11:21
回复
赞赞赞赞赞赞
2021-02-27 11:22
回复
赞赞赞赞赞赞赞赞赞
2021-02-27 11:22
回复
没有更多了
发现更多内容

体育数据API接口:观看足球篮球比赛直播视频,获取即时比分数据

软件开发-梦幻运营部

探索未来云计算,华为云耀云服务器L实例引领行业新动力

平平无奇爱好科技

《三国杀》完成鸿蒙原生应用开发,更多游戏品类加入鸿蒙生态

最新动态

一种LED驱动专用控制电路方案

二哈侠

传输黑科技下的全景之旅—浅谈开源项目E3PO的思路与功能

计算机魔术师

文心一言 VS 讯飞星火 VS chatgpt (147)-- 算法导论12.2 2题

福大大架构师每日一题

福大大架构师每日一题

如何制作AI数字人短视频矩阵?

青否数字人

数字人 数字人短视频

SQL 算术运算符:加法、减法、乘法、除法和取模的用法

小万哥

MySQL 数据库 程序员 sql 后端开发

CentOS7如何使用fail2ban防范SSH暴力破解攻击?

百度搜索:蓝易云

Linux centos SSH 云服务器 Fail2ban

回顾2023,展望2024——小工程师的执着

工程师日月

#技术人的2023总结

关于VO/DTO/DO/PO价值的思考

姚秋实(Nacol)

Java 设计模式 架构设计 架构师

【云原生 | 最佳实践】一个实践驱动的云原生项目集—KubeWharf

计算机魔术师

字节跳动 云原生

AI数字人 vs 真人:成本与效率的对比

青否数字人

数字化时代的利器:华为云服务器L实例助力初创企业稳健成长

平平无奇爱好科技

Windows 11 的代理设置:启用和禁用

Geek_bf375d

爬虫 IP 代理IP 代理IP设置 HTTPS协议

使用极限网关助力 ES 集群无缝升级、迁移上/下云

极限实验室

console Gateway 数据迁移 极限网关 极限科技

CSS技巧:从高度0过渡到自动高度

南城FE

CSS 前端 动画

数字化转型新篇章:华为云耀云服务器L实例引领初创与成长型企业向前

平平无奇爱好科技

【高效视频处理】一窥火山引擎多媒体处理框架-BMF

计算机魔术师

Linux的代理设置

Geek_bf375d

爬虫 代理IP 代理IP设置 跨境电商 HTTPS协议

我在平台与 AIGC 的交互组件一些设计经验

软件工程师-罗小东

CentOS 7.8编译安装python 3.7教程。

百度搜索:蓝易云

Python Linux centos 运维 云服务器

INFINI Labs 产品更新 | 修复 Easysearch 跨集群复制索引同步问题,Gateway 内存异常增长等问题

极限实验室

Gateway 产品更新 easysearch 极限科技

Java互联网+医院智能导诊系统源码 自动兼容H5小程序、Uniapp

源码星辰

Java 源码 智慧导诊 智能导诊

AI数字人SaaS系统源码独立部署到底需要多少钱?

青否数字人

淘宝API接口与用户体验分析

联讯数据

解锁中小企业上云智选,华为云这款服务器你值得拥有

平平无奇爱好科技

Android 上的代理设置

Geek_bf375d

爬虫 代理IP 代理IP设置 跨境支付 HTTPS协议

如何设置和使用 Proxifier教程

Geek_bf375d

爬虫 https IP 代理IP 代理IP设置

Spring Boot 外部化配置的应用

玄兴梦影

数字化浪潮下云计算如何服务?华为云这款服务器用实力说话

平平无奇爱好科技

实现标记自动化:将海量元数据引入 Data Catalog_文化 & 方法_InfoQ精选文章