【AICon】探索八个行业创新案例,教你在教育、金融、医疗、法律等领域实践大模型技术! >>> 了解详情
写点什么

FlinkSQL 数据权限之数据脱敏解决方案

  • 2023-05-19
    北京
  • 本文字数:3640 字

    阅读完需:约 12 分钟

FlinkSQL数据权限之数据脱敏解决方案

在当今数字化时代,数据已经成为企业和组织中不可或缺的重要资产,包括个人信息、商业机密、财务数据等等。然而,随着数据泄露和安全问题的不断增加,数据脱敏已经成为了一项非常重要的工作。随着以 Flink 为代表的实时数仓的兴起,企业对实时数据安全的需求越来越迫切。但由于 Flink 实时数仓领域发展相对较短,Apache Ranger 尚不支持 FlinkSQL,且依赖 Ranger 会导致系统的部署和运维愈加复杂。


因此,自研出 FlinkSQL 的数据脱敏方案,支持面向用户级别的数据脱敏访问控制,即特定用户只能访问到脱敏后的数据。在技术实现上做到对 Flink 和 Calcite 源码的零侵入,可以快速集成到已有实时平台产品中。

一、基础知识

1.1 数据脱敏

数据脱敏(Data Masking)是一种数据安全技术,用于保护敏感数据,以防止未经授权的访问。该技术通过将敏感数据替换为虚假数据或不可识别的数据来实现。例如可以使用数据脱敏技术将信用卡号码、社会安全号码等敏感信息替换为随机生成的数字或字母,以保护这些信息的隐私和安全。

1.2 业务流程

下面用订单表orders的两行数据来举例,示例数据如下:



1.2.1 设置脱敏策略

管理员配置用户、表、字段、脱敏条件,例如下面的配置。



1.2.2 用户访问数据

当用户在 Flink 上查询orders表的数据时,会在底层结合该用户的脱敏条件重新生成 SQL,即让数据脱敏生效。当用户 A 和用户 B 在执行下面相同的 SQL 时,会看到不同的结果数据。


SELECT * FROM orders
复制代码


用户 A 查看到的结果数据如下customer_name字段的数据被全部掩盖掉。



用户 B 查看到的结果数据如下customer_name字段的数据只显示前 4 位,剩下的用 x 代替。



二、Hive 数据脱敏解决方案


在离线数仓工具 Hive 领域,由于发展多年已有 Ranger Column Masking 方案来支持字段数据的脱敏控制,详见参考文献[1]。下图是在 Ranger 里配置 Hive 表数据脱敏条件的页面,供参考。



但由于 Flink 实时数仓领域发展相对较短,Ranger 还不支持 FlinkSQL,以及依赖 Ranger 的话会导致系统部署和运维过重,因此开始自研实时数仓的数据脱敏解决工具。当然本文中的核心思想也适用于 Ranger 中,可以基于此较快开发出 ranger-flink 插件。

三、FlinkSQL 数据脱敏解决方案

3.1 解决方案

3.1.1 FlinkSQL 执行流程

可以参考作者文章[FlinkSQL字段血缘解决方案及源码],本文根据 Flink1.16 修正和简化后的执行流程如下图所示。



CalciteParser进行parse()validate()处理后会得到一个 SqlNode 类型的抽象语法树(Abstract Syntax Tree,简称 AST),本文会针对此抽象语法树来组装行级过滤条件后生成新的 AST,以实现数据脱敏控制。

3.1.2 Calcite 对象继承关系


下面章节要用到 Calcite 中的 SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall 和 SqlSelect 等类,此处进行简单介绍以及展示它们间继承关系,以便读者阅读本文源码。


序号介绍
1SqlNodeA SqlNode is a SQL parse tree.
2SqlCallA SqlCall is a call to an SqlOperator operator.
3SqlIdentifierA SqlIdentifier is an identifier, possibly compound.
4SqlJoinParse tree node representing a JOIN clause.
5SqlBasicCallImplementation of SqlCall that keeps its operands in an array.
6SqlSelectA SqlSelect is a node of a parse tree which represents a select statement, the parent class is SqlCall



3.1.3 解决思路


针对输入的 Flink SQL,在CalciteParser进行语法解析(parse)和语法校验(validate)后生成抽象语法树(Abstract Syntax Tree,简称 AST)后,采用自定义Calcite SqlBasicVisitor的方法遍历 AST 中的所有SqlSelect,获取到里面的每个输入表。如果输入表中字段有配置脱敏条件,则针对输入表生成子查询语句,并把脱敏字段改写成CAST(脱敏函数(字段名) AS 字段类型) AS 字段名,再通过CalciteParser.parseExpression()把子查询转换成 SqlSelect,并用此 SqlSelect 替换原 AST 中的输入表来生成新的 AST,最后得到新的 SQL 来继续执行。



3.2 详细方案

3.2.1 解析输入表

通过对 Flink SQL 语法的分析和研究,最终出现输入表的只包含以下两种情况:


  1. SELECT 语句的 FROM 子句,如果是子查询,则递归继续遍历。

  2. SELECT ... JOIN 语句的 Left 和 Right 子句,如果是多表 JOIN,则递归查询遍历。


