亚马逊AWS官方博客

Strands Agents 快速上手 – dbt on Amazon Redshift 数据质量智能体

背景及关键技术

在当今数据驱动的商业环境中,数据质量已成为企业运营的关键因素。因为数据质量问题造成的企业运营决策失误的案例每天都在发生,例如,某电商平台在分析季度业绩时,发现其广告投放效果的 ROI 指标异常偏高。经过营销团队的深入分析,发现是因为数据建模过程中的一个微小错误导致的。在计算广告转化销售额时,ETL 过程中未正确处理订单退款数据,导致销售金额被重复计算。这个数据质量问题不仅造成了营销团队错误地增加了某些低效渠道的预算投入,还导致了近一个月的错误决策,造成了显著的预算浪费。如果能够及时发现订单金额的异常波动,这个问题本可以被提前预防。

随着数据规模的不断扩大和数据管道的日益复杂,数据工程师面临着越来越大的挑战。他们需要确保数据的准确性、完整性和及时性,同时还要能够快速识别和解决潜在的数据质量问题。在实际生产环境中,数据工程师往往面临着以下挑战:

  • 需要同时监控数百个数据模型的质量
  • 数据异常往往涉及多个上下游表的复杂关系
  • 问题定位和根因分析需要大量的人工时间
  • 数据质量问题发现往往滞后于业务影响

传统的数据质量管理方法往往依赖于人工检查和固定的监控规则,这不仅耗时费力,而且容易错过新出现的数据异常。在实际生产环境中,当数据质量出现问题时,数据工程师常常需要花费大量时间来排查原因,这严重影响了团队的工作效率和数据产品的可靠性。

为了解决这些挑战,我们可以借助新兴的 AI 技术和成熟的数据工程工具来构建智能化的数据质量监控系统。本文将介绍如何结合 Strands Agents 框架、Amazon Bedrockdbt 和 Amazon Redshift 这些技术,构建一个能够自动监控、诊断和报告数据质量问题的智能系统。实现:

  • 自动执行数据质量检测
  • 智能分析异常指标的根因
  • 追踪数据血缘关系辅助问题定位
  • 生成可执行的修复建议

技术实现

在数据质量监控逐渐引入 AI Agent 来增强自动化诊断能力。这类智能 Agent(通常由大语言模型+工具组成)可以自主扫描数据管道和指标,一旦发现异常立即触发分析。例如,某些数据质量平台中的数据血缘 Agent 能够自动绘制数据流向图,持续追踪上游/下游依赖;在检测到关键指标偏离时,这些 Agent 还会关联多维信号(跨列、跨时间的指标模式)来识别真正重要的异常。更重要的是,智能 Agent 具备快速根因归因能力:它们可以沿着 dbt 等工具提供的血缘关系链,将异常现象追溯到具体的数据表、字段或源头任务,从而立刻定位问题来源。

由亚马逊云科技推出的 Strands Agents 是一个开源、轻量级的 AI Agent 开发框架。其设计原则是“模型驱动”:将规划和执行任务的逻辑交给大语言模型完成,无需传统的硬编码工作流编排。Strands 框架拥有以下特性:模型无关(支持多种 LLM);无需编排(orchestration-free),让开发者关注结果而非流程细节;内置可观测性(集成 OpenTelemetry,可监控 Agent 运行时的跟踪和指标);通过 Anthropic 的 Model Context Protocol (MCP) 工具支持,使其能与上千种外部工具无缝集成(内置 20+工具,如数据库、API 调用、文件读取等,并可通过 MCP 扩展更多)。

