Snowflake 的云原生架构提供了强大而灵活的数据导入导出机制。无论是执行初始数据迁移、建立持续数据管道,还是导出处理结果,掌握 Snowflake 的数据加载与卸载功能对有效管理数据平台至关重要。本文将深入探讨 Snowflake 数据移动的核心组件与最佳实践方案。
支持的文件格式
Snowflake 的卓越适应性体现在其对多种文件格式的支持上,每种格式都针对不同的使用场景和数据结构进行了优化。
CSV(逗号分隔值)
CSV 文件是数据交换的主力军——具有人类可读性、通用兼容性等特点,特别适用于结构化表格数据。虽然该格式缺乏压缩能力和模式强制功能,但其简洁性使其成为快速数据传输及与遗留系统集成的理想选择。
JSON(JavaScript 对象表示法)
JSON 在呈现层次化嵌套数据结构方面表现卓越。在 Snowflake 中,JSON 数据可直接加载至 VARIANT 列,既完整保留原始结构,又能通过 SQL 查询无缝遍历嵌套元素。
Parquet
Apache Parquet 提供列式存储架构,兼具内置压缩和模式保留特性。该格式能显著降低存储成本并提升查询性能,特别适用于需要跨数百万行扫描特定列的分析工作场景。
Avro
Avro 结合了行式存储与强大的模式演进能力,该格式特别适用于流处理场景,以及需要在不断裂现有数据管道的前提下实现数据结构随时间演变的场景。
ORC(优化行列式)
与 Parquet 类似,ORC 采用高效的列式存储结构并具备强压缩特性。它常见于 Hadoop 生态系统,能够为大规模分析查询提供卓越性能。
多格式支持的价值:不同团队和系统会生成不同格式的数据。支持多格式可消除中间转换步骤、降低延迟,使企业能够以数据原生格式进行处理,在保持数据真实性的同时降低系统复杂度。
Snowflake 中的 Stage
Stage 在 Snowflake 中充当数据文件的着陆区,为外部存储与 Snowflake 表之间建立连接桥梁。理解不同类型的 stage 对于设计高效的数据管道至关重要。
用户 Stage
每个 Snowflake 用户都拥有一个以前缀 @~表示的个人 stage 区域。用户 Stage 特别适合临时性分析和个人数据上传场景:
-- 将文件上传至用户StagePUT file:///local/path/sales_data.csv @~/my_data/; -- 查看用户Stage中的文件列表LIST @~;
复制代码
表 Stages
Snowflake 中的每个表都会自动拥有一个关联的 stage,可通过 @%table_name 语法访问。这些 stage 非常适合直接将数据加载到特定表中,而无需创建独立的暂存区域:
-- 将数据上传至表stagePUT file:///local/path/customer_data.parquet @%customers; -- 从表stage将数据加载到表中COPY INTO customersFROM @%customersFILE_FORMAT = (TYPE = PARQUET);
复制代码
内部 stage
内部 stage 在 Snowflake 内部提供共享的托管存储空间,非常适合团队协作和标准化 ETL 流程:
-- 创建内部stageCREATE STAGE my_internal_stage FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = '|' SKIP_HEADER = 1); -- 将文件上传至stagePUT file:///data/batch_*.csv @my_internal_stage;
复制代码
外部 Stages
外部 Stages 可连接到云存储服务(如 AWS S3、Azure Blob Storage、Google Cloud Storage),无需将文件复制到 Snowflake 中即可直接访问数据湖。
-- 创建指向S3的外部stageCREATE STAGE my_s3_stage URL = 's3://mybucket/data/' CREDENTIALS = (AWS_KEY_ID = 'xxx' AWS_SECRET_KEY = 'yyy') FILE_FORMAT = (TYPE = JSON); -- 直接查询外部存储中的文件SELECT $1:customer_id::STRING as customer_id, $1:order_total::NUMBER as order_totalFROM @my_s3_stage/orders/;
复制代码
COPY INTO 命令:批量加载的基石
COPY INTO 命令是 Snowflake 实现高效批量数据加载的核心工具,具有原子性、并行化加载和自动优化特性。
基础语法与示例
-- 创建目标表CREATE TABLE sales_transactions ( transaction_id NUMBER, product_id VARCHAR, quantity NUMBER, sale_date DATE, amount DECIMAL(10,2)); -- 基础COPY命令COPY INTO sales_transactionsFROM @my_stage/sales/2024/FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1 DATE_FORMAT = 'YYYY-MM-DD')ON_ERROR = CONTINUE; -- 带模式匹配的stage加载COPY INTO sales_transactionsFROM @my_stagePATTERN = '.*sales_2024.*[.]csv'FILE_FORMAT = (TYPE = CSV); -- 带转换的stage加载COPY INTO sales_transactionsFROM ( SELECT $1::NUMBER, $2::VARCHAR, $3::NUMBER, $4::DATE, $5::DECIMAL(10,2) * 1.1 -- Apply 10% markup FROM @my_stage/raw_sales/)FILE_FORMAT = (TYPE = CSV);
复制代码
COPY INTO 命令能够自动追踪已加载文件,防止重复加载并确保幂等性——这对构建可靠的数据管道至关重要。
基于 Snowpipe 的持续数据接入方案
Snowpipe 将批量加载转换为近实时数据流,当新文件到达您的 stages 时即可实现自动加载。
Snowpipe 的运行机制
Snowpipe 通过接收云存储事件通知触发自动化微批处理。当 S3 存储桶或 Azure 容器中出现新文件时,Snowpipe 会立即将其加入加载队列,通常可在数分钟内完成数据处理。
实施方案
场景:某电商平台需要持续从网络服务器接入点击流事件数据。
-- 创建目标表CREATE TABLE clickstream_events ( event_timestamp TIMESTAMP, user_id VARCHAR, session_id VARCHAR, page_url VARCHAR, event_type VARCHAR, raw_data VARIANT); -- 创建用于持续数据摄取的pipeCREATE PIPE clickstream_pipeAUTO_INGEST = TRUEASCOPY INTO clickstream_eventsFROM @my_s3_stage/clickstream/FILE_FORMAT = (TYPE = JSON)ON_ERROR = SKIP_FILE; -- 检查pipe状态SELECT SYSTEM$PIPE_STATUS('clickstream_pipe'); -- 监控近期加载任务SELECT *FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY( TABLE_NAME => 'clickstream_events', START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())));
复制代码
Snowpipe 在需要低延迟数据可用性的场景中表现卓越:实时仪表盘、欺诈检测系统以及运营监控等场景中,等待小时级批量加载是不可接受的。
数据卸载
将数据移出 Snowflake 与加载数据同等重要。COPY INTO location 语法支持高效的数据导出功能。
Exporting to Files 导出至文件
-- 导出为带标题行的CSV格式COPY INTO @my_stage/exports/customers_FROM ( SELECT customer_id, customer_name, total_purchases, last_order_date FROM customer_summary WHERE total_purchases > 1000)FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP FIELD_DELIMITER = '|')HEADER = TRUESINGLE = FALSEMAX_FILE_SIZE = 104857600; -- 100MB per file -- 导出为Parquet格式以供下游分析使用COPY INTO @external_stage/analytics/customer_segments.parquetFROM customer_segmentsFILE_FORMAT = (TYPE = PARQUET COMPRESSION = SNAPPY)OVERWRITE = TRUE; -- 导出为JSON格式用于API接口传输COPY INTO @my_stage/api_data/products.jsonFROM ( SELECT OBJECT_CONSTRUCT( 'product_id', product_id, 'name', product_name, 'category', category, 'price', price, 'inventory', current_inventory ) as product_data FROM products WHERE active = TRUE)FILE_FORMAT = (TYPE = JSON COMPRESSION = NONE)SINGLE = TRUE;
复制代码
支持的导出格式
Snowflake 支持导出为 CSV、JSON 和 Parquet 格式。每种格式均可使用 GZIP、BZIP2、BROTLI、ZSTD 或其他压缩算法进行压缩,从而在下游系统的处理需求与文件大小之间取得平衡。
使用 VARIANT 处理半结构化数据
VARIANT 数据类型是 Snowflake 针对半结构化数据的秘密武器,可原生存储 JSON、Avro、ORC 和 Parquet 数据,同时保持查询性能。
VARIANT 列操作指南
-- 创建包含VARIANT列的数据表CREATE TABLE iot_sensor_data ( sensor_id VARCHAR, timestamp TIMESTAMP, readings VARIANT); -- 直接加载JSON数据COPY INTO iot_sensor_dataFROM @sensor_stage/FILE_FORMAT = (TYPE = JSON); -- 查询嵌套JSON结构SELECT sensor_id, timestamp, readings:temperature::FLOAT as temp_celsius, readings:humidity::FLOAT as humidity_pct, readings:location.latitude::FLOAT as lat, readings:location.longitude::FLOAT as lng, readings:alerts[0].severity::STRING as primary_alertFROM iot_sensor_dataWHERE readings:temperature::FLOAT > 30 AND timestamp >= DATEADD(hour, -1, CURRENT_TIMESTAMP()); -- 展开嵌套数组SELECT sensor_id, timestamp, f.value:alert_type::STRING as alert_type, f.value:severity::STRING as severity, f.value:message::STRING as messageFROM iot_sensor_data, LATERAL FLATTEN(input => readings:alerts) fWHERE f.value:severity::STRING = 'CRITICAL'; -- 创建视图简化查询操作CREATE VIEW sensor_metrics ASSELECT sensor_id, timestamp, readings:temperature::FLOAT as temperature, readings:humidity::FLOAT as humidity, readings:pressure::FLOAT as pressure, ARRAY_SIZE(readings:alerts) as alert_countFROM iot_sensor_data;
复制代码
VARIANT 列通过列式压缩与剪枝自动优化存储与查询性能,使得半结构化数据能够达到与传统结构化数据同等的查询效率。
结论
Snowflake 的数据加载与卸载能力共同构成了一个完整的数据移动生态系统。从支持多种文件格式的灵活性,到各类 stage 的便捷性;从 COPY INTO 批量加载的高效性,到 Snowpipe 实时处理的即时性;再从 VARIANT 列的优雅设计到高效的数据导出功能,每个组件都在现代数据架构中扮演着关键角色。
成功的关键在于根据具体场景选择合适工具:
● 使用 internal stages 实现受管控的协同数据共享;
● 通过 external stages 直连数据湖存储;
● 采用 Snowpipe 满足流式与近实时需求;
● 运用 VARIANT 列消除半结构化数据的 ETL 复杂度;
● 利用 COPY INTO 转换功能在加载过程中清洗和增强数据。
通过熟练掌握这些能力,数据团队可以构建适应业务需求变化、同时保持简洁性与高性能的稳健可扩展管道。无论是迁移 TB 级历史数据,还是处理实时事件流,Snowflake 的加载与卸载功能都为现代数据平台的成功奠定了坚实基础。
原文地址:https://www.linkedin.com/in/jaiswalharshal/
点击链接立即报名注册:Ascent - Snowflake Platform Training - China
评论