因此,下面的主要步骤会根据 FROM 子句的类型来寻找输入表。

3.2.2 主要步骤

主要通过 Calcite 提供的访问者模式自定义 DataMaskVisitor 来实现,遍历 AST 中所有的 SqlSelect 对象用子查询替换里面的输入表。下面详细描述替换输入表的步骤,整体流程如下图所示。



  1. 遍历 AST 中的 SELECT 语句。

  2. 判断是否自定义的 SELECT 语句(由下面步骤 9 生成),是则跳转到步骤 10,否则继续步骤 3。

  3. 判断 SELECT 语句中的 FROM 类型,按照不同类型对应执行下面的步骤 4、5 和 10。

  4. 如果 FROM 是 SqlJoin 类型,则分别遍历其左 Left 和 Right 右节点,即执行当前步骤 4 和步骤 6。由于可能是三张表及以上的 Join,因此进行递归处理,即针对其左节点跳回到步骤 3。

  5. 如果 FROM 是 SqlBasicCall 类型,还需要判断是否来自子查询,是则跳转到步骤 10 继续遍历 AST,后续步骤 1 会对子查询中的 SELECT 语句进行处理。否则跳转到步骤 7。

  6. 递归处理 Join 的右节点,即跳回到步骤 3。

  7. 遍历表中的每个字段,如果某个字段有定义脱敏条件,则把改字段改写成格式CAST(脱敏函数(字段名) AS 字段类型) AS 字段名,否则用原字段名。

  8. 针对步骤 7 处理后的字段,构建子查询语句,形如 (SELECT 字段名1, 字段名2, CAST(脱敏函数(字段名3) AS 字段类型) AS 字段名3、字段名4 FROM 表名) AS 表别名

  9. 对步骤 8 的子查询调用CalciteParser.parseExpression()进行解析,生成自定义的 SELECT 语句,并替换掉原 FROM。

  10. 继续遍历 AST,找到里面的 SELECT 语句进行处理,跳回到步骤 1。

3.2.3 Hive 及 Ranger 兼容性

在 Ranger 中,默认的脱敏策略的如下所示。通过调研发现 Ranger 的大部分脱敏策略是通过调用 Hive 自带或自定义的系统函数实现的。


序号策略名策略说明Hive系统函数
1Redact用x屏蔽字母字符,用n屏蔽数字字符mask
2Partial mask: show last 4仅显示最后四个字符,其他用x代替mask_show_last_n
3Partial mask: show first 4仅显示前四个字符,其他用x代替mask_show_first_n
4Hash用值的哈希值替换原值mask_hash
5Nullify用NULL值替换原值Ranger自身实现
6Unmasked原样显示Ranger自身实现
7Date: show only year仅显示日期字符串的年份mask
8CustomHive UDF来自定义策略


由于 Flink 支持 Hive Catalog,在 Flink 能调用 Hive 系统函数。 因此,本方案也支持在 Flink SQL 配置 Ranger 的脱敏策略。

四、用例测试

源码地址:https://github.com/HamaWhiteGG/flink-sql-security


注: 如果用 IntelliJ IDEA 打开源码,请提前安装 Manifold 插件。


用例测试数据来自于 CDC Connectors for Apache Flink 官网,本文给orders表增加一个 region 字段,再增加'connector'='print'类型的 print_sink 表,其字段和orders表的一样,数据库建表及初始化 SQL 位于 data/database 目录下。


下载本文源码后,可通过 Maven 运行单元测试,测试用例中的 catalog 名称是hive,database 名称是default


$ cd flink-sql-security$ mvn test
复制代码


详细测试用例可查看源码中的单测RewriteDataMaskTestExecuteDataMaskTest,下面只描述两个案例。

4.1 测试 SELECT

4.1.1 输入 SQL

用户 A 执行下述 SQL:


SELECT order_id, customer_name, product_id, region FROM orders
复制代码

4.1.2 根据脱敏条件重新生成 SQL

  1. 输入 SQL 是一个简单 SELECT 语句,经过语法分析和语法校验后 FROM 类型是SqlBasicCall,SQL 中的表名orders会被替换为完整的hive.default.orders,别名是orders

  2. 由于用户 A 针对字段customer_name定义脱敏条件 MASK(对应函数是脱敏函数是mask),该字段在流程图中的步骤 8 中被改写为CAST(mask(customer_name) AS STRING) AS customer_name,其余字段未定义脱敏条件则保持不变。

  3. 然后在步骤 8 的操作中,表名hive.default.orders被改写成如下子查询,子查询两侧用括号()进行包裹,并且用 AS 别名来增加表别名。


(SELECT     order_id,     order_date,     CAST(mask(customer_name) AS STRING) AS customer_name,     product_id,     price,     order_status,     regionFROM     hive.default.orders ) AS orders
复制代码

4.1.3 输出 SQL 和运行结果


最终执行的改写后 SQL 如下所示,这样用户 A 查询到的顾客姓名customer_name字段都是掩盖后的数据。


