写点什么

探索 Snowflake 与 Postgres 之间的双向数据流动模式 | 技术实践

  • 2026-06-23
    北京
  • 本文字数:8032 字

    阅读完需:约 26 分钟

2026 年,智能体将在企业级应用中取得哪些实质性突破?点击下载《2026 年 AI 与数据发展预测》白皮书,获悉专家一手前瞻,抢先拥抱新的工作方式!

现代数据架构越来越要求事务型与分析型工作负载能够无缝共存。PostgreSQL 仍然是事务型应用的核心基础设施——支撑电商订单处理、实时库存系统以及面向客户的 API——而 Snowflake 则是所有分析型数据与 AI 的基础平台。挑战在于:如何让数据在这两个系统之间双向可靠流动,同时将延迟与运维开销降到最低。

历史上,要打通 OLTP 与 OLAP 系统,需要拼接外部 ETL 工具、管理云存储桶、配置 IAM 角色,并维护脆弱的 CDC 数据管道。团队花在基础设施“打通”上的时间,往往超过从数据中产生价值的时间。

本文的目标是探索如何利用 Snowflake 的一些最新创新能力,在 Postgres 与 Snowflake 之间支持五种关键的数据流动模式。如下所示:

有哪些变化?

近期 Snowflake 的一些产品能力显著简化了这一问题:

  • Snowflake Postgres(PuPr):一种完全托管的 PostgreSQL 服务,原生运行在 Snowflake 生态系统中。它消除了外部 PostgreSQL 托管的需求,同时与 Snowflake 数据平台实现一流集成;

  • pg_lake(PuPr):PostgreSQL 的扩展能力,允许在 Postgres 内直接创建 Apache Iceberg 表。在 Postgres 中写入的数据,可以通过共享 Iceberg 元数据被 Snowflake 直接查询——无需文件导出、无需中转存储、无需 ETL 管道;

  • pg_incremental(PuPr):用于调度式增量同步的扩展组件。与 pg_lake 结合使用时,可以实现轻量级 CDC,只同步发生变化的数据行;

  • Snowflake 托管 Iceberg 存储(PuPr):Postgres 管理的 Iceberg 表使用 Snowflake 内部存储,并通过托管凭证访问。无需外部 S3、无需 IAM、无需存储集成配置;

  • Openflow(正式发布):Snowflake 的托管数据集成平台(基于 Apache NiFi 构建),提供预置的 CDC 连接能力,包括基于 PostgreSQL WAL 的变更捕获,以及通过 Snowpipe Streaming 进行数据传输。

这些能力共同构建了一个完整的数据流动能力体系——从简单的批量加载,到实时 CDC——全部在一个统一平台内完成。

本文中的所有模式都使用 ORDERS 表作为示例数据源。该表模拟典型电商订单生命周期,约 28,000 行数据。本例中数据来源于 SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.Orders。

CREATE TABLE cdc_demo.orders (    order_id        BIGINT PRIMARY KEY,    customer_id     BIGINT,    order_status    VARCHAR(1),    total_price     DECIMAL(15,2),    order_date      DATE,    order_priority  VARCHAR(15),    clerk           VARCHAR(15),    ship_priority   INTEGER,    comment         VARCHAR(79),    created_at      TIMESTAMP DEFAULT NOW(),    updated_at      TIMESTAMP DEFAULT NOW());
复制代码

模式 1:批量数据流动 —— Postgres 到 Snowflake

业务场景:某零售公司在 PostgreSQL 电商系统中全天处理订单。每天夜间,分析团队需要在 Snowflake 中获取所有订单的完整快照,用于报表分析、需求预测以及财务对账;

技术方案:在 Postgres 中创建 Iceberg 表。“USING Iceberg”子句允许 pg_lake 在 Postgres 内创建并写入 Iceberg 表。然后通过 INSERT/SELECT 将订单数据写入该表。

在 Snowflake 侧,创建目录集成(Catalog Integration)后再创建 Iceberg 表。这些操作属于元数据层操作,不涉及实际数据搬运。当 Iceberg 表创建完成后,即可在 Snowflake 中直接查询。

步骤 1:在 Postgres 中启用 pg_lake

