How Do You Build Programmable AI Agents on Snowflake? A Deep Dive into the Cortex Code Agent SDK | Technical Practice

  • 2026-05-26
    Beijing

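Turn Cortex Code from an interactive CLI into a programmable engine for running autonomous AI workflows. These workflows can run on your local machine or server-side inside Snowflake.

The jump from "it works in chat" to "it works in production" is usually where things start to fall apart. Chat is fine for one-off code snippets, but it is not an automation strategy. Once you need to orchestrate complex tasks, such as running an agent in a server-side workflow or in CI/CD, you need a programmable engine, not a text box.

The Cortex Code Agent SDK (also called the "CoCo" SDK) is Snowflake's answer. It takes the same agentic engine that powers the CLI and wraps it in a programmable interface. Instead of you driving the agent, your code does. If you have been wanting to launch an AI assistant that can actually carry out tasks in server-side workflows, this is the toolkit you have been looking for.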

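What Are We Actually Talking About?

If you have been using Snowflake recently, you have probably already seen Cortex Code. It is Snowflake's CLI agent, and it does more than "suggest" code: it reasons about the task, reads your local files, runs SQL, and keeps iterating until the goal is met.

Under the hood, it calls high-end models such as Claude Sonnet 4.6 or OpenAI GPT-5.x. The SDK simply gives you a programmatic way into that loop.

CLI vs. SDK: The Real Difference

  • The CLI is for you: you sit at your desk and watch the loop play out in the terminal in real time;

  • The SDK is for the Python script that runs at 2 a.m.: it needs to inspect 100 tables, optimise a view, and produce a report without you watching.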

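Why Use the SDK?

Automation is the obvious answer, but the real "why" comes down to control:

  • Repeatability: run the same agentic audit across a huge codebase without repeating manual steps until your wrists ache;

  • Conditional logic: your Python code can put "gates" around the agent. For example, if a security scan finds a high-risk vulnerability, call the agent to fix it; if not, just move on;

  • Structured output: you can force the agent to return typed JSON instead of a chatty block of prose, so your downstream code can actually use the data;

  • Custom tools (MCP): you can use the Model Context Protocol to connect the agent to your internal APIs, such as Jira or Slack.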
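
The "conditional logic" point can be illustrated in plain Python. This is a minimal sketch rather than SDK code: the finding format (dicts with `file`, `issue`, and `severity` keys) and the helper names `needs_agent_fix` and `build_fix_prompt` are assumptions invented for this example.

```python
# Hypothetical severity labels; a real scanner will define its own.
BLOCKING_SEVERITIES = {"high", "critical"}


def needs_agent_fix(findings: list[dict]) -> bool:
    """Return True if any scan finding is severe enough to hand to the agent."""
    return any(f.get("severity", "").lower() in BLOCKING_SEVERITIES for f in findings)


def build_fix_prompt(findings: list[dict]) -> str:
    """Turn the blocking findings into a prompt for the agent."""
    lines = [
        f"- {f['file']}: {f['issue']}"
        for f in findings
        if f.get("severity", "").lower() in BLOCKING_SEVERITIES
    ]
    return "Fix the following security findings:\n" + "\n".join(lines)


# Hand-written sample scan results (assumed shape).
scan = [
    {"file": "app.py", "issue": "SQL built by string concatenation", "severity": "high"},
    {"file": "util.py", "issue": "unused import", "severity": "low"},
]

if needs_agent_fix(scan):
    prompt = build_fix_prompt(scan)  # this prompt would be passed to the agent
else:
    prompt = None  # nothing to fix, so the pipeline simply continues
```

The gate itself is ordinary code, so it can be unit-tested without ever starting an agent.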

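The SDK turns Cortex Code from a human-in-the-loop assistant into a programmable agentic engine orchestrated by your Python or TypeScript code.

Use Cases

Here are some concrete scenarios where the SDK shines:

  • Automated code review: scan every PR for bugs, security issues, or style violations and return structured findings;

  • Data pipeline validation: after an ETL run, have the agent inspect table schemas, validate data quality, and produce a report;

  • Semantic view optimisation: a multi-turn pipeline that inspects Snowflake tables, optimises semantic views, creates search services, and autonomously ties the pieces together;

  • Codebase migration: analyse files for deprecated patterns and apply fixes across the whole repository;

  • Feature engineering for machine learning: explore a dataset agentically, generate candidate features, evaluate them, and pick the best feature subset for the model;

  • Incident response automation: when an alert fires, have the agent read logs, trace the problem, and propose a fix.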

Preparing the Environment

Install the CLI and the SDK

You need to install both the CLI and the SDK package. Install the Cortex Code CLI:

curl -LsS https://ai.snowflake.com/static/cc-scripts/install.sh | sh

Install the SDK from PyPI (Python ≥ 3.10):

pip install cortex-code-agent-sdk
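
Set Up a Snowflake Connection

The SDK reuses your existing ~/.snowflake/connections.toml. That is a big advantage, because you don't have to manage a new set of credentials.

Step 1: Configure your connections file

Create or edit ~/.snowflake/connections.toml: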

[my-connection]
account = "myorg-dev"
user = "dev_user"
authenticator = "externalbrowser" # Keeps things simple with SSO
role = "ENGINEER_ROLE"
warehouse = "COMPUTE_WH"

Step 2: Verify that your connection works

cortex --connection my-connection --print "G'day!"

Step 3: Pass the connection to the SDK

from cortex_code_agent_sdk import query, CortexCodeAgentOptions

# Explicit connection name
options = CortexCodeAgentOptions(
    connection="my-connection",
    cwd=".",
)

# Or omit 'connection' to use default_connection_name from the toml file
options = CortexCodeAgentOptions(cwd=".")

The connection option maps directly to the CLI's --connection flag. The SDK spawns a CLI subprocess, which authenticates using the named connection profile or the default one.
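
To make that mapping concrete, here is a minimal sketch of how such a command line could be assembled. The helper name `build_cortex_command` is hypothetical, and the article does not show what the SDK does internally; the sketch only uses flags that appear elsewhere in this article (`--connection`, `-p`, `--output-format stream-json`, `--allowed-tools`).

```python
from typing import Optional, Sequence


def build_cortex_command(
    prompt: str,
    connection: Optional[str] = None,
    allowed_tools: Sequence[str] = (),
) -> list[str]:
    """Assemble an argv list for the cortex CLI (illustrative helper, not SDK code)."""
    cmd = ["cortex"]
    if connection is not None:
        # Explicit connection profile; omitted means the CLI falls back to its default.
        cmd += ["--connection", connection]
    cmd += ["-p", prompt, "--output-format", "stream-json"]
    for tool in allowed_tools:
        # The flag is repeated once per tool, as in the article's scripts.
        cmd += ["--allowed-tools", tool]
    return cmd


argv = build_cortex_command(
    "What files are in this directory?",
    connection="my-connection",
    allowed_tools=["Read", "Glob"],
)
```

The resulting list can be unpacked into `asyncio.create_subprocess_exec(*argv, ...)`.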

A Minimal Runnable Example

import asyncio
import json


async def main():
    # Launch the CLI in print mode with read-only tools and NDJSON output.
    proc = await asyncio.create_subprocess_exec(
        "cortex",
        "-p", "What files are in this directory?",
        "--output-format", "stream-json",
        "--allowed-tools", "Read",
        "--allowed-tools", "Glob",
        "--allowed-tools", "Grep",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Each stdout line is one JSON message.
    while True:
        line = await proc.stdout.readline()
        if not line:
            break
        text = line.decode().strip()
        if not text:
            continue
        try:
            msg = json.loads(text)
        except json.JSONDecodeError:
            continue

        if msg.get("type") == "assistant":
            for block in msg.get("message", {}).get("content", []):
                if block.get("type") == "text":
                    print(block["text"], end="")
        elif msg.get("type") == "result":
            print(f"\nDone: {msg.get('subtype', 'unknown')}")

    await proc.wait()


asyncio.run(main())
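
The parsing half of the example above can be pulled out and exercised without the CLI. The following is a sketch under assumptions: the helper names `iter_messages` and `collect_text` are made up here, and the sample lines are hand-written to follow the message shape that the script above reads, not captured from a real run.

```python
import json
from typing import Iterable, Iterator


def iter_messages(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one dict per valid NDJSON line, skipping blank and non-JSON lines."""
    for raw in lines:
        text = raw.strip()
        if not text:
            continue
        try:
            msg = json.loads(text)
        except json.JSONDecodeError:
            continue
        if isinstance(msg, dict):
            yield msg


def collect_text(messages: Iterable[dict]) -> str:
    """Concatenate the text blocks of all assistant messages."""
    parts = []
    for msg in messages:
        if msg.get("type") != "assistant":
            continue
        for block in msg.get("message", {}).get("content", []):
            if block.get("type") == "text":
                parts.append(block["text"])
    return "".join(parts)


# Hand-written sample stream (assumed shape).
sample = [
    '{"type": "assistant", "message": {"content": [{"type": "text", "text": "Found 3 files."}]}}',
    "not json",
    "",
    '{"type": "result", "subtype": "success"}',
]
messages = list(iter_messages(sample))
summary = collect_text(messages)
```

Keeping the parser separate from the subprocess call makes it easy to test against recorded streams.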

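Two API Modes at a Glance

Before diving into the architecture and examples, here is a quick look at the two main ways to use the SDK:

Single-turn: send one prompt, consume the NDJSON (newline-delimited JSON) stream, and finish.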

proc = await asyncio.create_subprocess_exec(
    "cortex",
    "-p", "Review utils.py for bugs. Fix any issues you find.",
    "--output-format", "stream-json",
    "--allowed-tools", "Read",
    "--allowed-tools", "Edit",
    "--allowed-tools", "Glob",
    "--allowed-tools", "Grep",
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE,
)
# ... parse NDJSON lines from proc.stdout

Multi-turn: chain several prompts by launching sequential CLI invocations, or use the SDK client for a persistent session.

# Turn 1
proc1 = await asyncio.create_subprocess_exec(
    "cortex",
    "-p", "Inspect the schema and optimise the semantic view",
    "--output-format", "stream-json",
    "--allowed-tools", "Read", "--allowed-tools", "Glob",
    "--allowed-tools", "Grep", "--allowed-tools", "Edit",
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE,
)
# ... parse NDJSON lines from proc1.stdout
await proc1.wait()

# Turn 2: Launch another agent that builds on the file changes from Turn 1
proc2 = await asyncio.create_subprocess_exec(
    "cortex",
    "-p", "Now create search services for text columns",
    "--output-format", "stream-json",
    "--allowed-tools", "Read", "--allowed-tools", "Write",
    "--allowed-tools", "Glob", "--allowed-tools", "Grep",
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE,
)
# ... parse NDJSON lines from proc2.stdout
await proc2.wait()

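Where Does It Actually Run?

A key architectural point is that the Cortex Code Agent SDK runs in two kinds of environment, which gives you flexibility in where you deploy your agentic workflows.

Figure: Agentic workflows

Client-Side (Outside Snowflake)

This is any environment you control that is not Snowflake itself: your laptop, a virtual machine, a GitHub Actions runner, a Docker container. Your Python or TypeScript script runs there, the SDK launches the Cortex CLI as a subprocess and communicates with it over stdin/stdout using an NDJSON stream, and the CLI handles the full agent loop, including calling the LLM through Snowflake Cortex.

When to use it: development workflows, CI/CD pipelines, local automation scripts, and any scenario that needs access to the project directory's file system.

Server-Side (Inside Snowflake)

The SDK can also run inside Snowflake, for example in a Snowpark Container Services (SPCS) container, a Snowflake Notebook, or a stored procedure. In this mode the agent works on server-side resources such as Snowflake stages, internal tables, and Snowflake-managed files.

When to use it: data-centric workflows where the agent needs to operate on Snowflake-native resources; governance-sensitive environments where data must not leave Snowflake; or scheduled jobs that run entirely inside the platform.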

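SDK Architecture

Understanding the architecture helps you reason about what happens when your code calls query() or creates a CortexCodeSDKClient. Here is the full picture:

Figure: Cortex Code SDK architecture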

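Key Architectural Concepts

  1. Subprocess communication

Your code never talks to the LLM directly. The SDK launches Cortex as a subprocess and communicates through an NDJSON stream on stdout. Each line is a typed message: SystemMessage, AssistantMessage, ResultMessage, or StreamEvent.

  2. The agent loop

When you send a prompt, the agent enters an autonomous reasoning loop:

  • Reason: the LLM reads the context and decides what to do;

  • Act: the agent calls a tool (read a file, run SQL, edit code, and so on);

  • Observe: the agent looks at the tool's output;

  • Decide: is the task done? If not, go back to "Reason".

A single prompt such as "fix the bug in code.py" can trigger several iterations: reading the file, identifying the problem, applying the fix, and verifying the edit.

  3. What you control vs. what the agent decides

Figure: Control vs. decisions

  4. Permission model

The SDK gives you fine-grained control over tool execution:

Figure: Controls over tool execution

  5. Message flow

Every interaction produces typed messages that you can consume:

Figure: Typed messages

An AssistantMessage contains content blocks: TextBlock (reasoning text), ToolUseBlock (a tool call), ToolResultBlock (tool output), or ThinkingBlock (chain of thought).

  6. Skills carry over

Cortex Code ships with a built-in library of Skills. These Skills are not exclusive to the interactive CLI. When you run the agent through the SDK, the full set of Skills is still available. If a prompt matches a Skill's domain, the agent activates it just as it would in a terminal session.

  7. MCP integration

The SDK supports the Model Context Protocol for custom tools. In Python you can define tools inline with the @tool decorator and create_sdk_mcp_server(), which runs an MCP server in the same process (no external server needed). Note that, as of this SDK version, MCP is the only supported path for custom tool integration.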
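
The reason, act, observe, decide loop described above can be illustrated with a toy version. This is a conceptual sketch only, not how Cortex Code is implemented: the "model" is a scripted function, the tools are stand-in lambdas, and every name here (`run_agent_loop`, `scripted_decide`) is hypothetical.

```python
from typing import Callable


def run_agent_loop(
    decide: Callable[[list], dict],
    tools: dict[str, Callable[[str], str]],
    max_turns: int = 10,
) -> list[str]:
    """Run a reason/act/observe loop until `decide` says it is done."""
    observations: list[str] = []
    for _ in range(max_turns):
        action = decide(observations)  # reason: pick the next step from context
        if action["tool"] == "done":  # decide: is the task finished?
            return observations
        result = tools[action["tool"]](action["input"])  # act: call the tool
        observations.append(result)  # observe: feed the output back in
    raise RuntimeError("max_turns reached without finishing")


def scripted_decide(observations: list) -> dict:
    """Stand-in for the LLM: follows a fixed plan, one step per observation."""
    plan = [
        {"tool": "read", "input": "code.py"},
        {"tool": "edit", "input": "add guard"},
        {"tool": "done", "input": ""},
    ]
    return plan[len(observations)]


# Stand-in tools that only describe what they would do.
tools = {
    "read": lambda path: f"read {path}",
    "edit": lambda change: f"edited: {change}",
}

history = run_agent_loop(scripted_decide, tools)
```

The `max_turns` cap mirrors the idea that an autonomous loop needs an upper bound so that a task that never converges still terminates.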

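Example 1: The "Hands-Off" Bug Hunter

I wanted to see whether I could point the SDK at a broken script and have it "auto-fix" the code with no human intervention. I used a simple buggy_calculator.py containing some classic ZeroDivisionError and KeyError traps.

The Buggy File

Create buggy_calculator.py: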

# buggy_calculator.py
import json
from datetime import datetime


def calculate_average(numbers):
    total = 0
    for num in numbers:
        total += num
    return total / len(numbers)  # Bug: ZeroDivisionError on empty list


def parse_user_config(config_string):
    config = json.loads(config_string)
    return {
        "username": config["user"]["name"].strip(),    # Bug: KeyError if "user" or "name" missing
        "timeout": int(config["settings"]["timeout"]), # Bug: KeyError if nested keys missing
        "created": datetime.strptime(
            config["metadata"]["created_at"],          # Bug: KeyError if key missing
            "%Y-%m-%d"
        ),
    }


def find_outliers(data, threshold):
    mean = sum(data) / len(data)  # Bug: ZeroDivisionError on empty list
    std_dev = (sum((x - mean) ** 2 for x in data) / len(data)) ** 0.5  # Same bug
    return [x for x in data if abs(x - mean) > threshold * std_dev]


def merge_sorted_lists(list_a, list_b):
    result = []
    i, j = 0, 0
    while i < len(list_a) and j < len(list_b):
        if list_a[i] <= list_b[j]:
            result.append(list_a[i])
            i += 1
        else:
            result.append(list_b[j])
            j += 1
    # Bug: remaining elements from both lists are never appended
    return result

The Agent Script

This script invokes the Cortex Code CLI as a subprocess and streams NDJSON in real time while the agent finds and fixes each bug:

# fix_bugs.py
import asyncio
import json

PROMPT = """Review the file buggy_calculator.py for bugs that would cause crashes or incorrect results.
For each bug you find:
1. Explain what the bug is and when it would trigger
2. Fix the bug in the file
Be thorough -- check for edge cases like empty inputs, missing keys, and incomplete logic."""


async def main():
    print("Agent starting bug analysis...\n")
    proc = await asyncio.create_subprocess_exec(
        "cortex",
        "-p", PROMPT,
        "--output-format", "stream-json",
        "--allowed-tools", "Read",
        "--allowed-tools", "Write",
        "--allowed-tools", "Edit",
        "--allowed-tools", "Glob",
        "--allowed-tools", "Grep",
        "--allowed-tools", "Bash",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    while True:
        line = await proc.stdout.readline()
        if not line:
            break
        text = line.decode().strip()
        if not text:
            continue
        try:
            msg = json.loads(text)
        except json.JSONDecodeError:
            continue

        msg_type = msg.get("type")
        if msg_type == "assistant":
            content = msg.get("message", {}).get("content", [])
            for block in content:
                if block.get("type") == "text":
                    print(block["text"], end="")
                elif block.get("type") == "tool_use":
                    print(f"\n> Tool: {block['name']}")
        elif msg_type == "result":
            subtype = msg.get("subtype", "unknown")
            duration = msg.get("duration_ms", 0)
            if subtype == "success":
                print(f"\n\nAll bugs fixed successfully. ({duration}ms)")
            else:
                print(f"\n\nAgent finished with status: {subtype} ({duration}ms)")

    await proc.wait()


asyncio.run(main())

When you run python fix_bugs.py, the agent will:

  1. Read buggy_calculator.py to understand the code;

  2. Reason about each function and identify edge cases;

  3. Edit the file several times: add empty-list guards, use .get() for safe key access, and append the remaining elements in merge_sorted_lists;

  4. Report what it fixed and why.

You can watch the full reasoning stream, tool calls, and edits in real time. The fixed file is written back to disk with no human intervention.

Here is what the agent produced (reasoning abridged for clarity):

Agent starting bug analysis...

I'll review buggy_calculator.py for bugs that could cause crashes or incorrect results.

> Tool: Read

I've read the file. Let me analyse each function for bugs.

**Bug 1: `calculate_average` -- ZeroDivisionError on empty list**
The function divides by `len(numbers)` without checking if the list is empty.
When called with `calculate_average([])`, this crashes with `ZeroDivisionError`.

> Tool: Edit

Fixed: Added an early return for empty lists:
    if not numbers:
        return 0

**Bug 2: `parse_user_config` -- KeyError on missing nested keys**
The function accesses `config["user"]["name"]`, `config["settings"]["timeout"]`,
and `config["metadata"]["created_at"]` directly. If any of these keys are missing,
it raises `KeyError`.

> Tool: Edit

Fixed: Replaced direct key access with `.get()` and sensible defaults:
    user = config.get("user", {})
    settings = config.get("settings", {})
    metadata = config.get("metadata", {})

**Bug 3: `find_outliers` -- ZeroDivisionError on empty list**
Same pattern as `calculate_average` -- `sum(data) / len(data)` crashes on empty input.

> Tool: Edit

Fixed: Added an early return for empty data lists.

**Bug 4: `merge_sorted_lists` -- remaining elements never appended**
After the `while` loop exits, any remaining elements in `list_a` or `list_b` are
silently dropped. For example, `merge_sorted_lists([1, 3, 5], [2, 4])` returns
`[1, 2, 3, 4]` instead of `[1, 2, 3, 4, 5]`.

> Tool: Edit

Fixed: Added `result.extend(list_a[i:])` and `result.extend(list_b[j:])` after the loop.

All bugs fixed successfully. (12850ms)

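Example 2: Agentic Feature Engineering and Selection for Machine Learning

For more complex work such as feature engineering, one prompt is usually not enough. If you try to do too much in one go, you end up with "context drift". It works better to split the job into several turns.

Let's build a customer churn prediction model. You have a raw dataset and need to get from raw columns to a ranked, validated feature set, but you don't want to hand-write every transformation. Instead, you have the agent explore the data, engineer candidate features, evaluate them, and return the final ranked feature list as structured JSON.

The Dataset

Create customer_churn.csv, a simplified dataset with 10 columns: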

customer_id,tenure_months,monthly_charges,total_charges,contract_type,payment_method,num_support_tickets,has_online_security,has_tech_support,churned
C001,12,64.50,774.00,month-to-month,credit_card,3,0,0,1
C002,48,89.20,4281.60,one_year,bank_transfer,0,1,1,0
C003,3,29.99,89.97,month-to-month,credit_card,5,0,0,1
C004,72,105.00,7560.00,two_year,auto_pay,1,1,1,0
C005,1,45.00,45.00,month-to-month,credit_card,2,0,0,1
...

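In practice the file would hold hundreds or thousands of rows, with the extra records generated synthetically from the sample records. The columns mix numeric, categorical, and binary data, which is exactly the kind of messy input that benefits from automated feature engineering.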
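
One way to generate such synthetic records is sketched below, using only the standard library. Everything beyond the column names and category values is an assumption: the value ranges, the `synth_rows` and `to_csv` helper names, and especially the churn rule are illustrative choices, not the method used to build the article's dataset.

```python
import csv
import io
import random

COLUMNS = [
    "customer_id", "tenure_months", "monthly_charges", "total_charges",
    "contract_type", "payment_method", "num_support_tickets",
    "has_online_security", "has_tech_support", "churned",
]
CONTRACTS = ["month-to-month", "one_year", "two_year"]
PAYMENTS = ["credit_card", "bank_transfer", "auto_pay"]


def synth_rows(n: int, seed: int = 0) -> list[dict]:
    """Generate n synthetic customer rows; a fixed seed makes the output repeatable."""
    rng = random.Random(seed)
    rows = []
    for i in range(1, n + 1):
        tenure = rng.randint(1, 72)
        monthly = round(rng.uniform(29.99, 110.0), 2)
        contract = rng.choice(CONTRACTS)
        tickets = rng.randint(0, 7)
        # Assumed rule of thumb: new month-to-month customers with several tickets churn.
        churned = int(contract == "month-to-month" and tenure <= 12 and tickets >= 2)
        rows.append({
            "customer_id": f"C{i:03d}",
            "tenure_months": tenure,
            "monthly_charges": f"{monthly:.2f}",
            "total_charges": f"{monthly * tenure:.2f}",
            "contract_type": contract,
            "payment_method": rng.choice(PAYMENTS),
            "num_support_tickets": tickets,
            "has_online_security": rng.randint(0, 1),
            "has_tech_support": rng.randint(0, 1),
            "churned": churned,
        })
    return rows


def to_csv(rows: list[dict]) -> str:
    """Serialise the rows to CSV text with the article's column order."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=COLUMNS)
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


rows = synth_rows(50)
csv_text = to_csv(rows)  # write this string to customer_churn.csv if desired
```

A deterministic churn rule like this one yields unrealistically clean correlations, so a real test dataset would want some noise added.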

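The Orchestration Script

This is the full pipeline script. It defines a reusable run_agent_turn() helper that launches the Cortex Code CLI as a subprocess with --output-format stream-json and --allowed-tools, reads NDJSON messages line by line from stdout, and returns the parsed result, including any structured JSON output. The main() function then runs four consecutive turns, with Python decision logic between turns controlling whether the pipeline continues or aborts.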

# ml_feature_pipeline.py
import asyncio
import json

# Tools the agent is allowed to use across all turns.
ALLOWED_TOOLS = ["Read", "Write", "Edit", "Glob", "Grep", "Bash"]

# JSON Schema for the final structured output -- Turn 4's response
# must conform to this shape.
FEATURE_SCHEMA = {
    "type": "object",
    "properties": {
        "target_column": {"type": "string"},
        "selected_features": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "importance_score": {"type": "number"},
                    "category": {
                        "type": "string",
                        "enum": ["original", "engineered"],
                    },
                    "rationale": {"type": "string"},
                },
                "required": ["name", "importance_score", "category", "rationale"],
            },
        },
        "dropped_features": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "reason": {"type": "string"},
                },
                "required": ["name", "reason"],
            },
        },
    },
    "required": ["target_column", "selected_features", "dropped_features"],
}


async def run_agent_turn(prompt: str, output_schema: dict | None = None) -> dict:
    """Run a single agent turn via the cortex CLI and return parsed results.

    Returns a dict with:
        - "text": concatenated assistant text blocks
        - "result_subtype": "success" | "error" | ...
        - "duration_ms": execution time
        - "structured_output": parsed JSON if output_schema was provided, else None
    """
    cmd = ["cortex", "-p", prompt, "--output-format", "stream-json"]
    for tool in ALLOWED_TOOLS:
        cmd.extend(["--allowed-tools", tool])
    if output_schema:
        cmd.extend(["--output-format-json-schema", json.dumps(output_schema)])

    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    text_parts = []
    result_subtype = "unknown"
    duration_ms = 0
    structured_output = None

    while True:
        line = await proc.stdout.readline()
        if not line:
            break
        text = line.decode().strip()
        if not text:
            continue
        try:
            msg = json.loads(text)
        except json.JSONDecodeError:
            continue

        msg_type = msg.get("type")
        if msg_type == "assistant":
            content = msg.get("message", {}).get("content", [])
            for block in content:
                if block.get("type") == "text":
                    text_parts.append(block["text"])
                    print(block["text"], end="")
                elif block.get("type") == "tool_use":
                    print(f"\n> Tool: {block['name']}")
        elif msg_type == "result":
            result_subtype = msg.get("subtype", "unknown")
            duration_ms = msg.get("duration_ms", 0)
            structured_output = msg.get("structured_output")
            print(f"\n[Turn complete: {result_subtype} ({duration_ms}ms)]")

    await proc.wait()
    return {
        "text": "".join(text_parts),
        "result_subtype": result_subtype,
        "duration_ms": duration_ms,
        "structured_output": structured_output,
    }


async def main():
    # Turn 1: Explore the dataset
    print("=" * 60)
    print("TURN 1: Data Exploration")
    print("=" * 60)
    turn1 = await run_agent_turn("""
        Read customer_churn.csv and profile it:
        - List every column with its dtype, unique count, and missing-value count
        - For numeric columns, report min/median/max and any outliers
        - For categorical columns, report value distributions
        - Compute correlations with the 'churned' target column
        - Summarise your findings at the end
    """)

    # Decision gate: check if the data is usable
    if "missing" in turn1["text"].lower() and "100%" in turn1["text"]:
        print("\n[PIPELINE] Dataset has entirely empty columns. Aborting.")
        return

    # Turn 2: Engineer candidate features
    print("\n" + "=" * 60)
    print("TURN 2: Feature Engineering")
    print("=" * 60)
    await run_agent_turn("""
        Based on the customer_churn.csv dataset, write a Python script called
        feature_engineering.py that:
        1. Reads customer_churn.csv
        2. Generates candidate features:
           - charge_per_month = total_charges / max(tenure_months, 1)
           - ticket_rate = num_support_tickets / max(tenure_months, 1)
           - has_protection = has_online_security | has_tech_support
           - is_new_customer = 1 if tenure_months <= 6 else 0
           - One-hot encode contract_type and payment_method
        3. Drops customer_id (non-predictive) and the original
           categorical columns after encoding
        4. Saves the result to features_output.csv
        5. Prints the shape and column list when done
        Make sure the script handles edge cases (division by zero,
        missing values) and runs without errors.
    """)

    # Turn 3: Evaluate feature importance
    print("\n" + "=" * 60)
    print("TURN 3: Feature Evaluation")
    print("=" * 60)
    turn3 = await run_agent_turn("""
        Run feature_engineering.py to generate features_output.csv.
        Then evaluate each feature's predictive power for the 'churned' target:
        1. Compute the absolute Pearson correlation of every feature with 'churned'
        2. Rank features from most to least important
        3. Flag any features with near-zero variance or high mutual correlation
           (>0.9 with another feature)
        Print a ranked table of features with their scores.
    """)

    # Turn 4: Structured selection
    print("\n" + "=" * 60)
    print("TURN 4: Feature Selection (Structured Output)")
    print("=" * 60)
    turn4 = await run_agent_turn(
        f"""
        Here are the feature evaluation results from a previous analysis:
        {turn3['text']}
        Based on these results, select the final feature set for a churn
        prediction model. Include features with importance score > 0.05.
        Drop features that are redundant or have near-zero variance.
        Return your selection as structured JSON matching the provided schema.
        For each selected feature, include its importance score, whether it's
        an original or engineered feature, and a brief rationale.
        For each dropped feature, explain why it was excluded.
        """,
        output_schema=FEATURE_SCHEMA,
    )

    # Use the structured result programmatically
    result = turn4["structured_output"]
    if result:
        print("\n" + "=" * 60)
        print("PIPELINE RESULT")
        print("=" * 60)
        print(f"Target: {result['target_column']}")
        print(f"Selected features: {len(result['selected_features'])}")
        for f in result["selected_features"]:
            print(f"  {f['importance_score']:.3f}  {f['name']:30s}  [{f['category']}]")
        print(f"\nDropped features: {len(result['dropped_features'])}")
        for f in result["dropped_features"]:
            print(f"  x {f['name']:30s}  ({f['reason']})")

        # Write the final feature manifest for downstream consumers
        with open("feature_manifest.json", "w") as fp:
            json.dump(result, fp, indent=2)
        print("\nFeature manifest saved to feature_manifest.json")


asyncio.run(main())

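How It Works

  1. Turn 1 (explore): the agent profiles the CSV and checks data quality;

  2. Turn 2 (code): the agent writes a feature_engineering.py script that builds the candidate features and handles one-hot encoding;

  3. Turn 3 (execute): the agent runs the script with the Bash tool and evaluates the output;

  4. Turn 4 (JSON): the agent returns structured JSON matching the schema I provided, which the script saves as feature_manifest.json.

Here is what the pipeline I built produced (agent reasoning abridged for clarity):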

============================================================
TURN 1: Data Exploration
============================================================
I'll read customer_churn.csv and profile the dataset.

> Tool: Read

The dataset has 50 rows and 10 columns. Here's the profile:

| Column               | Dtype   | Unique | Missing | Min    | Median  | Max      |
|----------------------|---------|--------|---------|--------|---------|----------|
| customer_id          | string  | 50     | 0       | -      | -       | -        |
| tenure_months        | int     | 38     | 0       | 1      | 20.0    | 72       |
| monthly_charges      | float   | 46     | 0       | 29.99  | 74.0    | 110.0    |
| total_charges        | float   | 50     | 0       | 30.0   | 1976.0  | 7560.0   |
| contract_type        | string  | 3      | 0       | -      | -       | -        |
| payment_method       | string  | 3      | 0       | -      | -       | -        |
| num_support_tickets  | int     | 8      | 0       | 0      | 1.0     | 7        |
| has_online_security  | int     | 2      | 0       | 0      | -       | 1        |
| has_tech_support     | int     | 2      | 0       | 0      | -       | 1        |
| churned              | int     | 2      | 0       | 0      | -       | 1        |

**Correlations with `churned`**:
- tenure_months:       -0.82 (strong negative -- longer tenure = less churn)
- num_support_tickets:  0.79 (strong positive -- more tickets = more churn)
- has_online_security: -0.72
- has_tech_support:    -0.60
- monthly_charges:     -0.48
- total_charges:       -0.74

No columns are entirely missing. The dataset is clean and usable.

[Turn complete: success (14520ms)]

============================================================
TURN 2: Feature Engineering
============================================================
I'll create feature_engineering.py with the candidate features.

> Tool: Write

I've written feature_engineering.py. The script:
1. Reads customer_churn.csv
2. Engineers 4 new features: charge_per_month, ticket_rate, has_protection, is_new_customer
3. One-hot encodes contract_type (3 categories) and payment_method (3 categories)
4. Drops customer_id and the original categorical columns

> Tool: Bash

Output shape: (50, 16). Columns:
tenure_months, monthly_charges, total_charges, num_support_tickets,
has_online_security, has_tech_support, churned, charge_per_month,
ticket_rate, has_protection, is_new_customer, contract_month-to-month,
contract_one_year, contract_two_year, payment_auto_pay, payment_bank_transfer

[Turn complete: success (21340ms)]

============================================================
TURN 3: Feature Evaluation
============================================================
I'll run feature_engineering.py and evaluate feature importance.

> Tool: Bash

Feature Importance (absolute Pearson correlation with `churned`):

| Rank | Feature                   | Correlation |
|------|---------------------------|-------------|
| 1    | tenure_months             | 0.821       |
| 2    | contract_month-to-month   | 0.806       |
| 3    | num_support_tickets       | 0.789       |
| 4    | total_charges             | 0.742       |
| 5    | is_new_customer           | 0.723       |
| 6    | has_online_security       | 0.718       |
| 7    | ticket_rate               | 0.695       |
| 8    | has_tech_support          | 0.601       |
| 9    | has_protection            | 0.584       |
| 10   | monthly_charges           | 0.483       |
| 11   | charge_per_month          | 0.412       |
| 12   | contract_two_year         | 0.378       |
| 13   | payment_auto_pay          | 0.305       |
| 14   | contract_one_year         | 0.267       |
| 15   | payment_bank_transfer     | 0.041       |

**Redundancy flags**:
- total_charges & tenure_months: correlation 0.94 (>0.9 threshold)
- has_protection & has_online_security: correlation 0.91 (>0.9 threshold)

[Turn complete: success (18760ms)]

============================================================
TURN 4: Feature Selection (Structured Output)
============================================================
Based on the evaluation results, I'll select the optimal feature set
and return structured JSON.

[Turn complete: success (9830ms)]

============================================================
PIPELINE RESULT
============================================================
Target: churned
Selected features: 9
  0.821  tenure_months                 [original]
  0.806  contract_month-to-month       [engineered]
  0.789  num_support_tickets           [original]
  0.723  is_new_customer               [engineered]
  0.718  has_online_security           [original]
  0.695  ticket_rate                   [engineered]
  0.601  has_tech_support              [original]
  0.483  monthly_charges               [original]
  0.412  charge_per_month              [engineered]

Dropped features: 6
  x total_charges                   (redundant with tenure_months, r=0.94)
  x has_protection                  (redundant with has_online_security, r=0.91)
  x contract_two_year               (below importance threshold after multicollinearity check)
  x payment_auto_pay                (moderate importance but highly correlated with contract_type features)
  x contract_one_year               (below importance threshold after multicollinearity check)
  x payment_bank_transfer           (near-zero correlation: 0.041)

Feature manifest saved to feature_manifest.json

The final feature_manifest.json is a machine-readable artefact that a downstream training pipeline can consume directly, with no need to parse agent text.
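
A downstream consumer may still want to sanity-check the manifest before trusting it. The following is a minimal sketch using only the standard library: the `manifest_problems` helper is hypothetical, it checks just the required keys and the category enum from FEATURE_SCHEMA, and the sample rationale text is invented for illustration. A full JSON Schema validator would be more thorough.

```python
def manifest_problems(manifest: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means the shape looks right."""
    problems = []
    for key in ("target_column", "selected_features", "dropped_features"):
        if key not in manifest:
            problems.append(f"missing top-level key: {key}")
    for i, feat in enumerate(manifest.get("selected_features", [])):
        for key in ("name", "importance_score", "category", "rationale"):
            if key not in feat:
                problems.append(f"selected_features[{i}] missing {key}")
        if feat.get("category") not in ("original", "engineered"):
            problems.append(f"selected_features[{i}] has invalid category")
    for i, feat in enumerate(manifest.get("dropped_features", [])):
        for key in ("name", "reason"):
            if key not in feat:
                problems.append(f"dropped_features[{i}] missing {key}")
    return problems


# A manifest shaped like the pipeline result above (rationale text is illustrative).
good = {
    "target_column": "churned",
    "selected_features": [
        {
            "name": "tenure_months",
            "importance_score": 0.821,
            "category": "original",
            "rationale": "strongest correlation with the target",
        }
    ],
    "dropped_features": [
        {"name": "total_charges", "reason": "redundant with tenure_months, r=0.94"}
    ],
}

# A deliberately broken manifest: wrong category and no dropped_features key.
bad = {
    "target_column": "churned",
    "selected_features": [
        {"name": "x", "importance_score": 0.1, "category": "derived", "rationale": "r"}
    ],
}

good_problems = manifest_problems(good)
bad_problems = manifest_problems(bad)
```

Returning a list of problems instead of raising on the first one lets the caller log everything that is wrong in a single pass.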

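Python SDK Deep Dive: Configuration, APIs, and Patterns

Now that you have seen the SDK in action, here is a closer look at the configuration options, API surface, and patterns you will use most.

Configuring the Agent with CortexCodeAgentOptions

CortexCodeAgentOptions is the configuration dataclass you pass to query() or CortexCodeSDKClient. The options fall into several groups; the three covered here are:

Environment: cwd (working directory), model (for example "claude-sonnet-4-6", "claude-opus-4-6", or "gpt-5.2"), connection (Snowflake connection name), profile (named settings profile), cli_path (path to a custom CLI binary);

Permissions: --allowed-tools (explicit tool allowlist set through the CLI), permission_mode ("default", "acceptEdits", "plan", "bypassPermissions"), allowed_tools / disallowed_tools (allowlist and denylist in the SDK), can_use_tool (async callback for custom logic);

Extensibility: mcp_servers (custom tool servers), hooks (lifecycle event handlers, Python only), agents (sub-agent definitions).

The following shows how the configuration differs between environments: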

from cortex_code_agent_sdk import CortexCodeAgentOptions

# `prompt` and `my_permission_callback` are assumed to be defined elsewhere.

# Development: full autonomy via CLI
dev_cmd = [
    "cortex", "-p", prompt,
    "--output-format", "stream-json",
    "--allowed-tools", "Read",
    "--allowed-tools", "Write",
    "--allowed-tools", "Edit",
    "--allowed-tools", "Glob",
    "--allowed-tools", "Grep",
    "--allowed-tools", "Bash",
]

# Production: scoped permissions via SDK
prod = CortexCodeAgentOptions(
    cwd="/path/to/project",
    model="claude-opus-4-6",
    allowed_tools=["Read", "Glob", "Grep", "snowflake_sql_execute"],
    disallowed_tools=["Bash", "Write"],
    can_use_tool=my_permission_callback,
)

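Single-Turn API vs. Multi-Turn API

query(prompt, options)

Fire and forget. It starts a CLI subprocess, streams messages back, and then terminates. Use it for one-off tasks;

CortexCodeSDKClient

Persistent session. The CLI subprocess stays alive across turns and keeps the full context. Use it when later steps depend on earlier ones.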

# Multi-turn: each query() call builds on the previous context
async with CortexCodeSDKClient(options) as client:
    # Turn 1
    await client.query("Read the database schema and summarise the tables.")
    async for msg in client.receive_response():
        handle_message(msg)

    # Turn 2 knows about the schema from Turn 1
    await client.query("Now create a semantic view for the customer table.")
    async for msg in client.receive_response():
        handle_message(msg)

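Besides query() and receive_response(), the key CortexCodeSDKClient methods include:

Figure: SDK client methods

Message Types at a Glance

The SDK streams typed messages that you consume in an async for loop:

Figure: Typed messages streamed by the SDK

Each AssistantMessage contains content blocks: TextBlock (reasoning text), ToolUseBlock (a tool call, with .name and .input), ToolResultBlock (tool output), or ThinkingBlock (chain of thought).

Handling Permissions with can_use_tool

In production systems, the can_use_tool callback intercepts every tool call before it executes: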

from cortex_code_agent_sdk import PermissionResultAllow, PermissionResultDeny


async def can_use_tool(tool_name, tool_input, context):
    # Allow read-only tools unconditionally
    if tool_name in ("Read", "Glob", "Grep"):
        return PermissionResultAllow()

    # Block destructive commands
    if tool_name == "Bash" and "rm " in tool_input.get("command", ""):
        return PermissionResultDeny(message="Destructive commands not allowed")

    # Allow but add a timeout constraint
    if tool_name == "Bash":
        return PermissionResultAllow(
            updated_input={**tool_input, "command": f"timeout 30 {tool_input['command']}"}
        )

    return PermissionResultDeny(message=f"Tool '{tool_name}' requires approval")

You can combine can_use_tool with permission_mode: the built-in mode filters first, and your callback handles whatever remains.
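
A substring test such as `"rm " in command` is deliberately simple, but it also matches harmless text like "confirm order" and misses `rm` followed by a tab. A slightly stricter check is sketched below. The helper name `looks_destructive` and the command list are assumptions for illustration, and this remains a heuristic that errs on the side of blocking, not a sandbox.

```python
import re

# Hypothetical denylist of command names; extend it to suit your environment.
_DESTRUCTIVE = re.compile(r"\b(rm|rmdir|shred|mkfs|dd)\b")


def looks_destructive(command: str) -> bool:
    """Return True if a denylisted command name appears as a whole word."""
    return _DESTRUCTIVE.search(command) is not None


checks = {
    "rm -rf build": looks_destructive("rm -rf build"),
    "ls;rm -rf /": looks_destructive("ls;rm -rf /"),
    "rm\t-rf build": looks_destructive("rm\t-rf build"),
    "confirm order": looks_destructive("confirm order"),
    "pytest -q": looks_destructive("pytest -q"),
}
```

Inside a can_use_tool callback, this function could replace the substring test when deciding whether to deny a Bash call. Whole-word matching still flags commands that merely mention a name, such as `echo rm`, which is the conservative direction to fail in.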

Hooks: Observing Agent Behaviour (Python Only)

Hooks let you tap into the agent lifecycle for logging, auditing, or custom logic. The most useful hooks include:

Figure: Hooks

options = CortexCodeAgentOptions(
    allowed_tools=["Read", "Write", "Edit", "Glob", "Grep", "Bash"],
    hooks={
        "PreToolUse": my_pre_tool_logger,
        "PostToolUse": my_post_tool_logger,
        "Stop": my_completion_handler,
    },
)

Other available hooks: PostToolUseFailure, UserPromptSubmit, SubagentStart, SubagentStop, Notification, PermissionRequest, PreCompact.

MCP Custom Tools: Extending the Agent

The Model Context Protocol lets you give the agent custom tools. In Python you can define these tools inline with @tool and run them in-process:

import json

from cortex_code_agent_sdk import CortexCodeAgentOptions, tool, create_sdk_mcp_server

# `jira_client` is assumed to be an async Jira client created elsewhere.


@tool("search_jira", "Search Jira tickets", {"query": str, "project": str})
async def search_jira(args):
    results = await jira_client.search(args["query"], project=args["project"])
    return {"content": [{"type": "text", "text": json.dumps(results)}]}


server = create_sdk_mcp_server(name="internal", version="1.0.0", tools=[search_jira])

options = CortexCodeAgentOptions(
    allowed_tools=["Read", "Write", "Edit", "Glob", "Grep", "Bash"],
    mcp_servers={"internal": server},
)

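The agent can now call search_jira just like a built-in tool, with no external server process.

Error Handling

The SDK raises specific exceptions for different failure modes:

Figure: Failure modes

These exceptions all inherit from CortexCodeSDKError. Also check ResultMessage.subtype to catch agent-level errors such as "error_max_turns" or "error_max_budget_usd".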
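
For scripts that read the raw NDJSON stream, the subtype check can be sketched as follows. It works on plain dicts shaped like the result messages handled by the scripts earlier in this article. `AgentRunError` and `check_result` are hypothetical names for this example and are not part of the SDK.

```python
class AgentRunError(RuntimeError):
    """Raised when a result message reports anything other than success (illustrative)."""

    def __init__(self, subtype: str):
        super().__init__(f"agent finished with status: {subtype}")
        self.subtype = subtype


def check_result(msg: dict) -> None:
    """Raise AgentRunError if `msg` is a result message with a non-success subtype."""
    if msg.get("type") != "result":
        return  # not a result message, nothing to check
    subtype = msg.get("subtype", "unknown")
    if subtype != "success":
        raise AgentRunError(subtype)


# A successful result passes silently.
check_result({"type": "result", "subtype": "success"})

# An agent-level error surfaces as an exception the pipeline can handle.
try:
    check_result({"type": "result", "subtype": "error_max_turns"})
    failure = None
except AgentRunError as exc:
    failure = exc.subtype
```

Turning the subtype into an exception lets an orchestration script handle agent-level failures with the same try/except structure it uses for SDK exceptions.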

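Design Principles for Agentic Workflows

  1. Prompt engineering is your main control mechanism. State clearly what you want, in what format, and under which constraints. Include role context in the prompt;

  2. Scope permissions to the task. Use allowed_tools to allowlist only the tools that are needed. A code-analysis agent needs Read, Glob, and Grep; it does not need Write or Bash;

  3. Use structured output for machine consumption. When downstream code consumes the agent's results, define a JSON Schema through output_format instead of parsing free text;

  4. Use multi-turn only when you need context to carry over. Single-turn is simpler and cheaper. Reach for multi-turn only when later steps really depend on earlier context;

  5. Watch the message stream. Even in autonomous mode, log what the agent does. The message stream gives you full visibility for debugging and auditing;

  6. Handle errors explicitly. Check ResultMessage.subtype for error_max_turns, error_during_execution, and error_max_budget_usd. Wrap JSON parsing in try/except.

Summary

The Cortex Code Agent SDK turns AI from a "chat buddy" into a piece of infrastructure. It is not about replacing developers. It is about offloading the dull, repetitive parts of the lifecycle (boilerplate, basic refactoring, and initial data profiling) so that we can focus on the actual architecture.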

Original article: https://medium.com/snowflake/building-programmable-ai-agents-on-snowflake-a-deep-dive-into-the-cortex-code-agent-sdk-811be94b004e