dbt(data build tool)是一款流行的数据建模与转换工具,广泛用于现代数据堆栈中,特别适配如 Amazon Redshift 等云数据仓库,能够帮助数据工程师以 SQL 方式构建、测试、文档化数据模型。通过 dbt,团队可以实现数据转换过程的版本控制、可重复执行及自动测试。dbt MCP Server 是一个开源的元数据服务,基于 dbt 项目的 manifest、catalog 等文件构建,提供标准化 API 接口,支持项目构建、测试、获取模型依赖、列级血缘、描述信息等。它可以与多个工具集成,如 数据质量监控系统、指标平台(如 Lightdash)、AI Agent 工具(如 Strands 等,帮助系统实现自动化的数据血缘分析和异常诊断。

本文从一个以 Amazon Redshift 示例数据集 TICKIT 配合 dbt 进行数仓建模,在建模过程中,预设几个计算及逻辑错误,并且编写了单元测试。通过使用 Strands Agents 框架编写的数据质量检测 Agent ,结合 dbt MCP Server 进行自动化运行项目构建和运行测试,判断数据质量问题及给出详细解决建议及优化思路。

Agent 逻辑

数据质量检测 Agent 采用了分离式执行+工作流编排的混合架构,具有重要的业务价值。首先直接调用 dbt MCP 工具执行数据管道构建和质量检测,这确保了数据完整性和业务规则合规性(如 15% 佣金率、票价范围验证);然后通过 workflow 编排深度分析流程,使用 think 工具进行多维度业务影响评估,从技术、业务、系统三个层面分析数据质量问题对 TICKIT 票务系统的影响;最终生成精确到代码行级别的双语修复报告,为技术团队提供可执行的修复方案,为业务团队提供决策支持。这种设计避免了传统数据质量检测中人工分析耗时、修复建议模糊、业务影响评估不准确等问题,实现了自动化、智能化、可操作化的数据质量管理。

前置操作

准备 Redshift 集群,并导入 TICKIT  sample_data

下载示例工程代码

创建虚拟 Python 环境,安装所需依赖

python -m venv .venv
source .venv/bin/activate  # For macOS/Linux
pip install strands-agents strands-agents-tools dbt-core dbt-redshift

修改 dbt/profiles.yml 中 redshift 连接信息

tickit_analytics:
  outputs:
    dev:
      dbname: sample_data_dev
      host: redshift
      password: redshift
      port: 5439
      schema: tickit
      threads: 4
      type: redshift
      user: awsuser
  target: dev

下载 dbt mcp 配置项目环境变量

mv dbt-mcp/.env.example dbt-mcp/.env
## 修改 DBT_PROJECT_DIR
DBT_PROJECT_DIR=/path/to/your/dbt/project 

修改 Agent 执行路径,可选修改模型 id ,本示例使用 Amazon Nova Premier 模型

StdioServerParameters(
    command="/path/to/.venv/bin/dbt-mcp",
    args=[],
    env={}
)

关键代码解读

本案例中在 dbt 工程中,预设错误数仓执行逻辑

  • 事件小时提取错误

问题:event_hour 提取的是分钟而不是小时

错误代码片段(如第 10 行):

extract(minute from starttime) as event_hour
  • 佣金计算错误

问题:佣金率用错,实际应为 15%,但代码用 20%

错误代码片段(如第 12 行):

pricepaid * 0.20 as calculated_commission
  • 全名格式化错误

问题:姓名拼接时缺少空格

错误代码片段(如第 8 行):

firstname || lastname as full_name

Strands Agents 调用 dbt MCP server

  • 通过 MCPClient 连接 dbt-mcp server,注册所有 dbt 工具(build, test, list, show 等)
  • Agent 直接调用 MCP 工具执行:
    • build:构建整个 dbt 项目
    • test:运行所有数据质量测试
    • list:统计模型和测试覆盖率
    • show:查看失败测试的详细内容
from mcp import stdio_client, StdioServerParameters
from strands.tools.mcp import MCPClient
from strands import Agent
from strands.models import BedrockModel
from strands_tools import workflow, think

mcp_client = MCPClient(lambda: stdio_client(
    StdioServerParameters(
        command="/dbt-mcp/.venv/bin/dbt-mcp",
        args=[],
        env={}
    )
))

with mcp_client:
    tools = mcp_client.list_tools_sync()
    
    print(f"Available tools: {[tool.tool_name for tool in tools]}")
    
    bedrock_model = BedrockModel(
        model_id="us.amazon.nova-premier-v1:0",
        max_tokens=10000
    )
    
    SYSTEM_PROMPT = """
    You are a data quality agent for the TICKIT ticketing system.

    TICKIT Business Background:
    - Ticketing sales system with users, events, venues, sales entities
    - Key business rules: 15% commission rate, ticket price $10-$500, standardized user names, event time accurate to hour
    - Data quality KPIs: sales record completeness, price rationality, user info standardization, time data accuracy

    Your task: Execute dbt MCP tools, analyze results, and provide precise code-level fix suggestions with exact file paths and line numbers.
    """
    
    agent = Agent(
        system_prompt=SYSTEM_PROMPT, 
        tools=tools + [workflow, think],
        model=bedrock_model
    )
    
    print("Agent messages:", len(agent.messages))

    # Step 1: Execute dbt MCP tools directly (not in workflow)
    print("=== Executing dbt MCP Tools ===")
    dbt_execution = agent("""
    Please execute the following dbt MCP tools in sequence:
    
    1. Use 'build' tool to build the tickit_analytics project
    2. Use 'test' tool to run all data quality tests  
    3. Use 'list' tool to analyze model and test coverage
    4. Use 'show' tool to examine failed test details
    
    Focus on identifying failed tests and provide specific file paths and line numbers for issues.
    Report results in English.
    """)
    
    print(f"dbt MCP Execution Results: {dbt_execution}")

Strands Agents workflow 编排任务

  • workflow 只负责后续分析与报告生成,不直接执行 dbt
  • 编排如下任务:
    • deep_analysis:调用 think 工具,结合业务规则,对失败测试进行多维度分析
    • english_report_generation:生成英文数据质量报告,精确到文件路径和行号
    • chinese_translation:将英文报告翻译成中文,保持技术细节不变
workflow_creation = agent.tool.workflow(
        action="create",
        workflow_id="tickit_dual_report_workflow",
        tasks=[
            {
                "task_id": "deep_analysis",
                "description": "Execute deep analysis using think tool",
                "system_prompt": "Use think tool to analyze TICKIT data quality issues from business and technical perspectives. Provide precise code-level fixes in English.",
                "priority": 4
            },
            {
                "task_id": "english_report_generation",
                "description": "Generate comprehensive English data quality report",
                "dependencies": ["deep_analysis"],
                "system_prompt": "Generate comprehensive English data quality report with exact code fixes, file paths, and line numbers based on dbt execution results and think analysis.",
                "priority": 3
            },
            {
                "task_id": "chinese_translation",
                "description": "Translate English report to Chinese",
                "dependencies": ["english_report_generation"],
                "system_prompt": "Translate the English data quality report to Chinese, maintaining all technical details, file paths, and line numbers exactly as they are. Keep code snippets and technical terms in original form.",
                "priority": 2
            }
        ]
    )

Strands Agents thinks

在 deep_analysis 任务中,think 工具多轮推理,分析:

  • 技术根因(如 SQL 逻辑错误、字段拼接错误)
  • 业务影响(如佣金计算错误导致财务报表失真)
  • 风险优先级和修复建议(如哪些问题影响最大,需优先修复)
context_message = agent(f"""
    Execute the dual reporting workflow with the following dbt execution context:
    
    dbt Results: {dbt_execution}
    
    Please complete all workflow tasks:
    1. Deep analysis using think tool
    2. English report generation with precise code fixes
    3. Chinese translation maintaining technical accuracy
    
    Focus on providing exact file paths and line numbers for all code fixes.
    Generate both English and Chinese versions.
    """)

执行过程

启动 Agent,查看列举了 dbt MCP Server 的工具集,并按步骤进行构建和运行测试

Workflow 中总结 MCP Server 输出,总结思考 test 错误问题

查阅报告

总结及生产建议

  • 最小权限与输入校验,严格限制 Agent 和各工具的权限,仅开放必要的 API 和数据库访问。对所有输入(包括 API、文件、用户数据)进行校验和内容过滤,防止注入和越权风险。
  • 容器化和弹性部署,推荐使用 Docker 容器部署 Strands Agent,结合 Fargate、EKS 等平台实现弹性伸缩和高可用。支持微服务架构,Agent 与工具可独立部署与扩展。
  • 监控与日志审计,集成 OpenTelemetry、CloudWatch 等工具,实时监控 token 消耗、响应延迟、错误率和工具调用情况。启用详细审计日志,追踪所有 Agent 行为,支持安全合规和问题溯源。
  • 与 dbt 深度集成,流程自动化,明确配置 dbt 项目路径、profiles 和目标环境,确保 Agent 能自动化调度 dbt build/test 等命令。利用 dbt 测试和模型元数据,结合 Agent 智能分析,实现端到端的数据质量自动检测与修复建议

*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。

延伸阅读

本篇作者

张鑫

亚马逊云科技解决方案架构师,负责基于亚马逊云科技的解决方案咨询和架构设计,在软件系统架构、数仓和实时及离线计算领域有丰富的研发和架构经验。致力于结合数据开源软件与亚马逊云科技云原生服务组件构建高可用数据架构的实践探索。