-- Connect to Snowflake Postgres instanceCREATE EXTENSION IF NOT EXISTS pg_lake CASCADE;
复制代码

步骤 2:创建 Iceberg 表并批量加载数据

-- Create an Iceberg table and bulk load all orders into itCREATE TABLE cdc_demo.orders_iceberg (    order_id        BIGINT,    customer_id     BIGINT,    order_status    VARCHAR(1),    total_price     DECIMAL(15,2),    order_date      DATE,    order_priority  VARCHAR(15),    clerk           VARCHAR(15),    ship_priority   INTEGER,    comment         VARCHAR(79),    created_at      TIMESTAMP,    updated_at      TIMESTAMP) USING iceberg;-- Bulk load from the source tableINSERT INTO cdc_demo.orders_icebergSELECT * FROM cdc_demo.orders;
复制代码

步骤 3:在 Snowflake 中创建目录集成

-- In Snowflake: create a catalog integration pointing to the Postgres instanceCREATE OR REPLACE CATALOG INTEGRATION pg_orders_catalog  CATALOG_SOURCE = SNOWFLAKE_POSTGRES  TABLE_FORMAT = ICEBERG  CATALOG_NAMESPACE = 'cdc_demo'  REST_CONFIG = (    POSTGRES_INSTANCE = 'Snowflake_Postgres_Demo'    CATALOG_NAME = 'postgres'    ACCESS_DELEGATION_MODE = VENDED_CREDENTIALS  )  ENABLED = TRUE;
复制代码

步骤 4:在 Snowflake 创建 Iceberg 表

-- Create the Snowflake Iceberg table referencing the Postgres-managed Iceberg dataCREATE OR REPLACE ICEBERG TABLE orders_iceberg  CATALOG = 'pg_orders_catalog'  CATALOG_TABLE_NAME = 'orders_iceberg'  CATALOG_NAMESPACE = 'cdc_demo'  AUTO_REFRESH = TRUE;
复制代码

步骤 5:查询验证数据

SELECT COUNT(*) FROM orders_iceberg;SELECT order_status, COUNT(*), SUM(total_price) AS total_revenueFROM orders_icebergGROUP BY order_status;
复制代码

模式 2:批量数据流动 —— Snowflake 到 Postgres

业务场景:数据科学团队在 Snowflake 中构建订单优先级预测模型,结果需要回写到 Postgres,使业务系统能够实时展示预测结果;

技术方案:在 Snowflake 中生成结果表后,将数据写入 stage(Parquet 文件)。Postgres 从 stage 拉取数据并写入本地表。

步骤 1:创建存储集成

-- In Snowflake: create a storage integration for the Postgres managed storageCREATE OR REPLACE STORAGE INTEGRATION pg_stage_integration  TYPE = POSTGRES_INTERNAL_STORAGE  POSTGRES_INSTANCE = 'Snowflake_Postgres_Demo';-- Create a stage using this integrationCREATE OR REPLACE STAGE pg_orders_stage  RELATIVE_URL = '/orders_export'  STORAGE_INTEGRATION = pg_stage_integration;
复制代码

步骤 2:导出数据到 stage

COPY INTO @pg_orders_stage/orders_bulk_FROM (    SELECT        order_id,        customer_id,        order_status,        total_price,        order_date,        order_priority,        clerk,        ship_priority,        comment    FROM orders_iceberg)FILE_FORMAT = (TYPE = PARQUET)HEADER = TRUEOVERWRITE = TRUE;
复制代码

步骤 3:Postgres 读取数据

-- On Postgres: create the destination tableCREATE TABLE cdc_demo.orders_from_snowflake (    order_id        BIGINT PRIMARY KEY,    customer_id     BIGINT,    order_status    VARCHAR(1),    total_price     DECIMAL(15,2),    order_date      DATE,    order_priority  VARCHAR(15),    clerk           VARCHAR(15),    ship_priority   INTEGER,    comment         VARCHAR(79));-- Load the Parquet files from the stage into the Postgres tableCOPY cdc_demo.orders_from_snowflakeFROM '@STAGE/orders_export/orders_bulk_*.parquet';
复制代码

步骤 4:查询验证

SELECT COUNT(*) FROM cdc_demo.orders_from_snowflake;-- Returns: 28,373SELECT order_status, COUNT(*)FROM cdc_demo.orders_from_snowflakeGROUP BY order_status;
复制代码