SELECT    orders.order_id,    orders.customer_name,    orders.product_id,    orders.regionFROM (    SELECT          order_id,         order_date,         CAST(mask(customer_name) AS STRING) AS customer_name,         product_id,         price,         order_status,         region    FROM          hive.default.orders      ) AS orders
复制代码

4.2 测试 INSERT-SELECT

4.2.1 输入 SQL


用户 A 执行下述 SQL:


INSERT INTO print_sink SELECT * FROM orders
复制代码

4.2.2 根据脱敏条件重新生成 SQL

通过自定义 Calcite DataMaskVisitor 访问生成的 AST,能找到对应的 SELECT 语句SELECT * FROM orders,注意在语法校验阶段 * 会被改写成表中所有字段。针对此 SELECT 语句的改写逻辑同上,不再阐述。

4.2.3 输出 SQL 和运行结果

最终执行的改写后 SQL 如下所示,注意插入到print_sink表的customer_name字段是掩盖后的数据。


INSERT INTO print_sink (    SELECT         orders.order_id,        orders.order_date,        orders.customer_name,        orders.product_id,        orders.price,        orders.order_status,        orders.region    FROM (        SELECT             order_id,             order_date,             CAST(mask(customer_name) AS STRING) AS customer_name,             product_id,             price,             order_status,             region         FROM             hive.default.orders    ) AS orders)
复制代码

五、参考文献

  1. Apache Ranger Column Masking in Hive

  2. FlinkSQL字段血缘解决方案及源码

  3. 从SQL语句中解析出源表和结果表

  4. 基于Flink CDC构建MySQL和Postgres的Streaming ETL

  5. HiveQL—数据脱敏函数

2023-05-19 15:086615

评论

发布
暂无评论
发现更多内容

基于模板配置的数据可视化平台

百度Geek说

InfoQ 极客传媒 15 周年庆征文|在Flutter中自定义应用程序内键盘

坚果

InfoQ极客传媒15周年庆

工资管理系统该如何使用?

低代码小观

企业管理 工资 管理系统

SphereEx 正式开源面向 Database Mesh 的解决方案 Pisanix

SphereEx

开源 SphereEx 云上数据库 Database Mesh Pisanix

从 0 到 1,探究百亿流量验证下的 MVVM 框架设计

图灵教育

百度 MVVM 全栈设计

八连冠!浪潮云连续8年蝉联中国政务云市场第一位

浪潮云

云计算

后端适用,Apifox接口文档设计和调试教程【工具篇】

Liam

Java 后端 Postman 后端开发 API文档

数据安全刻不容缓,国产智能化厂商首获SOC 2鉴证报告有何意义?

王吉伟频道

RPA 数据安全 机器人流程自动化 智能自动化 SOC 2

5年“研究”3年“实战” 之后的满分答卷

青藤云安全

网络安全 容器安全 安全服务 云原生安全

Springcloud Oauth2 HA篇

Damon

微服务架构 安全架构 6月月更

细说腾讯如何做到直播延时降低90%以上方案

C++后台开发

WebRTC CDN 音视频开发 视频直播 直播低延迟

大数据培训Flink高频面试题

@零度

flink 大数据开发

ARM64 上的性能怪兽:API 网关 Apache APISIX 在 AWS Graviton3 上的安装和性能测试

API7.ai 技术团队

AWS 网关 arm APISIX

中原银行统一日志平台

中原银行

海量数据 中原银行 日志平台

Ubuntu20.04设置静态IP

echeverra

Linux 静态IP

网络安全等级测评和商用密码应用安全性评估是一回事吗?

行云管家

网络安全 等级保护 商用密码

defi存币生息理财dapp系统开发逻辑

开发微hkkf5566

直播预告 | 社交新纪元,共探元宇宙社交新体验

ZEGO即构

为什么 SQL 语句使用了索引,但却还是慢查询?

okokabcd

MySQL

各厂商的数据湖解决方案

五分钟学大数据

数据湖 6月月更

企业数字化转型该如何做?三个融合、三个转换

小炮

618 大促来袭,浅谈如何做好大促备战

阿里巴巴云原生

阿里云 微服务 高可用 云原生

陕西西安等保测评单位有哪些?在哪里可以查到?

行云管家

西安 等保测评 等保测评机构

分布式数据对象:超级终端的"全局变量"

OpenHarmony开发者

OpenHarmony

低代码分析盘点:银行业低代码应用需要规避两大误区

易观分析

代码 银行

资深OpenStacker - 彭博、Vexxhost升级为OpenInfra基金会黄金成员

Geek_2d6073

flutter系列之:Material主题的基础-MaterialApp

程序那些事

flutter 程序那些事 6月月更 widget

构建基于React18的电子表格程序

葡萄城技术团队

React 表格 纯前端表格技术

技术干货 | Linkis实践:新引擎实现流程解析

康月牙

Apache 开源社区 WeDataSphere Linkis 使用实践

技术干货 | Linkis1.0.2安装及使用指南

康月牙

开源社区 微众银行 WeDataSphere Linkis 使用实践

数据产品学习-实时计算平台

第519区

实时计算 数据产品 数据开发 大数据平台

FlinkSQL数据权限之数据脱敏解决方案_大数据_HamaWhite_InfoQ精选文章