模式 3:CDC —— 从 Postgres 到 Snowflake

业务场景:一个电商应用持续处理新订单、更新订单状态(已发货、已送达、已退货)以及取消订单。分析团队需要在几分钟内将这些变化同步到 Snowflake,而不是等待数小时,以支持展示订单履约指标与营收追踪的实时仪表盘;

技术方案:这代表典型的 OLTP → OLAP 模式,即在源数据库中检测数据变更,并将其传递到分析系统。在该模式中,Postgres 源表上的插入与更新操作会被捕获。系统每分钟检测一次变更,并将其写入 Iceberg 表。在 Snowflake 侧,会创建 catalog integration 和 Iceberg 表,并通过每分钟一次的 REFRESH 进行轮询,从而使最新变更可以被查询访问。

步骤 1:在 Postgres 上启用 pg_incremental 和 pg_cron

CREATE EXTENSION IF NOT EXISTS pg_cron;CREATE EXTENSION IF NOT EXISTS pg_incremental CASCADE;
复制代码

步骤 2:创建 Iceberg 目标表并执行初始批量加载

-- Create an Iceberg table for CDC dataCREATE TABLE cdc_demo.orders_iceberg_cdc (    order_id        BIGINT,    customer_id     BIGINT,    order_status    VARCHAR(1),    total_price     DECIMAL(15,2),    order_date      DATE,    order_priority  VARCHAR(15),    clerk           VARCHAR(15),    ship_priority   INTEGER,    comment         VARCHAR(79),    updated_at      TIMESTAMP) USING iceberg;-- Initial bulk load of current dataINSERT INTO cdc_demo.orders_iceberg_cdcSELECT order_id, customer_id, order_status, total_price, order_date,       order_priority, clerk, ship_priority, comment, updated_atFROM cdc_demo.orders;
复制代码

步骤 3:创建时间间隔 pipeline,用于检测并同步变更

-- Create a pg_incremental pipeline that runs every minute-- Uses the updated_at column to detect rows modified within each time intervalSELECT incremental.create_time_interval_pipeline(    'sync_orders_cdc',    '1 minute'::interval,    'INSERT INTO cdc_demo.orders_iceberg_cdc     SELECT order_id, customer_id, order_status, total_price, order_date,            order_priority, clerk, ship_priority, comment, updated_at     FROM cdc_demo.orders     WHERE updated_at >= $1 AND updated_at < $2',    source_table_name := 'cdc_demo.orders'::regclass,    schedule := '* * * * *');
复制代码

该 pipeline 会自动执行以下操作:

  • 每分钟通过 pg_cron 运行一次;

  • 检测 updated_at 落在当前时间区间内的行;

  • 将发生变化的数据追加写入 Iceberg 表;

  • 维护高水位线(last_processed_time),以实现高效处理;

  • 可以通过 CALL incremental.execute_pipeline('sync_orders_cdc') 手动触发。

步骤 4:在 Snowflake 中创建 Iceberg 表并启用自动刷新

-- On Snowflake: create Iceberg table referencing the CDC Iceberg dataCREATE OR REPLACE ICEBERG TABLE orders_iceberg_cdc  CATALOG = 'pg_orders_catalog'  CATALOG_TABLE_NAME = 'orders_iceberg_cdc'  CATALOG_NAMESPACE = 'cdc_demo'  AUTO_REFRESH = TRUE;-- Configure polling frequency (how often Snowflake checks for new Iceberg snapshots)ALTER CATALOG INTEGRATION pg_orders_catalog  SET REFRESH_INTERVAL_SECONDS = 60;
复制代码

步骤 5:在 Postgres 中模拟数据变更

-- On Postgres: simulate order changesUPDATE cdc_demo.ordersSET order_status = 'X', total_price = 1.00, updated_at = NOW()WHERE order_id IN (SELECT order_id FROM cdc_demo.orders LIMIT 3);INSERT INTO cdc_demo.orders (order_id, customer_id, order_status, total_price,    order_date, order_priority, clerk, ship_priority, comment, created_at, updated_at)VALUES (9999999, 30016, 'N', 999.99, CURRENT_DATE, '1-URGENT',    'Clerk#000000001', 0, 'Test CDC order', NOW(), NOW());-- Manually trigger the pipeline (or wait for pg_cron to run it)CALL incremental.execute_pipeline('sync_orders_cdc');
复制代码

步骤 6:在 Snowflake 中查询 / 验证数据

-- On Snowflake (after auto-refresh or manual refresh):ALTER ICEBERG TABLE orders_iceberg_cdc REFRESH;SELECT * FROM orders_iceberg_cdc WHERE order_id = 9999999;-- Returns the newly inserted order
复制代码

模式 4:CDC —— 从 Snowflake 到 Postgres

业务场景:一个集中式定价引擎在 Snowflake 中重新计算订单优先级,并基于库存水平、需求预测以及促销活动进行动态价格调整。这些重新计算后的结果需要回流到生产环境的 Postgres 数据库,从而使履约系统能够据此进行订单路由;

技术方案:在该场景中,我们使用 STREAM 来检测 Snowflake 中 ORDERS_ENRICHED 表的变更。该 STREAM 会在 TASK 中被引用,并且该 TASK 每 5 分钟触发一次,将变更数据复制到 stage 中并以 Parquet 文件形式存储。在 Postgres 端,这些变更会被 COPY 到 staging 表中,并最终通过 INSERT、UPDATE 或 DELETE 的方式应用到目标表。

步骤 1:在 Snowflake 中创建源表并启用变更追踪

-- Create a table in Snowflake that the pricing engine updatesCREATE OR REPLACE TABLE orders_enriched (    order_id        NUMBER(38,0) PRIMARY KEY,    customer_id     NUMBER(38,0),    order_status    VARCHAR(1),    total_price     NUMBER(15,2),    order_date      DATE,    order_priority  VARCHAR(15),    clerk           VARCHAR(15),    ship_priority   NUMBER(38,0),    comment         VARCHAR(79),    updated_at      TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP());-- Populate with initial dataINSERT INTO orders_enrichedSELECT order_id, customer_id, order_status, total_price, order_date,       order_priority, clerk, ship_priority, comment, CURRENT_TIMESTAMP()FROM orders_iceberg;-- Create a stream to capture changesCREATE OR REPLACE STREAM orders_enriched_stream  ON TABLE orders_enriched  SHOW_INITIAL_ROWS = FALSE;
复制代码

步骤 2:创建一个将变更导出到 Postgres stage 的任务

-- Create a task that runs every 5 minutes when changes are detectedCREATE OR REPLACE TASK export_order_changes_to_pg  WAREHOUSE = COMPUTE_WH  SCHEDULE = '5 MINUTE'  WHEN SYSTEM$STREAM_HAS_DATA('orders_enriched_stream')AS  COPY INTO @pg_orders_stage/cdc_changes/changes_  FROM (      SELECT          order_id,          customer_id,          order_status,          total_price,          order_date,          order_priority,          clerk,          ship_priority,          comment,          METADATA$ACTION AS change_action,          METADATA$ISUPDATE AS is_update,          updated_at      FROM orders_enriched_stream  )  FILE_FORMAT = (TYPE = PARQUET)  OVERWRITE = TRUE;-- Resume the taskALTER TASK export_order_changes_to_pg RESUME;
复制代码

步骤 3:将变更数据加载到 Postgres

-- On Postgres: create a staging table for incoming changesCREATE TABLE cdc_demo.orders_changes_staging (    order_id        BIGINT,    customer_id     BIGINT,    order_status    VARCHAR(1),    total_price     DECIMAL(15,2),    order_date      DATE,    order_priority  VARCHAR(15),    clerk           VARCHAR(15),    ship_priority   INTEGER,    comment         VARCHAR(79),    change_action   TEXT,    is_update       BOOLEAN,    updated_at      TIMESTAMP);-- Load changes from the stageCOPY cdc_demo.orders_changes_stagingFROM '@STAGE/orders_export/cdc_changes/changes_*.parquet';-- Apply changes using UPSERT logic (INSERT ... ON CONFLICT)INSERT INTO cdc_demo.orders_from_snowflake (order_id, customer_id, order_status,    total_price, order_date, order_priority, clerk, ship_priority, comment)SELECT order_id, customer_id, order_status, total_price, order_date,       order_priority, clerk, ship_priority, commentFROM cdc_demo.orders_changes_stagingWHERE change_action = 'INSERT'ON CONFLICT (order_id) DO UPDATE SET    customer_id = EXCLUDED.customer_id,    order_status = EXCLUDED.order_status,    total_price = EXCLUDED.total_price,    order_date = EXCLUDED.order_date,    order_priority = EXCLUDED.order_priority,    clerk = EXCLUDED.clerk,    ship_priority = EXCLUDED.ship_priority,    comment = EXCLUDED.comment;-- Handle true deletes (not part of an update pair)DELETE FROM cdc_demo.orders_from_snowflakeWHERE order_id IN (    SELECT order_id FROM cdc_demo.orders_changes_staging    WHERE change_action = 'DELETE' AND is_update = FALSE);-- Clean up stagingTRUNCATE cdc_demo.orders_changes_staging;
复制代码

模式 5:Openflow CDC —— 从 Postgres 到 Snowflake

业务场景:一个高吞吐量的订单处理系统需要亚秒级的 CDC 延迟,并且要求具备 exactly-once 交付保障。该系统每秒处理数千笔事务,任何数据丢失或重复都会影响财务报告的准确性;

技术方案:Snowflake Openflow 基于 Apache NiFi 构建,并提供连接器,可以针对源系统(本例为 Postgres)执行基于 WAL 的 CDC,该连接器称为 CaptureChangePostgreSQL。变更数据可以通过其他 Openflow 处理器进行转换、过滤与增强处理,随后通过 Snowpipe Streaming 推送到 Snowflake。这是从 Postgres 向 Snowflake 处理数据变更时延迟最低的方案。

  • CaptureChangePostgreSQL 通过逻辑复制连接 PostgreSQL 实例。它会创建一个复制槽(replication slot),对配置的表执行初始快照,然后持续读取 WAL(Write-Ahead Log,预写日志)流中的所有 INSERT、UPDATE 和 DELETE 操作;

  • Transform / Filter(可选处理器)可以对记录进行重组、过滤特定事件、添加计算字段,或将不同类型的变更路由到不同的目标端;

  • PutSnowpipeStreaming 通过 Snowpipe Streaming 的高性能架构,将记录直接写入 Snowflake 表。它通过 offset token 跟踪实现 exactly-once 交付,并且数据在摄取后数秒内即可被查询。

核心能力:

  • 近实时延迟:端到端交付以秒计,而不是分钟;

  • Exactly-once 语义:内置 offset 追踪机制可防止重复与数据丢失;

  • Schema 演进:自动适配源端 schema 的变化;

  • 软删除:被删除的行会标记为 _SNOWFLAKE_DELETED = TRUE,从而保留审计历史;

  • Journal 表:保留完整变更历史,用于合规与回放;

  • 无需自定义代码:通过 Openflow 参数实现完全声明式配置。

整体总结

在了解了这 5 种模式之后,下面将从多个维度对其进行对比总结,以帮助你在不同场景下选择最合适的方案:

结论

Snowflake + Snowflake Postgres 生态系统如今提供了完整的数据流动模式谱系——从使用 pg_lake 和 Iceberg 的简单批量加载,到通过 Openflow 实现的实时 CDC——全部都在一个统一平台内完成。

共享的 Iceberg 存储模型消除了传统架构中管理外部存储、IAM 策略以及文件格式所带来的复杂性,而 Openflow 则为需要亚秒级延迟的工作负载提供生产级 CDC 能力。

选择上:

  • 批量模式适用于追求简单性与成本效率的场景;

  • pg_incremental CDC 适用于在较少基础设施投入下平衡数据新鲜度的场景;

  • Openflow 适用于关键任务级实时数据管道。

最终采用哪种模式,取决于你的延迟需求、数据规模以及对运维复杂度的容忍程度——借助这些工具,你不再被限制在单一方案之中。

原文地址:https://medium.com/snowflake/exploring-bidirectional-data-movement-patterns-between-snowflake-and-postgres-563664bd6a8d

点击链接立即报名注册:Ascent - Snowflake Platform Training - China更多 Snowflake 精彩活动请关注专区