第一章:什么是Agent?——从生活例子到AI Agent

AI Agent 入门 12 讲

 


 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

开篇:你早就见过"Agent"

想象一下这样的场景:

你走进一家房产中介公司,对经纪人小王说:"我想在市中心买一套三居室,预算300万,附近有地铁,最好带学区。"

小王会怎么做?

  1. 1. 接收需求:他拿出笔记本,记录下你的预算、地段、户型要求
  2. 2. 分析问题:他在脑海中盘算:"市中心的学区房很抢手,300万可能有点紧张,需要看看周边新盘或者二手房"
  3. 3. 调用资源:他打开房源系统,筛选符合条件的楼盘;打电话给熟悉的开发商销售;查询最新的学区划分政策
  4. 4. 返回结果:三天后,他给你整理了一份详细的看房清单,附带每套房源的优缺点分析

在这个过程中,小王就是一个Agent(代理/中介)——他接收你的需求,独立思考,调动各种资源,最终给你一个完整的解决方案。

类似的例子还有很多:

  • • 保险经纪人:了解你的家庭情况后,帮你对比十几家保险公司的产品,推荐最适合的方案
  • • 律师:听完你的案情,查阅法律条文、检索判例、分析证据,给出专业的法律意见
  • • 旅行社定制师:根据你的预算和喜好,设计一条独一无二的旅行路线,预订机票酒店

这些职业的共同点是:他们不是简单地传递信息,而是主动帮你解决问题


AI Agent的定义:会思考的数字助手

现在,我们把"Agent"这个概念搬到人工智能领域。

AI Agent(智能体)是能够自主感知环境、做出决策、执行行动的智能系统。

听起来有点抽象?让我们拆解一下:

能力房产中介小王AI Agent
感知环境听你说话、看你的表情读取用户输入、监控系统状态、获取外部数据
做出决策分析预算和需求是否匹配判断用户意图、规划执行步骤
执行行动打电话、查系统、约看房调用API、执行代码、生成报告

一个AI Agent就像数字世界里的"小王"——你告诉它一个问题,它会自己思考怎么解决,然后动手去做。


Agent的三个关键词

学术界给Agent下了很多定义,但核心离不开这三个特性:

1. 自主性(Autonomy)

Agent能独立运行,不需要人类一步一步地指导。

就像你委托小王找房后,不需要每天催他"今天查了哪几个小区"——他会主动推进。AI Agent也是如此:你给它一个目标("分析一下上个月的销售数据"),它会自己决定怎么做,不需要你教它"先打开数据库,再写SQL,最后做图表"。

2. 反应性(Reactivity)

Agent能感知环境变化并做出反应

小王带你看房时,如果你说"这个小区太旧了",他会立刻调整方向,推荐新楼盘。AI Agent也一样:如果执行SQL时数据库连接断了,它会感知到这个错误,尝试重连或者换一种方式获取数据。

3. 主动性(Pro-activeness)

Agent不只会被动响应,还会主动采取行动

优秀的小王不会等你问"学区政策变了吗",他会主动提醒你:"最近学区划分有调整,我帮你确认了一下,这个小区还在学区范围内。"AI Agent也能如此:发现数据异常时主动预警,或者在分析完销售数据后主动建议"下个月可以重点推广A产品"。

一句话总结:Agent = 自主性 + 反应性 + 主动性


Agent vs 传统程序:即兴演员 vs 按剧本演戏

你可能想问:Agent和普通程序有什么区别?我写一个脚本也能自动执行任务啊。

好问题!让我们用一个比喻来说明:

传统程序 = 按剧本演戏的演员

剧本写好了每一步:

  1. 1. 走到舞台中央
  2. 2. 说台词A
  3. 3. 转身
  4. 4. 说台词B

演员(程序)严格按剧本执行,不会变通。如果舞台上突然多了把椅子,演员会愣住——剧本里没写怎么处理椅子。

Agent = 即兴演员

即兴演员没有固定剧本,只有一个目标("演一场关于友情的戏")。他会根据现场情况灵活发挥:

  • • 看到椅子,可能把它当成道具坐上去
  • • 搭档忘词了,他自然地接话圆场
  • • 观众笑了,他顺势加一段互动

代码层面的对比

// 传统程序:固定的执行流程
public class TraditionalProgram {
    public void analyzeSales() {
        // 步骤1:连接数据库(写死的配置)
        Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/sales");
        
        // 步骤2:执行固定SQL
        String sql = "SELECT * FROM monthly_sales WHERE month = '2024-01'";
        ResultSet rs = conn.createStatement().executeQuery(sql);
        
        // 步骤3:输出固定格式报告
        System.out.println("月度销售报告:");
        while (rs.next()) {
            System.out.println(rs.getString("product") + ": " + rs.getDouble("amount"));
        }
    }
}

传统程序的问题是:一切都预先写死了。如果用户问"对比2023年和2024年的数据",或者"只看华东区的销售",程序就无能为力了——因为它只会执行那一条固定的SQL。

而Agent的做法完全不同:

// DataAgent的意图识别节点:先理解用户要什么
public class IntentRecognitionNode implements NodeAction {
    
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 1. 获取用户输入(感知)
        String userInput = StateUtil.getStringValue(state, INPUT_KEY);
        log.info("User input for intent recognition: {}", userInput);
        
        // 2. 构建意图识别提示词(思考)
        String prompt = PromptHelper.buildIntentRecognitionPrompt(multiTurn, userInput);
        
        // 3. 调用大模型判断用户意图(决策)
        Flux<ChatResponse> responseFlux = llmService.callUser(prompt);
        
        // 4. 返回识别结果(行动)
        return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, generator);
    }
}

看到区别了吗?Agent不会预先写死SQL,而是:

  1. 1. 先理解用户问的是什么("分析一下上个月的销售数据,按产品分类")
  2. 2. 再决定需要做什么(需要查sales表、按product分组、计算amount总和)
  3. 3. 最后生成对应的SQL并执行

如果用户换个问法:"华东区Q3的销售趋势如何?"Agent会重新理解意图,生成完全不同的执行计划。


Agent vs Chatbot:你派我办 vs 你问我答

还有一个容易混淆的概念:Agent和Chatbot(聊天机器人)有什么区别?

维度ChatbotAgent
交互模式你问我答你派我办
能力范围只说话能动手
记忆能力当前对话跨会话记忆
主动性被动等待提问主动推进任务
典型例子客服机器人、Siri自动驾驶、数据分析Agent

Chatbot是"百科全书":你问它"什么是SQL",它给你解释;你再问"怎么写JOIN",它给你示例。但它不会真的帮你去数据库里查数据。

Agent是"办事员":你说"帮我分析一下用户增长趋势",它会:

  1. 1. 理解你要分析什么指标(DAU?新增用户?留存率?)
  2. 2. 去数据库里找到相关表
  3. 3. 写SQL查询数据
  4. 4. 用Python画图
  5. 5. 生成一份完整的分析报告

整个过程它自己推进,不需要你一步步指导。

类比:Chatbot像电话客服——能解答问题,但办不了事;Agent像你的私人助理——你交代任务,他搞定一切。


DataAgent:一个真实的企业级AI Agent

说了这么多概念,让我们看看真实的AI Agent长什么样。

DataAgent是阿里云开发的企业级智能数据分析Agent。它能听懂你的数据问题,自动写SQL、画图、出报告。

DataAgent的工作流程

当你问DataAgent:"近三个月各产品线的销售额趋势如何?"

DataAgent内部会经历16+个节点的处理:

用户提问
    ↓
[意图识别] → 这是数据分析请求,不是闲聊
    ↓
[证据召回] → 查询相关知识库,了解"销售额"对应哪些表和字段
    ↓
[查询增强] → 把"近三个月"翻译成具体的日期范围
    ↓
[Schema召回] → 找到sales、product、date等相关表
    ↓
[表关系] → 分析这些表怎么关联(sales.product_id = product.id)
    ↓
[可行性评估] → 判断这个问题能不能用现有数据回答
    ↓
[规划] → 制定执行计划:Step1查数据 → Step2画图 → Step3写报告
    ↓
[人机反馈] → (可选)把计划给用户确认
    ↓
[计划执行] → 按步骤执行...
    ↓
[SQL生成] → 自动生成SQL查询语句
    ↓
[语义一致性] → 检查SQL是否符合用户意图
    ↓
[SQL执行] → 连接数据库执行查询
    ↓
[Python生成] → 根据查询结果生成画图代码
    ↓
[Python执行] → 运行代码生成图表
    ↓
[Python分析] → 对数据做深度分析(趋势、异常等)
    ↓
[报告生成] → 整合所有结果,生成最终报告
    ↓
返回给用户:数据表格 + 趋势图 + 分析结论

整个流程完全自动化,你只需要说一句话,剩下的交给Agent。

看看DataAgent的"大脑"——意图识别

DataAgent的第一个节点是IntentRecognitionNode(意图识别节点),它的作用是判断用户想干什么。

/**
 * 意图识别节点,用于识别用户输入是闲聊还是数据分析请求
 */
@Slf4j
@Component
@AllArgsConstructor
public class IntentRecognitionNode implements NodeAction {

    private final LlmService llmService;
    private final JsonParseUtil jsonParseUtil;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 第1步:获取用户输入(感知)
        String userInput = StateUtil.getStringValue(state, INPUT_KEY);
        log.info("User input for intent recognition: {}", userInput);
        
        // 获取多轮对话上下文(如果有历史对话)
        String multiTurn = StateUtil.getStringValue(state, MULTI_TURN_CONTEXT, "(无)");

        // 第2步:构建意图识别提示词(思考准备)
        String prompt = PromptHelper.buildIntentRecognitionPrompt(multiTurn, userInput);
        log.debug("Built intent recognition prompt as follows \n {} \n", prompt);

        // 第3步:调用大模型进行意图识别(决策)
        Flux<ChatResponse> responseFlux = llmService.callUser(prompt);

        // 第4步:处理模型返回的结果(行动)
        Flux<GraphResponse<StreamingOutput>> generator = FluxUtil.createStreamingGenerator(this.getClass(), state,
                responseFlux,
                Flux.just(ChatResponseUtil.createResponse("正在进行意图识别..."),
                        ChatResponseUtil.createPureResponse(TextType.JSON.getStartSign())),
                Flux.just(ChatResponseUtil.createPureResponse(TextType.JSON.getEndSign()),
                        ChatResponseUtil.createResponse("\n意图识别完成!")),
                result -> {
                    // 使用JsonParseUtil解析JSON并转换为IntentRecognitionOutputDTO对象
                    IntentRecognitionOutputDTO intentRecognitionOutput = jsonParseUtil.tryConvertToObject(result,
                            IntentRecognitionOutputDTO.class);
                    return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, intentRecognitionOutput);
                });
        return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, generator);
    }
}

这段代码做了什么?

  1. 1. 感知:从系统状态(OverAllState)中读取用户输入
  2. 2. 思考:把用户输入包装成提示词,交给大模型(LLM)分析
  3. 3. 行动:把大模型的返回结果解析成结构化的意图对象,传递给下一个节点

比如用户说"你好",意图识别节点会判断这是"闲聊",直接返回友好的问候语;用户说"上个月销售额多少",它会判断这是"数据分析请求",继续走后续的数据库查询流程。

这就是Agent的自主性——它能自己判断该走哪条路,不需要人类预先写好所有分支。


本章小结

让我们回顾一下本章的核心概念:

  1. 1. Agent的本质:接收需求 → 分析问题 → 调用资源 → 返回结果。房产中介、保险经纪人都是生活中的Agent。
  2. 2. AI Agent的定义:能够自主感知环境、做出决策、执行行动的智能体。
  3. 3. Agent的三个关键词
    • • 自主性:独立运行,不需要逐步指导
    • • 反应性:感知环境变化并做出反应
    • • 主动性:主动采取行动,不止被动响应
  4. 4. Agent vs 传统程序:传统程序按固定剧本执行,Agent像即兴演员,能根据情况灵活应对。
  5. 5. Agent vs Chatbot:Chatbot是"你问我答"的百科全书,Agent是"你派我办"的办事员。
  6. 6. DataAgent:阿里云的企业级数据分析Agent,包含16+个节点,能自动完成从意图识别到报告生成的完整流程。

思考题

  1. 1. 生活中的Agent:除了本章提到的房产中介、保险经纪人、律师,你还能想到哪些生活中的Agent?它们分别体现了自主性、反应性、主动性中的哪些特性?
  2. 2. 场景辨析:假设你正在使用一个智能音箱。
    • • 你对它说"今天天气怎么样",它回答"今天北京晴,25度"——这是Chatbot还是Agent?为什么?
    • • 你对它说"明天下雨的话,帮我取消下午的户外活动,并给所有参加者发通知",它真的去检查日历、发送邮件——这是Chatbot还是Agent?为什么?
  3. 3. DataAgent的意图识别:在DataAgent的IntentRecognitionNode代码中,如果用户输入是"你好,请问怎么用SQL查重复数据",你觉得意图识别节点会把它归类为"闲聊"还是"数据分析请求"?为什么?(提示:思考这句话的语义重心在哪里)

 


 


第二章:Agent的核心能力——感知、思考、行动

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

餐厅服务员的启示

想象你走进一家高档餐厅,服务员小李接待了你。

迎客(感知)

  • • 看到你穿着正装,判断可能是商务宴请
  • • 注意到你带着生日蛋糕,意识到是生日聚餐
  • • 听到你说"两位",同时看到门口还有朋友在停车

推荐菜品(思考)

  • • 分析:商务宴请需要体面,生日需要惊喜,等人到齐再点菜
  • • 决策:先推荐招牌菜和红酒,等朋友到了再点主菜

上菜(行动)

  • • 把凉菜先上,热菜等朋友到了再下单
  • • 悄悄通知后厨准备长寿面
  • • 朋友到了,主动加椅子、倒茶水

这个过程中,小李展现了Agent的三个核心能力:感知(Perception)、思考(Reasoning/Planning)、行动(Action)

AI Agent也是如此。本章我们就来深入理解这三个能力,并通过DataAgent的真实代码,看看企业级Agent是如何运转的。


感知(Perception):Agent如何接收和理解输入

什么是感知?

感知是Agent与外部世界交互的"感官系统"。就像人类通过眼睛、耳朵、皮肤接收信息,Agent通过各种"接口"接收输入:

人类感官Agent的感知方式例子
眼睛文本输入用户在对话框里打字
耳朵语音输入用户对着手机说话
皮肤系统状态数据库连接是否正常
鼻子环境数据当前时间、用户位置
味觉多模态输入上传的图片、Excel文件

DataAgent的感知系统

DataAgent主要接收文本输入,但它感知的内容远不止用户当前说的话。

// DataAgent的意图识别节点:从状态中获取各种输入信息
public class IntentRecognitionNode implements NodeAction {
    
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 感知1:获取用户当前的输入
        String userInput = StateUtil.getStringValue(state, INPUT_KEY);
        log.info("User input for intent recognition: {}", userInput);
        
        // 感知2:获取多轮对话历史(如果有之前的对话)
        String multiTurn = StateUtil.getStringValue(state, MULTI_TURN_CONTEXT, "(无)");
        
        // 感知3:构建提示词,交给大模型分析
        String prompt = PromptHelper.buildIntentRecognitionPrompt(multiTurn, userInput);
        
        // ...后续处理
    }
}

看到没有?DataAgent不仅感知了用户当前的输入,还感知了对话历史。这意味着:

  • • 用户第一句:"帮我查一下销售数据"
  • • 用户第二句:"只看华东区的"

如果没有多轮对话感知,第二句"只看华东区的"就毫无意义——华东区的什么?但有了对话历史,Agent就知道这是在上一条查询的基础上加过滤条件。

多模态感知

除了文本,现代Agent还能感知更多类型的输入:

  • • 语音:智能音箱把语音转成文字
  • • 图片:用户上传一张报表截图,Agent识别其中的表格数据
  • • 文件:用户上传Excel,Agent读取并分析
  • • 系统状态:Agent感知数据库是否可用、网络是否通畅

关键洞察:Agent的感知能力决定了它的"视野"。感知越丰富,Agent越能理解复杂的上下文。


思考(Reasoning/Planning):Agent如何分析问题

感知到信息后,Agent需要思考。这是Agent最核心、最复杂的能力。

思考的层次

Agent的思考通常分为三个层次:

第一层:意图识别
    "用户到底想干什么?"
    
第二层:任务拆解
    "要完成这个目标,需要分几步?"
    
第三层:细节规划
    "每一步具体怎么做?用什么工具?"

让我们通过DataAgent的真实代码,看看这三个层次是如何实现的。

第一层:意图识别——这是聊天还是数据分析?

DataAgent的第一个思考节点是IntentRecognitionNode

/**
 * 意图识别节点,用于识别用户输入是闲聊还是数据分析请求
 */
@Slf4j
@Component
@AllArgsConstructor
public class IntentRecognitionNode implements NodeAction {

    private final LlmService llmService;
    private final JsonParseUtil jsonParseUtil;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 获取用户输入
        String userInput = StateUtil.getStringValue(state, INPUT_KEY);
        log.info("User input for intent recognition: {}", userInput);

        String multiTurn = StateUtil.getStringValue(state, MULTI_TURN_CONTEXT, "(无)");

        // 构建意图识别提示
        String prompt = PromptHelper.buildIntentRecognitionPrompt(multiTurn, userInput);
        log.debug("Built intent recognition prompt as follows \n {} \n", prompt);

        // 调用LLM进行意图识别
        Flux<ChatResponse> responseFlux = llmService.callUser(prompt);

        Flux<GraphResponse<StreamingOutput>> generator = FluxUtil.createStreamingGenerator(this.getClass(), state,
                responseFlux,
                Flux.just(ChatResponseUtil.createResponse("正在进行意图识别..."),
                        ChatResponseUtil.createPureResponse(TextType.JSON.getStartSign())),
                Flux.just(ChatResponseUtil.createPureResponse(TextType.JSON.getEndSign()),
                        ChatResponseUtil.createResponse("\n意图识别完成!")),
                result -> {
                    // 使用JsonParseUtil解析JSON并转换为IntentRecognitionOutputDTO对象
                    IntentRecognitionOutputDTO intentRecognitionOutput = jsonParseUtil.tryConvertToObject(result,
                            IntentRecognitionOutputDTO.class);
                    return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, intentRecognitionOutput);
                });
        return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, generator);
    }
}

这段代码的核心是第62行llmService.callUser(prompt)——把用户的输入交给大模型(LLM)分析。

大模型会返回一个结构化的结果,比如:

{
  "intent": "data_analysis",
  "confidence": 0.95,
  "description": "用户想要查询销售数据"
}

或者:

{
  "intent": "chitchat",
  "confidence": 0.88,
  "description": "用户在打招呼"
}

基于这个结果,DataAgent决定走哪条路:

  • • 数据分析 → 继续后续的数据库查询流程
  • • 闲聊 → 直接返回友好的回复,结束流程

这就是第一层思考:先搞清楚用户要什么。

第二层:任务拆解——大问题拆成小步骤

确定了意图后,DataAgent进入PlannerNode(规划节点),这是Agent的"大脑司令部"。

/**
 * 规划节点:根据用户需求生成执行计划
 */
@Slf4j
@Component
@AllArgsConstructor
public class PlannerNode implements NodeAction {

    private final LlmService llmService;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 是否为NL2SQL模式(直接翻译SQL,不做复杂分析)
        Boolean onlyNl2sql = state.value(IS_ONLY_NL2SQL, false);

        Flux<ChatResponse> flux = onlyNl2sql ? handleNl2SqlOnly() : handlePlanGenerate(state);
        // ...后续处理
    }

    private Flux<ChatResponse> handlePlanGenerate(OverAllState state) {
        // 获取查询增强节点的输出(经过改写和扩展的用户问题)
        String canonicalQuery = StateUtil.getCanonicalQuery(state);
        log.info("Using processed query for planning: {}", canonicalQuery);

        // 检查是否为修复模式(用户之前否决了计划,需要重新生成)
        String validationError = StateUtil.getStringValue(state, PLAN_VALIDATION_ERROR, null);
        if (validationError != null) {
            log.info("Regenerating plan with user feedback: {}", validationError);
        }
        else {
            log.info("Generating initial plan");
        }

        // 构建提示参数
        String semanticModel = (String) state.value(GENEGRATED_SEMANTIC_MODEL_PROMPT).orElse("");
        SchemaDTO schemaDTO = StateUtil.getObjectValue(state, TABLE_RELATION_OUTPUT, SchemaDTO.class);
        String schemaStr = PromptHelper.buildMixMacSqlDbPrompt(schemaDTO, true);

        // 构建用户提示
        String userPrompt = buildUserPrompt(canonicalQuery, validationError, state);
        String evidence = StateUtil.getStringValue(state, EVIDENCE);

        // 构建模板参数
        BeanOutputConverter<Plan> beanOutputConverter = new BeanOutputConverter<>(Plan.class);
        Map<String, Object> params = Map.of("user_question", userPrompt, "schema", schemaStr, "evidence", evidence,
                "semantic_model", semanticModel, "plan_validation_error", formatValidationError(validationError),
                "format", beanOutputConverter.getFormat());
        // 生成计划
        String plannerPrompt = PromptConstant.getPlannerPromptTemplate().render(params);
        log.debug("Planner prompt: as follows \n{}\n", plannerPrompt);

        // 调用LLM生成计划
        return llmService.callUser(plannerPrompt);
    }
}

PlannerNode的核心任务是生成执行计划。它会输出一个结构化的计划,比如:

{
  "thoughtProcess": "用户想了解近三个月各产品线的销售趋势。我需要:1)查询sales表获取销售数据;2)按产品线和月份分组统计;3)用Python绘制趋势图;4)生成分析报告",
  "executionPlan": [
    {
      "step": 1,
      "toolToUse": "sql_generate",
      "toolParameters": {
        "instruction": "查询近三个月各产品线的销售额,按月份和产品线分组"
      }
    },
    {
      "step": 2,
      "toolToUse": "python_generate",
      "toolParameters": {
        "instruction": "根据SQL查询结果,绘制各产品线近三个月的销售趋势折线图"
      }
    },
    {
      "step": 3,
      "toolToUse": "report_generator",
      "toolParameters": {
        "summaryAndRecommendations": "总结销售趋势,指出增长最快的产品线和潜在问题"
      }
    }
  ]
}

看到没有?Agent把一个大问题拆解成了三个小步骤:

  1. 1. 先查数据(SQL)
  2. 2. 再画图(Python)
  3. 3. 最后写报告

这就是第二层思考:把复杂任务拆解成可执行的步骤。

第三层:查询增强——让问题更清晰

在规划之前,DataAgent还有一个重要的思考节点:QueryEnhanceNode(查询增强节点)

/**
 * 查询增强节点:根据evidence信息把业务术语翻译、查询改写、扩展
 */
@Slf4j
@Component
@AllArgsConstructor
public class QueryEnhanceNode implements NodeAction {

    private final LlmService llmService;
    private final JsonParseUtil jsonParseUtil;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 获取用户输入
        String userInput = StateUtil.getStringValue(state, INPUT_KEY);
        log.info("User input for query enhance: {}", userInput);

        // 获取证据信息(从知识库召回的相关信息)
        String evidence = StateUtil.getStringValue(state, EVIDENCE);
        // 获取多轮对话上下文
        String multiTurn = StateUtil.getStringValue(state, MULTI_TURN_CONTEXT, "(无)");

        // 构建查询处理提示
        String prompt = PromptHelper.buildQueryEnhancePrompt(multiTurn, userInput, evidence);
        log.debug("Built query enhance prompt as follows \n {} \n", prompt);

        // 调用LLM进行查询处理
        Flux<ChatResponse> responseFlux = llmService.callUser(prompt);

        // ...后续处理
    }
}

QueryEnhanceNode的作用是什么?

假设用户问:"上个月GMV怎么样?"

Agent可能不知道"GMV"是什么。但通过查询增强:

  1. 1. 从知识库召回证据:"GMV = Gross Merchandise Volume = 商品交易总额 = order表中的amount字段总和"
  2. 2. 把用户的问题改写成:"统计上个月(2024年1月)所有订单的amount字段总和"

这就是第三层思考:把模糊的用户语言翻译成精确的技术语言。

思考的本质:大模型是Agent的"大脑"

你会发现,DataAgent的每个思考节点都调用了llmService.callUser()——大模型(LLM)是Agent思考能力的核心来源

但Agent不只是简单地调用大模型。它通过提示词工程(Prompt Engineering)工作流编排,让大模型在合适的时机做合适的事:

节点大模型的任务输入输出
意图识别分类用户问题意图类型
查询增强翻译/改写用户问题 + 知识库证据规范化查询
规划拆解任务规范化查询 + 数据库Schema执行计划
SQL生成写代码执行步骤 + SchemaSQL语句
语义一致性检查SQL + 用户问题通过/不通过
报告生成写作所有执行结果分析报告

关键洞察:Agent的思考能力 = 大模型的推理能力 + 精心设计的提示词 + 清晰的工作流分工


行动(Action):Agent如何执行和输出

思考完成后,Agent需要行动——把想法变成现实。

行动的类型

Agent的行动可以分为几类:

行动类型说明DataAgent中的例子
工具调用调用外部API或工具查询数据库、调用大模型
代码执行运行程序代码执行SQL、运行Python
内容生成创造新的内容生成报告、绘制图表
状态更新修改内部状态记录执行结果、更新对话历史
人机交互与人类互动请求确认、展示中间结果

DataAgent的行动节点

让我们看看DataAgent中几个典型的行动节点。

行动1:SQL执行

/**
 * SQL执行节点:执行SQL查询并处理结果
 */
@Slf4j
@Component
@AllArgsConstructor
public class SqlExecuteNode implements NodeAction {

    private final DatabaseUtil databaseUtil;
    private final Nl2SqlService nl2SqlService;
    private final LlmService llmService;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        Integer currentStep = PlanProcessUtil.getCurrentStepNumber(state);
        
        // 获取生成的SQL
        String sqlQuery = StateUtil.getStringValue(state, SQL_GENERATE_OUTPUT);
        sqlQuery = nl2SqlService.sqlTrim(sqlQuery);
        log.info("Executing SQL query: {}", sqlQuery);

        // 获取Agent ID,动态获取数据源配置
        String agentIdStr = StateUtil.getStringValue(state, Constant.AGENT_ID);
        Long agentId = Long.valueOf(agentIdStr);
        DbConfigBO dbConfig = databaseUtil.getAgentDbConfig(agentId);

        return executeSqlQuery(state, currentStep, sqlQuery, dbConfig, agentId);
    }

    private Map<String, Object> executeSqlQuery(OverAllState state, Integer currentStep, String sqlQuery,
            DbConfigBO dbConfig, Long agentId) {
        // 准备查询参数
        DbQueryParameter dbQueryParameter = new DbQueryParameter();
        dbQueryParameter.setSql(sqlQuery);
        dbQueryParameter.setSchema(dbConfig.getSchema());

        Accessor dbAccessor = databaseUtil.getAgentAccessor(agentId);
        final Map<String, Object> result = new HashMap<>();

        // 执行SQL查询并获取结果
        ResultSetBO resultSetBO = dbAccessor.executeSqlAndReturnObject(dbConfig, dbQueryParameter);
        
        // 调用大模型获取图表配置信息
        DisplayStyleBO displayStyleBO = enrichResultSetWithChartConfig(state, resultSetBO);
        
        // 更新步骤结果
        Map<String, String> existingResults = StateUtil.getObjectValue(state, SQL_EXECUTE_NODE_OUTPUT,
                Map.class, new HashMap<>());
        Map<String, String> updatedResults = PlanProcessUtil.addStepResult(existingResults, currentStep,
                strResultSetJson);

        // 准备返回结果
        result.putAll(Map.of(SQL_EXECUTE_NODE_OUTPUT, updatedResults, SQL_REGENERATE_REASON,
                SqlRetryDto.empty(), SQL_RESULT_LIST_MEMORY, resultSetBO.getData(), PLAN_CURRENT_STEP,
                currentStep + 1, SQL_GENERATE_COUNT, 0));
        
        return result;
    }
}

SqlExecuteNode的行动流程:

  1. 1. 从状态中获取要执行的SQL
  2. 2. 动态获取数据库连接配置(不同Agent可能连接不同数据库)
  3. 3. 执行SQL查询
  4. 4. 调用大模型分析结果,生成图表配置(比如判断用柱状图还是折线图)
  5. 5. 把结果存入状态,供后续节点使用

这就是工具调用 + 代码执行的行动。

行动2:Python代码执行

/**
 * Python执行节点:运行Python代码获取分析结果
 */
@Slf4j
@Component
public class PythonExecuteNode implements NodeAction {

    private final CodePoolExecutorService codePoolExecutor;
    private final ObjectMapper objectMapper;
    private final JsonParseUtil jsonParseUtil;
    private final CodeExecutorProperties codeExecutorProperties;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 获取生成的Python代码
        String pythonCode = StateUtil.getStringValue(state, PYTHON_GENERATE_NODE_OUTPUT);
        // 获取SQL执行结果作为输入数据
        List<Map<String, String>> sqlResults = StateUtil.hasValue(state, SQL_RESULT_LIST_MEMORY)
                ? StateUtil.getListValue(state, SQL_RESULT_LIST_MEMORY) : new ArrayList<>();

        // 检查重试次数
        int triesCount = StateUtil.getObjectValue(state, PYTHON_TRIES_COUNT, Integer.class, 0);

        // 构建执行任务
        CodePoolExecutorService.TaskRequest taskRequest = new CodePoolExecutorService.TaskRequest(pythonCode,
                objectMapper.writeValueAsString(sqlResults), null);

        // 运行Python代码
        CodePoolExecutorService.TaskResponse taskResponse = this.codePoolExecutor.runTask(taskRequest);
        
        if (!taskResponse.isSuccess()) {
            // 执行失败,检查是否超过最大重试次数
            if (triesCount >= codeExecutorProperties.getPythonMaxTriesCount()) {
                // 超过重试次数,启动降级策略
                return handleFallback(state, taskResponse);
            }
            // 抛出异常,触发重试
            throw new RuntimeException("Python execution failed");
        }

        // 执行成功,处理输出结果
        String stdout = taskResponse.stdOut();
        // Python输出的JSON字符串可能有Unicode转义,解析回汉字
        Object value = jsonParseUtil.tryConvertToObject(stdout, Object.class);
        if (value != null) {
            stdout = objectMapper.writeValueAsString(value);
        }

        return Map.of(PYTHON_EXECUTE_NODE_OUTPUT, stdout, PYTHON_IS_SUCCESS, true);
    }
}

PythonExecuteNode的行动流程:

  1. 1. 获取PythonGenerateNode生成的Python代码
  2. 2. 获取SQL执行结果作为输入数据
  3. 3. 在隔离环境中执行Python代码(可能是Docker容器,保证安全)
  4. 4. 如果失败,自动重试(最多3次)
  5. 5. 如果一直失败,启动降级策略(跳过Python分析,继续后续流程)

这就是代码执行 + 错误处理 + 降级策略的行动。

行动3:报告生成

/**
 * 报告生成节点:创建综合分析报告
 */
@Slf4j
@Component
public class ReportGeneratorNode implements NodeAction {

    private final LlmService llmService;
    private final UserPromptService promptConfigService;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 获取必要输入
        String plannerNodeOutput = StateUtil.getStringValue(state, PLANNER_NODE_OUTPUT);
        String userInput = StateUtil.getCanonicalQuery(state);
        Integer currentStep = StateUtil.getObjectValue(state, PLAN_CURRENT_STEP, Integer.class, 1);
        
        @SuppressWarnings("unchecked")
        HashMap<String, String> executionResults = StateUtil.getObjectValue(state, SQL_EXECUTE_NODE_OUTPUT,
                HashMap.class, new HashMap<>());

        // 解析计划,获取当前步骤
        Plan plan = converter.convert(plannerNodeOutput);
        ExecutionStep executionStep = getCurrentExecutionStep(plan, currentStep);
        String summaryAndRecommendations = executionStep.getToolParameters().getSummaryAndRecommendations();

        // 生成报告流
        Flux<ChatResponse> reportGenerationFlux = generateReport(userInput, plan, executionResults,
                summaryAndRecommendations, agentId);

        // ...返回结果
    }

    private Flux<ChatResponse> generateReport(String userInput, Plan plan, HashMap<String, String> executionResults,
            String summaryAndRecommendations, Long agentId) {
        // 构建用户需求和计划描述
        String userRequirementsAndPlan = buildUserRequirementsAndPlan(userInput, plan);
        
        // 构建分析步骤和数据结果描述
        String analysisStepsAndData = buildAnalysisStepsAndData(plan, executionResults);

        // 获取优化配置
        List<UserPromptConfig> optimizationConfigs = promptConfigService.getOptimizationConfigs("report-generator",
                agentId);

        // 构建报告生成提示词
        String reportPrompt = PromptHelper.buildReportGeneratorPromptWithOptimization(userRequirementsAndPlan,
                analysisStepsAndData, summaryAndRecommendations, optimizationConfigs);
        
        // 调用大模型生成报告
        return llmService.callUser(reportPrompt);
    }
}

ReportGeneratorNode的行动流程:

  1. 1. 收集所有前置节点的执行结果(SQL查询结果、Python分析结果、图表配置)
  2. 2. 构建完整的提示词,包含用户需求、执行计划、所有数据结果
  3. 3. 调用大模型生成结构化的分析报告
  4. 4. 清空中间状态,准备迎接下一个用户请求

这就是内容生成的行动——把所有中间结果整合成一份人类可读的报告。


三者闭环:感知→思考→行动→(反馈)→感知...

Agent的真正威力在于:感知、思考、行动不是一次性的,而是一个持续运转的闭环

        ┌─────────────┐
        │   感知环境   │
        │  (用户输入)  │
        └──────┬──────┘
               │
               ▼
        ┌─────────────┐
        │   思考决策   │
        │ (意图识别、  │
        │  任务规划)   │
        └──────┬──────┘
               │
               ▼
        ┌─────────────┐
        │   执行行动   │
        │ (SQL查询、   │
        │ Python执行)  │
        └──────┬──────┘
               │
               ▼
        ┌─────────────┐
        │   观察结果   │
        │ (成功?失败?│
        │  需要调整?) │
        └──────┬──────┘
               │
               └────────→ 回到"感知",继续下一轮

DataAgent中的闭环体现

DataAgent的工作流完美体现了这个闭环:

第一轮循环(感知→思考→行动→观察)

  1. 1. 感知:用户问"近三个月销售趋势如何?"
  2. 2. 思考:意图识别→查询增强→规划(3个步骤:查数据、画图、写报告)
  3. 3. 行动:SQL生成节点写出SQL
  4. 4. 观察:语义一致性节点检查SQL是否正确

第二轮循环(根据反馈调整)

  • • 如果SQL检查通过 → 继续执行SQL
  • • 如果SQL检查不通过 → 回到SQL生成节点,重新生成

第三轮循环(执行和再调整)

  1. 1. 感知:SQL执行成功,拿到了数据
  2. 2. 思考:Python生成节点分析数据特点,决定画什么图
  3. 3. 行动:Python执行节点运行代码
  4. 4. 观察:如果Python代码报错,回到Python生成节点重试

这个闭环可以一直运转,直到任务完成或者达到最大重试次数。

关键洞察:Agent的智能不仅体现在单次思考的质量,更体现在闭环运转的鲁棒性——出错时能感知、能调整、能恢复。


本章小结

  1. 1. 感知(Perception):Agent通过文本、语音、系统状态等多种方式接收输入。DataAgent不仅感知用户当前输入,还感知对话历史、知识库证据、数据库Schema等上下文信息。
  2. 2. 思考(Reasoning/Planning):Agent的思考分为三个层次:
    • • 意图识别:判断用户想干什么
    • • 任务拆解:把大问题拆成小步骤
    • • 细节规划:每一步具体怎么做
      DataAgent通过QueryEnhanceNode、PlannerNode等节点实现分层思考。
  3. 3. 行动(Action):Agent的行动包括工具调用、代码执行、内容生成等。DataAgent的SqlExecuteNode执行SQL查询,PythonExecuteNode运行Python代码,ReportGeneratorNode生成最终报告。
  4. 4. 闭环运转:感知→思考→行动→观察反馈→再感知... Agent的智能体现在能根据执行结果不断调整,直到任务完成。
  5. 5. 大模型是大脑,工作流是骨架:Agent的思考能力依赖大模型,但仅靠大模型不够。DataAgent通过16+个节点的精心编排,让大模型在合适的时机做合适的事,形成完整的智能体。

思考题

  1. 1. 感知的边界:在DataAgent中,IntentRecognitionNode不仅读取用户输入,还读取MULTI_TURN_CONTEXT(多轮对话历史)。假设用户连续提问:

    如果没有多轮对话感知,第二句会出现什么问题?Agent会怎么误解用户的意图?

    • • 第一句:"查一下上个月的销售额"
    • • 第二句:"再对比一下前年的"
  2. 2. 思考的分层:DataAgent的PlannerNode会生成一个包含多个步骤的执行计划。假设用户问:"分析近半年各产品线的销售趋势,找出增长最快的产品,并预测下个月的销量"。

    你觉得PlannerNode会生成几个步骤?每个步骤分别调用什么工具(sql_generate、python_generate、report_generator)?为什么?

  3. 3. 闭环的鲁棒性:PythonExecuteNode在执行失败时会重试,超过最大次数后启动降级策略。假设你在使用DataAgent分析数据,Python代码因为数据格式问题一直执行失败,Agent进入降级模式。作为用户,你希望Agent如何告诉你这个情况?是直接说"Python分析失败",还是更详细地说明"由于数据包含异常值,自动分析暂时不可用,这是基础统计结果..."?哪种方式更体现Agent的"智能"?

 



第三章:LLM是Agent的大脑——大语言模型的角色

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

Agent就像一个人

想象一下,你面前站着一个"数据分析助手"。他能听懂你的问题,会查数据库,会写代码,还会画图做报告。这个助手为什么这么厉害?

如果把Agent比作一个人:

  • • 工具(SQL、Python) 是他的手脚——负责干活
  • • 工作流(StateGraph) 是他的神经系统——负责协调
  • • LLM(大语言模型) 是他的大脑皮层——负责思考、理解、决策

没有手脚,人无法行动;没有神经系统,手脚会乱成一团;但没有大脑,人就只是一具空壳。LLM在Agent中扮演的正是这个"大脑"的角色。

本章将带你理解:LLM到底是什么?它有哪些核心能力?DataAgent如何利用LLM来驱动整个数据分析流程?


3.1 什么是LLM(大语言模型)

3.1.1 简单比喻:一个读了海量书籍的"学霸"

LLM(Large Language Model,大语言模型) 本质上是一个经过特殊训练的计算机程序。你可以把它想象成一个"超级学霸":

  • • 他读过互联网上数以亿计的书籍、文章、代码、对话记录
  • • 在这个过程中,他学会了语言的规律、语法、逻辑,以及各行各业的知识
  • • 当你问他问题时,他不是"搜索"答案,而是基于学到的知识"生成"答案

就像人类大脑通过神经元连接来存储和加工信息,LLM通过数以千亿计的参数(可以理解为虚拟的"神经元连接")来理解和生成语言。

小知识:GPT-4、Claude、通义千问、文心一言等都是LLM的具体产品。DataAgent支持接入多种LLM,通过统一接口调用。

3.1.2 LLM不是搜索引擎

很多初学者容易混淆LLM和搜索引擎的区别:

特性搜索引擎(如百度、Google)LLM(如ChatGPT)
工作方式检索已有网页,返回链接基于知识生成新回答
对问题的理解关键词匹配语义理解,能处理复杂表达
推理能力能进行多步逻辑推理
知识时效性实时(最新网页)有截止日期(训练数据的截止时间)
创造性能创作文章、写代码、生成方案

举个例子:

  • • 你问搜索引擎"2024年Q3销售额环比下降的原因",它返回的是包含这些关键词的网页链接
  • • 你问LLM同样的问题,它会分析"销售额"、"环比下降"、"原因"这些概念之间的关系,给出一个结构化的分析框架

3.2 LLM的四大核心能力

LLM之所以能成为Agent的"大脑",是因为它具备四种关键能力:

3.2.1 理解能力(Understanding)

LLM能理解人类语言的真正含义,而不只是字面意思。

例子

  • • 用户说:"上个月北京卖得怎么样?"
  • • LLM理解:"上个月"=上一个月,"北京"=地区维度,"卖得怎么样"=销售额/销量指标
  • • 它还能理解这是口语化表达,需要转化为正式的数据查询

在DataAgent中,QueryEnhanceNode就利用LLM的理解能力,把用户的口语化问题改写成规范的数据分析查询。

3.2.2 生成能力(Generation)

LLM能根据需求生成高质量的文本、代码、结构化数据。

例子

  • • 需求:"查询2024年北京地区各月的销售额"
  • • LLM生成:SELECT month, SUM(amount) FROM sales WHERE city='北京' AND year=2024 GROUP BY month

在DataAgent中,SqlGenerateNode利用LLM的生成能力,根据自然语言描述自动编写SQL语句。

3.2.3 推理能力(Reasoning)

LLM能进行多步逻辑推理,把复杂问题拆解成可执行的步骤。

例子

  • • 用户问:"分析近一年各地区销售趋势,找出增长最快的前3个地区,并预测下季度表现"
  • • LLM推理过程:
    1. 1. 先提取近一年的销售数据(SQL查询)
    2. 2. 计算各地区增长率(Python计算)
    3. 3. 排序找出前3名(Python排序)
    4. 4. 基于历史趋势做预测(Python预测模型)
    5. 5. 汇总成报告(文本生成)

在DataAgent中,PlannerNode利用LLM的推理能力,把用户的复杂问题拆解成一步一步的执行计划。

3.2.4 总结能力(Summarization)

LLM能从大量信息中提取关键要点,生成简洁的总结。

例子

  • • 输入:1000行销售数据 + 5个分析图表 + 统计指标
  • • LLM总结:"华东地区Q3销售额同比增长23%,主要驱动力来自新客户增长(+35%),但客户留存率下降5%需关注。"

在DataAgent中,ReportGeneratorNode利用LLM的总结能力,把分析结果转化为可读的业务报告。


3.3 LLM如何驱动DataAgent

现在我们来看DataAgent中LLM的实际工作流程。想象一下用户问了一个问题:"分析上季度各产品线的利润情况"。

步骤1:理解用户意图

用户输入:"分析上季度各产品线的利润情况"
    ↓
IntentRecognitionNode(意图识别)调用LLM
    ↓
LLM判断:这是一个"数据分析"意图,需要查询数据库并生成报告

LLM不是盲目执行,而是先"想清楚"用户想要什么。是查数据?还是生成报告?还是需要预测?

步骤2:生成执行计划

PlannerNode(计划生成)调用LLM
    ↓
LLM分析:
  - 需要哪些表?products、sales、costs
  - 需要几步?
    Step 1: SQL查询各产品线收入和成本
    Step 2: Python计算利润率和排名
    Step 3: 生成分析报告
    ↓
输出:Plan对象(JSON格式)

这就是LLM的推理能力在发挥作用——把大问题拆成小步骤。

步骤3:编写SQL/Python代码

SqlGenerateNode调用LLM
    ↓
LLM根据Plan中的指令生成SQL:
  SELECT product_line, SUM(revenue) as total_revenue, 
         SUM(cost) as total_cost
  FROM sales s JOIN costs c ON s.product_id = c.product_id
  WHERE quarter = '2024Q3'
  GROUP BY product_line

这里用到的是LLM的生成能力。LLM学过大量的SQL语法和数据库知识,所以能写出正确的查询语句。

步骤4:总结分析结果

ReportGeneratorNode调用LLM
    ↓
输入:SQL查询结果 + Python分析结果
    ↓
LLM生成报告:
  "上季度各产品线利润分析:
   1. 电子产品线利润最高(¥1,200万,利润率28%)
   2. 家居产品线利润增长最快(环比+15%)
   3. 服装产品线利润率下降,需关注成本控制"

这是LLM的总结能力——把冷冰冰的数据变成有洞察力的业务语言。


3.4 DataAgent中的LLM调用抽象

DataAgent没有直接硬编码某个LLM的调用方式,而是定义了一个统一的接口:LlmService

3.4.1 LlmService接口设计

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/LlmService.java

public interface LlmService {

    // 同时传入系统提示词和用户提示词
    Flux<ChatResponse> call(String system, String user);

    // 只传入系统提示词
    Flux<ChatResponse> callSystem(String system);

    // 只传入用户提示词
    Flux<ChatResponse> callUser(String user);

    // 将流式响应转换为字符串流
    default Flux<String> toStringFlux(Flux<ChatResponse> responseFlux) {
        return responseFlux.map(ChatResponseUtil::getText);
    }
}

设计亮点

  • • 统一接口:无论底层是通义千问、GPT-4还是其他模型,上层代码都用同样的方式调用
  • • 流式返回Flux<ChatResponse>表示响应是流式的,用户可以实时看到LLM的输出,不用干等
  • • 三种调用模式:同时传system+user、只传system、只传user,覆盖不同场景

3.4.2 系统提示词 vs 用户提示词

在调用LLM时,DataAgent区分两种提示词:

类型作用例子
System Prompt(系统提示词)定义Agent的角色、能力、约束"你是一个数据分析专家,擅长SQL和Python..."
User Prompt(用户提示词)携带具体的任务内容"查询2024年北京地区的销售额"

这种分离的好处是:系统提示词定义"你是谁",用户提示词定义"你要做什么"。当用户连续问多个问题时,系统提示词可以复用,只替换用户提示词即可。


3.5 Prompt Engineering基础

3.5.1 什么是Prompt

Prompt(提示词) 就是你给LLM的"任务说明书"。就像你给下属布置任务时需要说清楚要求,给LLM的Prompt也需要清晰明确。

一个差的Prompt:

"查一下销售数据"

问题:太模糊。LLM不知道要查哪个表、哪些字段、什么时间段。

一个好的Prompt:

角色:你是一位数据分析专家,精通SQL查询。

任务:根据下面的数据库Schema,编写SQL查询。

Schema:
- sales表:id, product_name, amount, sale_date, region

要求:
1. 查询2024年北京地区的销售总额
2. 按月份分组
3. 返回月份和销售额两列

输出格式:只返回SQL代码,不要解释。

3.5.2 好的Prompt = 清晰的角色 + 明确的任务 + 具体的格式要求

DataAgent的Prompt设计遵循这个公式:

1. 清晰的角色

你是一位拥有深厚业务洞察力的高级数据分析专家。
你的核心职责是解析用户的业务问题,并基于给定的数据库Schema,
制定一个严谨、可执行的分步执行计划。

2. 明确的任务

核心任务流程:
1. 解构需求:深入分析用户问题,明确核心业务目标
2. Schema验证:确认计划查询的字段在表中真实存在
3. 制定策略:创建逻辑清晰的多步计划
4. 生成计划:输出JSON对象

3. 具体的格式要求

输出格式(必须是合法的JSON):
{
  "thought_process": "分析思路...",
  "execution_plan": [
    {"step": 1, "tool_to_use": "SQL_GENERATE_NODE", ...}
  ]
}

3.6 DataAgent中的Prompt设计实战

3.6.1 系统Prompt定义Agent角色

DataAgent的PlannerNode使用了一个精心设计的系统Prompt模板(planner.txt),定义了Agent的角色和能力边界:

# 角色:高级数据分析智能体 (Senior Data Analysis Agent)

你是一位拥有深厚业务洞察力的高级数据分析专家。你的核心职责是解析用户的业务问题,
并基于给定的数据库Schema,制定一个严谨、可执行的分步执行计划(Plan)。

你需要决定在每一步使用什么工具,从而将模糊的业务问题转化为最终的洞察结论。

**重要原则:你必须且只能输出一个合法的JSON对象。严禁包含markdown标记、注释或任何JSON结构之外的文本。**

# 可用工具 (Available Tools)

## 1. SQL_GENERATE_NODE
- 核心用途:执行SQL查询以提取或聚合数据
- 参数:instruction(详细的SQL需求描述)

## 2. PYTHON_GENERATE_NODE
- 核心用途:当SQL难以满足需求时使用
- 适用于:复杂逻辑计算、数据清洗、高级统计分析、图表绘制

## 3. REPORT_GENERATOR_NODE
- 核心用途:流程的最后一步,整合所有输出,总结发现并给出建议

这个Prompt的精妙之处:

  • • 角色定位清晰:"高级数据分析专家"——LLM会调用相关知识来生成更专业的计划
  • • 能力边界明确:只能用三种工具,不能天马行空
  • • 输出格式强制:必须输出合法JSON,方便程序解析

3.6.2 Prompt模板文件的位置

DataAgent的所有Prompt模板都放在 resources/prompts/ 目录下:

resources/prompts/
  ├── planner.txt              # 计划生成Prompt
  ├── new-sql-generate.txt     # SQL生成Prompt
  ├── python-generator.txt     # Python代码生成Prompt
  ├── report-generator-plain.txt # 报告生成Prompt
  ├── intent-recognition.txt   # 意图识别Prompt
  ├── query-enhancement.txt    # 查询增强Prompt
  └── ...

这种模板化设计的优势:

  • • 修改Prompt不需要改代码,只需编辑文本文件
  • • 不同场景(SQL生成、Python生成、报告生成)有独立的Prompt,互不干扰
  • • 可以通过配置动态切换Prompt,实现A/B测试

3.6.3 PlannerNode中的Prompt构建

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/PlannerNode.java

public class PlannerNode implements NodeAction {

    private final LlmService llmService;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 获取用户查询
        String canonicalQuery = StateUtil.getCanonicalQuery(state);
        
        // 获取数据库Schema信息
        SchemaDTO schemaDTO = StateUtil.getObjectValue(state, TABLE_RELATION_OUTPUT, SchemaDTO.class);
        String schemaStr = PromptHelper.buildMixMacSqlDbPrompt(schemaDTO, true);
        
        // 获取证据信息(RAG检索结果)
        String evidence = StateUtil.getStringValue(state, EVIDENCE);
        
        // 构建模板参数
        BeanOutputConverter<Plan> beanOutputConverter = new BeanOutputConverter<>(Plan.class);
        Map<String, Object> params = Map.of(
            "user_question", canonicalQuery,
            "schema", schemaStr,
            "evidence", evidence,
            "format", beanOutputConverter.getFormat()
        );
        
        // 渲染Prompt模板
        String plannerPrompt = PromptConstant.getPlannerPromptTemplate().render(params);
        
        // 调用LLM生成计划
        return llmService.callUser(plannerPrompt);
    }
}

关键流程

  1. 1. 从State中获取上下文(用户问题、Schema、证据)
  2. 2. 用BeanOutputConverter生成JSON格式说明(告诉LLM应该输出什么结构)
  3. 3. 将变量填充到Prompt模板中
  4. 4. 调用llmService.callUser()执行LLM推理

3.7 结构化输出:让LLM返回JSON

3.7.1 为什么需要结构化输出

LLM天生擅长生成自然语言,但Agent需要程序可解析的数据结构。如果LLM的输出是:

"我觉得应该先查sales表,然后算一下各地区的销售额..."

程序很难从中提取"要查什么表"、"要做什么计算"。但如果LLM输出的是JSON:

{
  "thought_process": "先查sales表获取原始数据,再按地区分组计算销售额",
  "execution_plan": [
    {"step": 1, "tool_to_use": "SQL_GENERATE_NODE", "tool_parameters": {"instruction": "查询sales表..."}}
  ]
}

程序就能直接解析,知道第一步要调用SQL_GENERATE_NODE,参数是什么。

3.7.2 DataAgent的结构化输出设计

DataAgent定义了两个核心的结构化输出DTO:

Plan(执行计划)

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/dto/planner/Plan.java

@Data
public class Plan {

    @JsonProperty("thought_process")
    @JsonPropertyDescription("简要描述你的分析思路。必须明确提到你检查了哪些表和字段")
    private String thoughtProcess;

    @JsonProperty("execution_plan")
    @JsonPropertyDescription("执行计划的步骤列表")
    private List<ExecutionStep> executionPlan;
}

ExecutionStep(执行步骤)

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/dto/planner/ExecutionStep.java

@Data
public class ExecutionStep {

    @JsonProperty("step")
    @JsonPropertyDescription("步骤顺序号")
    private int step;

    @JsonProperty("tool_to_use")
    @JsonPropertyDescription("工具名称")
    private String toolToUse;

    @JsonProperty("tool_parameters")
    @JsonPropertyDescription("工具参数")
    private ToolParameters toolParameters;

    @Data
    @JsonInclude(JsonInclude.Include.NON_NULL)  // 忽略null值,让JSON更干净
    public static class ToolParameters {

        @JsonProperty("instruction")
        @JsonPropertyDescription("当工具是SQL_GENERATE_NODE时,填写详细的SQL需求")
        private String instruction;

        @JsonProperty("sql_query")
        @JsonPropertyDescription("SQL_GENERATE_NODE运行完后,会把生成的SQL塞进来")
        private String sqlQuery;

        @JsonProperty("summary_and_recommendations")
        @JsonPropertyDescription("REPORT_GENERATOR_NODE专用,报告的大纲")
        private String summaryAndRecommendations;
    }
}

3.7.3 如何让LLM输出JSON

DataAgent使用了Spring AI的BeanOutputConverter来实现结构化输出:

// 在PlannerNode中
BeanOutputConverter<Plan> beanOutputConverter = new BeanOutputConverter<>(Plan.class);

// 将JSON格式说明注入Prompt
Map<String, Object> params = Map.of(
    // ... 其他参数
    "format", beanOutputConverter.getFormat()
);

beanOutputConverter.getFormat()会生成这样的格式说明:

你的响应必须是以下JSON格式:
{
  "thought_process": "string类型,简要描述你的分析思路",
  "execution_plan": [
    {
      "step": "integer类型,步骤顺序号",
      "tool_to_use": "string类型,工具名称",
      "tool_parameters": {
        "instruction": "string类型,详细的SQL需求"
      }
    }
  ]
}

这样,LLM就知道"我要输出什么结构",而不是自由发挥写一段话。

3.7.4 一个真实的Plan示例

当用户问"分析极曜汽车近一年的购车线索转化质量"时,LLM生成的Plan可能是:

{
  "thought_process": "用户想要分析线索转化质量。我已确认leads_table表包含渠道、区域、时间以及各转化阶段字段。计划:先按渠道和区域提取转化漏斗数据,再用Python计算转化率和排名,最后生成报告。",
  "execution_plan": [
    {
      "step": 1,
      "tool_to_use": "SQL_GENERATE_NODE",
      "tool_parameters": {
        "instruction": "按渠道来源分组,查询近一年的线索转化漏斗核心指标(留资数、到店数、试驾数、下定数、交车数)"
      }
    },
    {
      "step": 2,
      "tool_to_use": "SQL_GENERATE_NODE",
      "tool_parameters": {
        "instruction": "按地理区域(省份、城市)分组,查询近一年的线索转化漏斗核心指标"
      }
    },
    {
      "step": 3,
      "tool_to_use": "PYTHON_GENERATE_NODE",
      "tool_parameters": {
        "instruction": "基于步骤1和步骤2的结果,计算各地区转化率,识别Top 5高转化率城市和Top 5低转化率城市"
      }
    },
    {
      "step": 4,
      "tool_to_use": "REPORT_GENERATOR_NODE",
      "tool_parameters": {
        "summary_and_recommendations": "总结高转化率渠道和区域的共同特征,指出转化漏斗瓶颈,提出优化建议"
      }
    }
  ]
}

这个JSON被程序解析后,PlanExecutorNode就会按顺序执行这4个步骤。


3.8 本章小结

本章我们学习了LLM在Agent中的核心地位:

  1. 1. LLM是Agent的大脑:负责理解、推理、生成和总结
  2. 2. 四大核心能力
    • • 理解能力:听懂用户的真实意图
    • • 生成能力:写出SQL、Python代码和报告
    • • 推理能力:把复杂问题拆解成执行步骤
    • • 总结能力:从数据中提取业务洞察
  3. 3. Prompt Engineering:好的Prompt = 清晰的角色 + 明确的任务 + 具体的格式要求
  4. 4. 结构化输出:通过JSON格式让LLM的输出可被程序解析,Plan和ExecutionStep是DataAgent的核心数据结构
  5. 5. LlmService抽象:统一接口屏蔽底层LLM差异,支持流式响应

理解LLM的角色后,下一章我们将学习Agent的"手脚"——工具系统。没有工具,LLM只能"空想";有了工具,Agent才能真正改变世界。


思考题

  1. 1. 场景题:假设用户输入"最近手机卖得怎么样",请尝试写一段Prompt(包含角色、任务、格式要求),让LLM生成一个包含SQL查询和Python分析的两步Plan。
  2. 2. 分析题:DataAgent的PlannerNode在构建Prompt时,会把数据库Schema、用户问题、证据信息都传给LLM。为什么要传Schema?如果不传Schema,LLM可能会犯什么错误?

 



第四章:工具是Agent的手脚——Tool Use与Function Calling

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

人之所以强大,是因为会用工具

想象一个场景:

  • • 原始人徒手打不过狮子,但拿起长矛就能狩猎
  • • 木匠徒手砍不断大树,但用锯子就能精准加工
  • • 程序员徒手算不出千万级数据的统计,但写一段Python代码就能秒出结果

工具,是人类文明的基石。

Agent也是如此。LLM虽然聪明,但它"困在"服务器里:不能上网查最新新闻,不能连接你的数据库,不能运行代码,不能生成文件。没有工具,LLM只能给你"纸上谈兵"的建议;有了工具,Agent才能真正解决问题。

如果把Agent比作一个人:

  • • LLM 是大脑,负责思考
  • • 工具 是手脚,负责行动
  • • Function Calling 是神经系统,负责"大脑指挥手脚"

本章将带你理解:Agent为什么需要工具?有哪些常见工具?DataAgent如何利用Function Calling机制来调用SQL、Python等工具?以及如何保证工具调用的安全性?


4.1 为什么Agent需要工具

4.1.1 LLM的三大局限

尽管LLM很聪明,但它有三个根本性的局限:

局限1:知识有截止日期

LLM的知识来自训练数据,而训练数据有截止时间。比如GPT-4的知识截止到2024年初,它不知道:

  • • 今天的股价是多少
  • • 明天的天气怎么样
  • • 你家数据库里有什么表

类比:就像一位学识渊博但隐居山林的学者,他读过很多书,但不知道外面世界正在发生什么。

局限2:不能直接与外部世界交互

LLM运行在封闭的环境中,它不能:

  • • 连接你的MySQL数据库执行查询
  • • 访问你的公司内部系统
  • • 发送邮件或消息
  • • 浏览实时网页

它只能"说话"(生成文本),不能"动手"(执行操作)。

局限3:数学计算和代码执行需要精确性

LLM擅长"理解"数学问题,但不一定擅长"计算":

  • • 问它"12345 × 67890 = ?",它可能算错
  • • 让它心算一个复杂统计,结果可能不精确
  • • 让它"假想"运行代码,输出可能和实际运行不一致

类比:就像心算和用计算器的区别。再聪明的人,面对复杂计算也需要工具。

4.1.2 工具弥补LLM的短板



LLM的局限对应的工具作用
知识有截止日期搜索工具获取实时信息
不能连接数据库数据库工具执行SQL查询
计算不精确计算工具执行Python代码
不能持久化结果文件工具读写文件、生成报告

核心思想:让LLM做它擅长的事(理解、推理、决策),让工具做它们擅长的事(精确执行、外部交互)。


4.2 常见工具类型

在数据分析Agent中,常见的工具有以下几类:

4.2.1 搜索工具(获取实时信息)

作用:弥补LLM知识时效性的不足,获取最新信息。

例子

  • • 用户问"昨天特斯拉的股价是多少?"
  • • Agent调用搜索工具,查询金融API或搜索引擎
  • • 把搜索结果返回给LLM,LLM再生成回答

在DataAgent中,EvidenceRecallNode就是一种特殊的"搜索工具"——它在向量数据库中搜索与用户问题相关的业务知识文档,把搜索结果作为"证据"提供给LLM。

4.2.2 数据库工具(SQL查询)

作用:让Agent能够查询结构化数据。

例子

  • • 用户问"上季度各地区的销售额"
  • • Agent生成SQL:SELECT region, SUM(amount) FROM sales WHERE quarter='2024Q3' GROUP BY region
  • • 调用数据库工具执行SQL
  • • 把查询结果返回给LLM

在DataAgent中,SqlExecuteNode就是数据库工具的执行节点。它连接MySQL等数据库,执行LLM生成的SQL,并把结果返回。

4.2.3 计算工具(Python执行)

作用:执行复杂计算、数据清洗、统计分析、图表绘制。

例子

  • • SQL查询返回了原始数据
  • • 用户要求"计算环比增长率,并画出趋势图"
  • • Agent生成Python代码,调用Pandas计算增长率,调用Matplotlib画图
  • • 调用Python执行工具运行代码
  • • 把执行结果(图表数据)返回给LLM

在DataAgent中,PythonExecuteNode就是计算工具的执行节点。它在Docker沙箱中运行Python代码,确保安全。

4.2.4 文件工具(读写文件)

作用:持久化分析结果,生成报告文件。

例子

  • • 分析完成后,Agent生成一份HTML报告
  • • 调用文件工具把报告保存到磁盘
  • • 用户可以通过链接下载报告

在DataAgent中,ReportGeneratorNode会生成HTML/Markdown格式的分析报告,并支持导出。


4.3 Function Calling机制

现在我们来到本章的核心:Function Calling(函数调用)。这是LLM与工具交互的标准机制。

4.3.1 Function Calling的工作流程

Function Calling的本质是:LLM决定"我要调用什么工具",程序执行工具,再把结果返回给LLM

┌─────────────┐     1.用户提问      ┌─────────────┐
│   用户      │ ──────────────────> │    LLM      │
│             │                     │   (大脑)     │
└─────────────┘                     └──────┬──────┘
                                           │
                                           │ 2.分析:需要调用工具
                                           ▼
                                    ┌─────────────┐
                                    │  生成调用   │
                                    │  请求:     │
                                    │  tool: SQL  │
                                    │  params: {} │
                                    └──────┬──────┘
                                           │
                                           │ 3.执行工具
                                           ▼
                                    ┌─────────────┐
                                    │   工具      │
                                    │  (SQL执行)  │
                                    └──────┬──────┘
                                           │
                                           │ 4.返回结果
                                           ▼
                                    ┌─────────────┐
                                    │  查询结果   │
                                    │  {数据...}  │
                                    └──────┬──────┘
                                           │
                                           │ 5.带着结果继续思考
                                           ▼
                                    ┌─────────────┐
                                    │    LLM      │
                                    │  生成最终   │
                                    │  回答/下一步 │
                                    └──────┬──────┘
                                           │
                                           │ 6.输出给用户
                                           ▼
                                    ┌─────────────┐
                                    │   用户      │
                                    │ (看到结果)  │
                                    └─────────────┘

关键洞察:LLM不是直接执行工具,而是生成调用指令。程序解析指令,执行工具,再把结果喂回给LLM。这是一个"LLM → 工具 → LLM"的循环。

4.3.2 类比:Function Calling就像"开处方"

想象LLM是一位医生:

  • • 病人(用户)说"我头疼、发烧"
  • • 医生(LLM)诊断后,开处方:"需要做血常规检查"
  • • 护士(程序)拿到处方,执行检查(抽血、化验)
  • • 护士把检查报告交给医生
  • • 医生根据报告,给出诊断:"你是病毒性感冒,需要休息和服药"

医生不会亲自操作化验仪器,但他知道什么时候该开什么检查。这就是Function Calling的精髓。


4.4 DataAgent中的工具系统

DataAgent的工具系统由多个**Node(节点)**组成,每个节点负责一类工具的调用。这些节点被编排在一个StateGraph工作流中,由PlanExecutorNode统一调度。

4.4.1 SQL执行工具:SqlExecuteNode

SqlExecuteNode负责执行LLM生成的SQL查询,是DataAgent最基础的工具节点。

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/SqlExecuteNode.java

@Slf4j
@Component
@AllArgsConstructor
public class SqlExecuteNode implements NodeAction {

    private final DatabaseUtil databaseUtil;
    private final Nl2SqlService nl2SqlService;
    private final LlmService llmService;
    private final DataAgentProperties properties;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 1. 获取当前步骤和SQL查询
        Integer currentStep = PlanProcessUtil.getCurrentStepNumber(state);
        String sqlQuery = StateUtil.getStringValue(state, SQL_GENERATE_OUTPUT);
        sqlQuery = nl2SqlService.sqlTrim(sqlQuery);

        log.info("Executing SQL query: {}", sqlQuery);

        // 2. 获取Agent对应的数据库配置
        String agentIdStr = StateUtil.getStringValue(state, Constant.AGENT_ID);
        Long agentId = Long.valueOf(agentIdStr);
        DbConfigBO dbConfig = databaseUtil.getAgentDbConfig(agentId);

        // 3. 执行SQL查询
        return executeSqlQuery(state, currentStep, sqlQuery, dbConfig, agentId);
    }
}

核心流程

  1. 1. 从State中获取LLM生成的SQL语句
  2. 2. 根据Agent ID获取对应的数据库连接配置(DataAgent支持多租户,不同Agent连接不同数据库)
  3. 3. 创建数据库访问器,执行SQL
  4. 4. 处理查询结果,更新State

执行SQL的关键代码

// 构建查询参数
DbQueryParameter dbQueryParameter = new DbQueryParameter();
dbQueryParameter.setSql(sqlQuery);
dbQueryParameter.setSchema(dbConfig.getSchema());

// 获取数据库访问器
Accessor dbAccessor = databaseUtil.getAgentAccessor(agentId);

// 执行SQL并获取结果
ResultSetBO resultSetBO = dbAccessor.executeSqlAndReturnObject(dbConfig, dbQueryParameter);

// 将结果存入State,供后续节点使用
Map<String, String> updatedResults = PlanProcessUtil.addStepResult(
    existingResults, currentStep, strResultSetJson
);

设计亮点

  • • 多租户支持:通过agentId动态获取数据库配置,不同Agent可以连接不同的数据库
  • • 结果丰富化:执行SQL后,还会调用LLM生成图表配置(enrichResultSetWithChartConfig),决定数据应该用表格、柱状图还是折线图展示
  • • 错误处理:SQL执行失败时,会记录错误信息,触发重试或降级逻辑

4.4.2 Python执行工具:PythonExecuteNode

PythonExecuteNode负责在沙箱环境中执行LLM生成的Python代码,用于复杂计算、数据分析和图表生成。

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/PythonExecuteNode.java

@Slf4j
@Component
public class PythonExecuteNode implements NodeAction {

    private final CodePoolExecutorService codePoolExecutor;
    private final ObjectMapper objectMapper;
    private final JsonParseUtil jsonParseUtil;
    private final CodeExecutorProperties codeExecutorProperties;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 1. 获取Python代码和SQL结果数据
        String pythonCode = StateUtil.getStringValue(state, PYTHON_GENERATE_NODE_OUTPUT);
        List<Map<String, String>> sqlResults = StateUtil.hasValue(state, SQL_RESULT_LIST_MEMORY)
                ? StateUtil.getListValue(state, SQL_RESULT_LIST_MEMORY) : new ArrayList<>();

        // 2. 构建任务请求
        CodePoolExecutorService.TaskRequest taskRequest = new CodePoolExecutorService.TaskRequest(
            pythonCode,
            objectMapper.writeValueAsString(sqlResults),  // SQL结果作为输入数据
            null
        );

        // 3. 在Docker沙箱中执行Python代码
        CodePoolExecutorService.TaskResponse taskResponse = this.codePoolExecutor.runTask(taskRequest);

        // 4. 处理执行结果
        if (!taskResponse.isSuccess()) {
            // 执行失败,检查是否需要重试
            int triesCount = StateUtil.getObjectValue(state, PYTHON_TRIES_COUNT, Integer.class, 0);
            if (triesCount >= codeExecutorProperties.getPythonMaxTriesCount()) {
                // 超过最大重试次数,启动降级策略
                log.error("Python执行失败且已超过最大重试次数,采用降级策略");
                return createFallbackResult(state);
            }
            throw new RuntimeException("Python Execute Failed: " + taskResponse.stdErr());
        }

        // 5. 执行成功,返回结果
        String stdout = taskResponse.stdOut();
        log.info("Python Execute Success! StdOut: {}", stdout);
        return Map.of(PYTHON_EXECUTE_NODE_OUTPUT, stdout, PYTHON_IS_SUCCESS, true);
    }
}

核心流程

  1. 1. 从State中获取LLM生成的Python代码
  2. 2. 从State中获取上一步SQL查询的结果数据(作为Python的输入)
  3. 3. 调用codePoolExecutor.runTask()在沙箱中执行代码
  4. 4. 如果成功,返回标准输出(stdout);如果失败,根据重试策略处理

为什么需要Python工具?

SQL擅长"查数据",但不擅长"深度分析"。比如:

  • • 计算环比/同比增长率(需要跨行计算)
  • • 做线性回归预测(需要统计库)
  • • 绘制复杂图表(需要可视化库)
  • • 数据清洗和转换(需要灵活的逻辑)

这些任务用Python的Pandas、NumPy、Matplotlib等库来做,比SQL更高效。

4.4.3 向量检索工具:EvidenceRecallNode

EvidenceRecallNode是DataAgent的"知识检索工具",负责在向量数据库中搜索与用户问题相关的业务知识。

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/EvidenceRecallNode.java

@Slf4j
@Component
public class EvidenceRecallNode implements NodeAction {

    private final LlmService llmService;
    private final AgentVectorStoreService vectorStoreService;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 1. 获取用户问题和Agent ID
        String question = StateUtil.getStringValue(state, INPUT_KEY);
        String agentId = StateUtil.getStringValue(state, AGENT_ID);

        // 2. 调用LLM进行查询重写(把口语化问题改写成适合检索的查询)
        String prompt = PromptHelper.buildEvidenceQueryRewritePrompt(multiTurn, question);
        Flux<ChatResponse> responseFlux = llmService.callUser(prompt);

        // 3. 从LLM响应中提取重写后的查询
        String standaloneQuery = extractStandaloneQuery(llmOutput);

        // 4. 在向量数据库中检索相关文档
        List<Document> businessTermDocuments = vectorStoreService
            .getDocumentsForAgent(agentId, standaloneQuery, DocumentMetadataConstant.BUSINESS_TERM);
        List<Document> agentKnowledgeDocuments = vectorStoreService
            .getDocumentsForAgent(agentId, standaloneQuery, DocumentMetadataConstant.AGENT_KNOWLEDGE);

        // 5. 构建格式化的证据内容
        String evidence = buildFormattedEvidenceContent(businessTermDocuments, agentKnowledgeDocuments);

        // 6. 返回证据,供后续节点使用
        return Map.of(EVIDENCE, evidence);
    }
}

核心流程

  1. 1. 获取用户的原始问题
  2. 2. 调用LLM把口语化问题改写成"检索友好"的查询(比如把"最近手机卖得怎么样"改写成"手机产品销售趋势分析")
  3. 3. 在向量数据库中检索相关的业务知识文档(如产品定义、指标口径、业务规则)
  4. 4. 把检索结果格式化成"证据"文本,注入到后续Prompt中

为什么需要这个工具?

因为不同公司有不同的业务术语和规则。比如:

  • • "活跃用户"在A公司定义是"7天内有登录",在B公司是"30天内有下单"
  • • "GMV"是否包含退款?不同公司口径不同
  • • "新客"是指首次注册还是首次下单?

这些业务知识存储在向量数据库中,EvidenceRecallNode负责把它们找出来,告诉LLM:"你分析时要按这个口径来"。


4.5 工具调用的安全性

让LLM执行SQL和Python代码,就像让一位"聪明的实习生"直接操作生产数据库——必须做好安全防护。

4.5.1 SQL执行的安全性

DataAgent在SQL执行层面做了以下防护:

1. 只读查询(默认)

DataAgent的SQL生成Prompt中明确要求"只生成SELECT查询",禁止生成INSERT、UPDATE、DELETE、DROP等修改性语句。

2. 数据库权限控制

为Agent配置的数据库账号应该只有只读权限,即使LLM生成了危险SQL,数据库也会拒绝执行。

3. Schema隔离

不同Agent连接不同的数据库/Schema,一个Agent无法访问其他Agent的数据。

4.5.2 Python执行的安全性:Docker沙箱

Python代码的危险性更高——它可以删除文件、访问网络、消耗大量资源。DataAgent使用Docker容器沙箱来隔离Python执行环境。

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/code/impls/DockerCodePoolExecutorService.java

public class DockerCodePoolExecutorService extends AbstractCodePoolExecutorService 
        implements CodePoolExecutorService {

    private final DockerClient dockerClient;

    // 创建容器的HostConfig(安全隔离配置)
    private HostConfig createHostConfig(Path tempDir) {
        HostConfig config = newHostConfig()
            // 1. 内存限制:防止内存耗尽攻击
            .withMemory(this.properties.getLimitMemory() * 1024L * 1024L)
            // 2. CPU限制:防止CPU占满
            .withCpuCount(this.properties.getCpuCore())
            // 3. 丢弃所有Linux Capability:禁止特权操作
            .withCapDrop(Capability.ALL)
            // 4. 网络隔离:限制网络访问
            .withNetworkMode(this.properties.getNetworkMode())
            // 5. 临时文件系统:容器内/tmp是临时的
            .withTmpFs(Map.of("/tmp", ""));

        // 6. 只读挂载:把脚本文件挂载为只读
        if (!this.isRemote) {
            List<Bind> binds = new ArrayList<>();
            binds.add(new Bind(
                tempDir.resolve("script.py").toAbsolutePath().toString(),
                new Volume("/app/script.py"),
                AccessMode.ro  // 只读!
            ));
            config.withBinds(binds.toArray(new Bind[0]));
        }
        return config;
    }
}

Docker沙箱的六层安全防护


安全措施作用防护的攻击
内存限制限制容器最大内存内存耗尽攻击(OOM)
CPU限制限制可用CPU核心数CPU占满攻击
Capability丢弃禁止所有特权系统调用提权、内核攻击
网络隔离限制容器网络访问内网探测、数据外发
只读挂载脚本文件不可修改篡改执行代码
临时文件系统/tmp目录不持久化恶意文件驻留


4.5.3 超时控制

除了资源隔离,DataAgent还设置了严格的超时机制:

// Docker容器执行超时
dockerClient.waitContainerCmd(containerId)
    .start()
    .awaitCompletion(this.properties.getContainerTimeout(), TimeUnit.SECONDS);

// Python代码执行超时(通过timeout命令)
private String buildExecutionCommand(Path tempDir) {
    return String.format(
        "... timeout -s SIGKILL %s python3 -u script.py ...",
        properties.getCodeTimeout()
    );
}

如果Python代码陷入死循环或执行时间过长,会被强制终止,避免占用系统资源。

4.5.4 重试与降级

当工具调用失败时,DataAgent有完善的容错机制:

// 在PythonExecuteNode中
if (!taskResponse.isSuccess()) {
    int triesCount = StateUtil.getObjectValue(state, PYTHON_TRIES_COUNT, Integer.class, 0);
    
    if (triesCount >= codeExecutorProperties.getPythonMaxTriesCount()) {
        // 超过最大重试次数,启动降级策略
        log.error("Python执行失败且已超过最大重试次数,采用降级策略继续处理");
        String fallbackOutput = "{}";
        return createFallbackResult(state);
    }
    
    // 未超过重试次数,抛出异常触发重试
    throw new RuntimeException("Python Execute Failed: " + taskResponse.stdErr());
}

降级策略:如果Python执行多次都失败,Agent不会直接报错退出,而是返回一个空结果({}),让工作流继续执行后续节点(如报告生成),只是分析深度会降低。


4.6 本章小结

本章我们学习了Agent的"手脚"——工具系统:

  1. 1. 为什么需要工具:LLM有三大局限(知识截止日期、不能外部交互、计算不精确),工具弥补这些短板
  2. 2. 常见工具类型:搜索工具、数据库工具、计算工具、文件工具
  3. 3. Function Calling机制:LLM生成调用指令 → 程序执行工具 → 结果返回LLM → LLM继续思考
  4. 4. DataAgent的工具系统
    • • SqlExecuteNode:执行SQL查询,连接多租户数据库
    • • PythonExecuteNode:在Docker沙箱中执行Python代码
    • • EvidenceRecallNode:在向量数据库中检索业务知识
  5. 5. 安全性
    • • SQL层:只读查询 + 数据库权限控制
    • • Python层:Docker沙箱隔离(内存/CPU/Capability/网络/只读挂载/临时文件)
    • • 超时控制:防止死循环和资源耗尽
    • • 重试与降级:失败时不中断整个流程

理解工具系统后,你应该能回答这个问题:为什么Agent比单纯的LLM更强大? 因为Agent = LLM(大脑)+ 工具(手脚)+ 工作流(神经系统)。三者结合,才能真正解决复杂的实际问题。


思考题

  1. 1. 场景题:假设用户问"帮我删除销售额为0的订单记录",DataAgent的SQL生成节点会如何处理?为什么这是不安全的?请从Prompt设计、数据库权限、业务风险三个角度分析。
  2. 2. 设计题:如果你要为一个"智能客服Agent"设计工具系统,除了本章提到的搜索、数据库、Python、文件工具外,你还需要什么工具?请列举2-3个,并说明它们的作用和调用方式。

 



第五章:记忆让Agent更聪明

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

金鱼的烦恼

想象一下,你养了一条金鱼。每次你走到鱼缸前,它都会惊恐地游开——因为它完全不记得你是谁。你昨天才喂过它,今天它却像第一次见到你一样。金鱼的记忆只有7秒,这让它无法建立任何持续的关系,也无法从过去的经验中学习。

早期的AI就像这条金鱼。你问它"我叫什么名字",它回答"我不知道"。你刚告诉它"我叫小明",下一秒再问,它又忘了。每次对话都是全新的开始,没有任何记忆的积累。

但人类不一样。你能记住昨天和朋友聊了什么,能回忆起上周学过的知识,甚至能想起十年前的一次旅行。正是这种记忆能力,让你可以持续学习、不断成长。

那么,如何让AI Agent也拥有记忆呢?这就是本章要探讨的核心问题。

5.1 为什么Agent需要记忆

5.1.1 没有记忆的Agent有多笨

假设你正在用一个数据分析Agent查询销售情况:

你:"帮我查一下上个月的销售额"

Agent:"上个月销售额是150万元。"

你:"那再帮我看看利润是多少?"

Agent:"请问您要查哪个月的利润?"

你:"就是刚才说的那个月啊!"

Agent:"抱歉,我不记得刚才说的是哪个月。"

这种体验让人抓狂。没有记忆的Agent,每次交互都是孤立的,无法理解上下文,更做不到连贯的对话。

5.1.2 记忆让Agent更智能

有了记忆,Agent就能:

  1. 1. 保持对话连贯:记住用户之前说过的话,理解指代关系
  2. 2. 积累用户偏好:记住用户喜欢什么格式、关注什么指标
  3. 3. 避免重复劳动:记住已经查过的数据,直接复用
  4. 4. 持续学习进化:从每次交互中积累经验,越用越聪明

记忆是智能的基石。没有记忆,Agent永远是个"新手";有了记忆,Agent才能成长为"专家"。

5.2 短期记忆:Agent的"工作记忆"

5.2.1 什么是短期记忆

人类的短期记忆就像大脑的"工作台",能同时存放几件事。比如你在算账时,会记住"刚才加了15,现在要乘3",算完就忘了。

Agent的短期记忆也一样,它保存当前对话的上下文,让Agent知道"刚才说了什么"。对话结束后,短期记忆通常会被清空。

5.2.2 DataAgent的短期记忆实现

DataAgent使用MultiTurnContextManager来管理短期记忆。它就像Agent的"记事本",记录着对话的每一轮内容。

// 文件路径:DataAgent-main/data-agent-management/.../MultiTurnContextManager.java

@Component
public class MultiTurnContextManager {
    
    // 用线程安全的Map存储每个对话的上下文
    private final ConcurrentHashMap<String, Deque<ConversationTurn>> contextStore = 
        new ConcurrentHashMap<>();
    
    @Autowired
    private DataAgentProperties properties;
    
    /**
     * 开始新一轮对话
     */
    public void beginTurn(String threadId, String userQuery) {
        Deque<ConversationTurn> turns = contextStore.computeIfAbsent(
            threadId, k -> new LinkedList<>()
        );
        
        // 如果超出最大轮数,移除最旧的记录
        int maxHistory = properties.getMaxturnhistory(); // 默认5轮
        while (turns.size() >= maxHistory) {
            turns.pollFirst(); // 移除最早的一轮
        }
        
        // 创建新的对话轮次
        ConversationTurn newTurn = new ConversationTurn();
        newTurn.setUserQuery(userQuery);
        newTurn.setTimestamp(System.currentTimeMillis());
        turns.addLast(newTurn);
    }
    
    /**
     * 追加Agent的回复片段(流式输出时逐步积累)
     */
    public void appendPlannerChunk(String threadId, String chunk) {
        Deque<ConversationTurn> turns = contextStore.get(threadId);
        if (turns != null && !turns.isEmpty()) {
            ConversationTurn currentTurn = turns.getLast();
            currentTurn.appendAssistantResponse(chunk);
        }
    }
    
    /**
     * 完成当前轮次
     */
    public void finishTurn(String threadId) {
        // 可以在这里做清理或持久化操作
    }
}

看,这个设计很巧妙:

  • • 用ConcurrentHashMap保证线程安全,支持多用户同时对话
  • • 用Deque(双端队列)实现"先进先出",自动淘汰旧记忆
  • • 每轮对话包含用户问题和Agent回复,形成完整的上下文

5.2.3 短期记忆的使用

有了存储,还要会用。DataAgent在每次处理用户请求时,都会把短期记忆注入到提示词中:

// 文件路径:DataAgent-main/data-agent-management/.../GraphServiceImpl.java

public class GraphServiceImpl implements GraphService {
    
    @Autowired
    private MultiTurnContextManager multiTurnContextManager;
    
    @Override
    public Flux<GraphNodeResponse> streamSearch(GraphRequest request) {
        String threadId = request.getThreadId();
        String userQuery = request.getQuery();
        
        // 1. 开始新一轮对话
        multiTurnContextManager.beginTurn(threadId, userQuery);
        
        // 2. 构建上下文(包含历史对话)
        String multiTurnContext = multiTurnContextManager.buildContext(threadId);
        
        // 3. 将上下文放入状态,供后续节点使用
        OverAllState state = new OverAllState();
        state.put(Constant.MULTI_TURN_CONTEXT, multiTurnContext);
        state.put(Constant.CURRENT_QUERY, userQuery);
        
        // 4. 执行工作流...
        return executeGraph(state);
    }
}

这样,当Agent生成SQL或Python代码时,提示词中已经包含了之前的对话内容,Agent就能理解"刚才说的那个月"指的是什么了。

5.2.4 短期记忆的边界

短期记忆不是越多越好。DataAgent通过配置控制记忆容量:

// 文件路径:DataAgent-main/data-agent-management/.../DataAgentProperties.java

@ConfigurationProperties(prefix = "spring.ai.alibaba.data-agent")
public class DataAgentProperties {
    
    /**
     * 最大保留几轮对话历史
     */
    private int maxturnhistory = 5;
    
    /**
     * 单轮对话最大长度(字符数)
     */
    private int maxplanlength = 2000;
    
    // getter和setter...
}

为什么限制5轮?因为:

  1. 1. LLM的上下文窗口有限:太多历史会挤占当前问题的空间
  2. 2. 太远的历史帮助不大:用户通常只关心最近几轮的内容
  3. 3. 成本考虑:更多token意味着更高的调用费用

配置提示:你可以在application.yml中调整这些参数:

spring:
  ai:
    alibaba:
      data-agent:
        maxturnhistory: 5
        maxplanlength: 2000

5.3 长期记忆:Agent的"知识库"

5.3.1 什么是长期记忆

短期记忆像便利贴,用完就扔;长期记忆像图书馆,永久保存。

Agent的长期记忆包括:

  • • 业务知识:产品定义、指标口径、业务规则
  • • 用户偏好:张三喜欢看表格,李四喜欢图表
  • • 历史案例:上次类似的问题是怎么解决的
  • • 文档资料:产品手册、API文档、操作指南

5.3.2 向量数据库:长期记忆的"书架"

长期记忆需要专门的存储方式。DataAgent使用向量数据库(Vector Store),它就像图书馆的书架,能根据"语义相似度"快速找到相关资料。

传统数据库是精确匹配:你搜"苹果",只能找到包含"苹果"两个字的内容。

向量数据库是语义匹配:你搜"苹果",还能找到"iPhone"、"苹果手机"、"Apple公司",因为它们在语义空间中是相近的。

传统数据库          向量数据库
┌─────────┐        ┌─────────┐
│ 苹果    │        │ 苹果    │◄──┐
│ 香蕉    │        │ iPhone  │   │ 语义相近
│ 橘子    │        │ Apple   │◄──┘
│ 梨      │        │ 香蕉    │
└─────────┘        │ 橘子    │
  精确匹配          └─────────┘
                    语义相似度搜索

5.3.3 DataAgent的向量存储服务

DataAgent通过AgentVectorStoreService管理长期记忆:

// 文件路径:DataAgent-main/data-agent-management/.../AgentVectorStoreServiceImpl.java

@Service
public class AgentVectorStoreServiceImpl implements AgentVectorStoreService {
    
    @Autowired
    private VectorStore vectorStore; // Spring AI提供的向量存储抽象
    
    @Autowired
    private DataAgentProperties properties;
    
    /**
     * 为指定Agent搜索相关文档
     */
    @Override
    public List<Document> getDocumentsForAgent(String agentId, String query, VectorType vectorType) {
        // 1. 构建搜索请求,带过滤条件
        SearchRequest searchRequest = SearchRequest.builder()
            .query(query)
            .topK(properties.getVectorStore().getDefaultTopkLimit()) // 默认返回8条
            .similarityThreshold(properties.getVectorStore().getDefaultSimilarityThreshold()) // 相似度阈值0.4
            .filterExpression("agentId == '" + agentId + "' && vectorType == '" + vectorType.name() + "'")
            .build();
        
        // 2. 执行相似度搜索
        List<Document> documents = vectorStore.similaritySearch(searchRequest);
        
        return documents;
    }
    
    /**
     * 高级搜索:支持混合搜索(向量+关键词)
     */
    @Override
    public List<Document> search(String agentId, String query, VectorType vectorType, boolean hybrid) {
        if (hybrid) {
            // 混合搜索:结合语义相似度和关键词匹配
            return doHybridSearch(agentId, query, vectorType);
        }
        return getDocumentsForAgent(agentId, query, vectorType);
    }
}

这里有几个关键概念:

  • • TopK:返回最相似的前K条结果(默认8条)
  • • SimilarityThreshold:相似度门槛,低于这个值的会被过滤(默认0.4)
  • • FilterExpression:按AgentID和类型过滤,确保不同Agent的记忆不混淆
  • • HybridSearch:结合语义搜索和关键词搜索,提高召回率

5.3.4 文档的存储与检索

长期记忆不是凭空出现的,需要先把资料"录入"。DataAgent支持多种文档类型:

// 文件路径:DataAgent-main/data-agent-management/.../DataAgentProperties.java

public class DataAgentProperties {
    
    /**
     * 文本分片配置
     */
    private TextSplitterProperties textSplitter = new TextSplitterProperties();
    
    public static class TextSplitterProperties {
        private int chunkSize = 1000;        // 每个片段约1000字符
        private int chunkOverlap = 200;      // 片段间重叠200字符,避免断句
        private String splitterType = "recursive"; // 分片策略
    }
}

为什么要把文档切小?

因为LLM的输入长度有限,一次塞不进整本手册。而且,小块内容更精准:用户问"退货政策",只需找到相关的那一段,不用把整本手册都传给LLM。

重叠200字符是为什么?

防止关键信息被切断。比如"退款条件:商品未使用且包装完整",如果不重叠,可能被切成"退款条件:商品未使用"和"且包装完整",语义就不完整了。

5.4 RAG:给Agent装上"外接大脑"

5.4.1 什么是RAG

RAG(Retrieval-Augmented Generation,检索增强生成)是Agent使用长期记忆的核心机制。

它的工作流程就像开卷考试:

  1. 1. 用户提问:"我们的退货政策是什么?"
  2. 2. 检索资料:去知识库找到相关的政策文档
  3. 3. 拼接提示:把问题和资料一起给LLM
  4. 4. 生成答案:LLM基于资料回答问题
用户提问 ──► 检索相关资料 ──► 拼接提示词 ──► LLM生成答案
                │
                ▼
           [向量数据库]
           业务知识库
           历史文档

5.4.2 DataAgent的RAG实现:EvidenceRecallNode

DataAgent的EvidenceRecallNode专门负责RAG检索。它就像Agent的"图书管理员",根据用户问题去找相关资料。

// 文件路径:DataAgent-main/data-agent-management/.../EvidenceRecallNode.java

@Component
public class EvidenceRecallNode implements NodeAction {
    
    @Autowired
    private AgentVectorStoreService vectorStoreService;
    
    @Autowired
    private LlmService llmService;
    
    @Override
    public Map<String, Object> apply(OverAllState state) {
        String query = StateUtil.getStringValue(state, Constant.CURRENT_QUERY);
        String agentId = StateUtil.getStringValue(state, Constant.AGENT_ID);
        
        // 第一步:查询改写(可选)
        // 把口语化的问题改写成更适合检索的形式
        String enhancedQuery = enhanceQuery(query);
        
        // 第二步:检索业务术语(BUSINESS_TERM)
        List<Document> businessTerms = vectorStoreService.getDocumentsForAgent(
            agentId, enhancedQuery, VectorType.BUSINESS_TERM
        );
        
        // 第三步:检索Agent专属知识(AGENT_KNOWLEDGE)
        List<Document> agentKnowledge = vectorStoreService.getDocumentsForAgent(
            agentId, enhancedQuery, VectorType.AGENT_KNOWLEDGE
        );
        
        // 第四步:格式化证据内容
        StringBuilder evidence = new StringBuilder();
        evidence.append("=== 相关业务术语 ===\n");
        for (Document doc : businessTerms) {
            evidence.append("- ").append(doc.getContent()).append("\n");
        }
        
        evidence.append("\n=== 相关知识 ===\n");
        for (Document doc : agentKnowledge) {
            evidence.append("- ").append(doc.getContent()).append("\n");
        }
        
        // 第五步:将证据存入状态,供后续节点使用
        Map<String, Object> result = new HashMap<>();
        result.put(Constant.EVIDENCE_CONTENT, evidence.toString());
        return result;
    }
    
    /**
     * 查询改写:把"那个指标"改写成具体的指标名称
     */
    private String enhanceQuery(String query) {
        // 简单实现:直接返回原查询
        // 高级实现:用LLM进行查询扩展和同义词替换
        return query;
    }
}

这个节点的工作流程很清晰:

  1. 1. 查询改写:把用户的口语化表达改写成更规范的检索词
  2. 2. 分类检索:同时检索"业务术语"和"Agent知识"两类资料
  3. 3. 格式化输出:把检索结果整理成清晰的文本,方便后续节点使用
  4. 4. 状态传递:通过OverAllState把证据传递给PlannerNode等后续节点

5.4.3 RAG的优势与局限

优势:

  • • 知识实时更新:不用重新训练模型,只需更新知识库
  • • 答案可溯源:能告诉用户"这个答案来自哪份文档"
  • • 减少幻觉:LLM基于真实资料回答,而不是瞎编
  • • 保护隐私:敏感数据留在本地,不传给大模型厂商

局限:

  • • 检索质量决定答案质量:如果找错了资料,答案也会错
  • • 需要维护知识库:文档要定期更新,分片策略要调优
  • • 复杂推理仍有挑战:跨文档的关联推理比较困难

RAG不是万能的,但在大多数企业场景中,它是让Agent拥有专业知识的最佳方案。

5.5 记忆管理的实战策略

5.5.1 短期记忆 vs 长期记忆:如何分工


特性短期记忆长期记忆
存储内容当前对话历史业务知识、用户偏好
存储位置内存(ConcurrentHashMap)向量数据库
生命周期对话期间永久
容量限制5轮对话无上限(受存储限制)
使用场景理解上下文指代回答专业问题
实现复杂度

5.5.2 记忆清理与重置

DataAgent提供了记忆管理功能:

// 文件路径:DataAgent-main/data-agent-management/.../MultiTurnContextManager.java

public class MultiTurnContextManager {
    
    /**
     * 重置指定对话的所有记忆
     */
    public void clearContext(String threadId) {
        contextStore.remove(threadId);
    }
    
    /**
     * 丢弃当前未完成的轮次(用户取消或出错时)
     */
    public void discardPending(String threadId) {
        Deque<ConversationTurn> turns = contextStore.get(threadId);
        if (turns != null && !turns.isEmpty()) {
            ConversationTurn lastTurn = turns.getLast();
            if (!lastTurn.isCompleted()) {
                turns.removeLast(); // 移除未完成的轮次
            }
        }
    }
    
    /**
     * 重新开始最后一轮(用户反馈修改时)
     */
    public void restartLastTurn(String threadId, String newQuery) {
        Deque<ConversationTurn> turns = contextStore.get(threadId);
        if (turns != null && !turns.isEmpty()) {
            turns.removeLast(); // 移除旧的
            beginTurn(threadId, newQuery); // 创建新的
        }
    }
}

这些方法在以下场景很有用:

  • • clearContext:用户说"我们重新开始",清空所有历史
  • • discardPending:Agent生成到一半出错了,丢弃不完整的内容
  • • restartLastTurn:用户说"不对,我的意思是...",基于原问题重新理解

5.5.3 记忆的持久化

短期记忆存在内存里,服务重启就丢了。如果需要持久化,可以:

  1. 1. 定期快照:把内存中的对话历史写入Redis或数据库
  2. 2. 异步持久化:对话结束后,异步保存到数据库
  3. 3. 分级存储:热数据在内存,温数据在Redis,冷数据在数据库

DataAgent目前使用内存存储短期记忆,这是为了简单和性能。在生产环境中,建议接入Redis实现分布式共享。

5.6 本章小结

本章我们探讨了Agent的记忆机制,这是让Agent从"工具"升级为"助手"的关键能力。

核心概念回顾:

  1. 1. 短期记忆:保存当前对话的上下文,让Agent理解"刚才说了什么"。DataAgent用MultiTurnContextManager实现,默认保留5轮对话。
  2. 2. 长期记忆:保存业务知识和用户偏好,让Agent成为"专家"。DataAgent用向量数据库存储,通过语义相似度检索相关资料。
  3. 3. RAG:检索增强生成,是Agent使用长期记忆的核心机制。先检索相关资料,再基于资料生成答案,减少幻觉,提高专业性。
  4. 4. 记忆管理:需要控制记忆容量、提供清理重置机制、考虑持久化策略。

DataAgent的实践亮点:

  • • 线程安全的ConcurrentHashMap存储短期记忆
  • • 可配置的maxturnhistorymaxplanlength控制记忆容量
  • • AgentVectorStoreService提供语义检索和混合搜索
  • • EvidenceRecallNode实现两阶段RAG(查询改写+向量检索)
  • • 状态机OverAllState在各节点间传递记忆

思考题

  1. 1. 场景设计:假设你在开发一个客服Agent,用户可能会说"还是刚才那个问题"、"换种方式问"、"我指的是前天说的那件事"。你会如何设计记忆机制来应对这些场景?
  2. 2. 技术权衡:DataAgent的短期记忆默认只保留5轮。如果增加到20轮,会带来什么好处和问题?你会如何平衡?
  3. 3. RAG优化:如果你发现Agent经常检索到不相关的文档,或者遗漏了关键文档,你会从哪些方面优化RAG的效果?(提示:可以从查询改写、分片策略、相似度阈值等角度思考)

 


第六章:规划让Agent更有条理——Planning与任务拆解

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

做一顿年夜饭

春节前夕,你决定为家人做一顿丰盛的年夜饭。走进厨房,面对冰箱里的食材,你会怎么做?

没有规划的人会手忙脚乱:先炒个青菜,发现米饭还没煮;刚腌好的鱼下了锅,发现葱姜蒜没切;等所有菜炒完,第一道菜已经凉了...

有规划的人会提前列好菜单:

  1. 1. 先规划菜单:冷盘4个、热菜6个、汤1个、主食饺子
  2. 2. 再列采购清单:缺什么食材、缺什么调料
  3. 3. 安排备菜顺序:先泡发干货,再腌制肉类,最后处理蔬菜
  4. 4. 确定烹饪顺序:汤先炖上(需要2小时),米饭煮上(需要40分钟),同时准备凉菜,最后炒热菜
  5. 5. 上桌时机:确保所有热菜同时出锅,汤和主食刚好配齐

这就是规划的力量——把复杂的大目标拆解成有序的小步骤,让执行有条不紊。

Agent面对复杂任务时,也需要这样的规划能力。本章我们就来看看,Agent是如何"先想后做"的。

6.1 为什么Agent需要规划

6.1.1 复杂问题不能一步到位

假设你问Agent:"分析一下上季度各产品线的销售趋势,找出下滑最严重的产品,预测下季度走势,并生成一份带图表的分析报告。"

这个任务包含多少子任务?

  1. 1. 查询上季度各产品线的销售数据
  2. 2. 计算同比、环比增长率
  3. 3. 识别下滑最严重的产品线
  4. 4. 分析下滑原因(可能需要对比历史数据)
  5. 5. 用统计模型预测下季度走势
  6. 6. 生成可视化图表(趋势图、对比图)
  7. 7. 整合所有结果,生成结构化报告

如果Agent没有规划能力,可能会:

  • • 顺序混乱:先生成报告模板,才发现数据还没查
  • • 遗漏步骤:忘了做预测分析,直接给结论
  • • 重复劳动:两次查询相同的数据
  • • 无法纠错:做到第6步发现第2步算错了,全盘重来

6.1.2 规划的本质:拆解与排序

规划的核心就是两件事:

  1. 1. 拆解(Decomposition):把大目标拆成小任务
  2. 2. 排序(Sequencing):确定任务的执行顺序

对人类来说,这是本能。你出门旅行,会自动想:定机票→订酒店→查攻略→收拾行李→去机场。

但对Agent来说,这种能力不是天生的,需要专门设计。这就是规划能力(Planning)——让Agent像项目经理一样,先制定计划,再逐步执行。

好的规划 = 正确的步骤 + 合理的顺序 + 可执行的粒度

6.2 常见的规划模式

在AI领域,研究者们提出了多种规划模式。我们介绍三种最常见的。

6.2.1 Chain-of-Thought(思维链):一步步思考

早期的LLM回答问题像考试蒙选择题——直接给答案,不对就错了。

直接回答模式

问:一个农场有鸡和兔共35只,脚共94只,鸡兔各几只?

答:鸡23只,兔12只。(可能是对的,也可能是错的,不知道咋算的)

Chain-of-Thought模式(思维链):

问:一个农场有鸡和兔共35只,脚共94只,鸡兔各几只?

答:让我逐步思考:

  1. 1. 假设全是鸡,应该有 35×2=70 只脚
  2. 2. 实际多了 94-70=24 只脚
  3. 3. 每只兔比鸡多2只脚,所以兔有 24÷2=12 只
  4. 4. 鸡有 35-12=23 只
  5. 5. 验证:23×2 + 12×4 = 46+48=94,正确

答案是:鸡23只,兔12只。

看出区别了吗?思维链把"黑盒推理"变成了"白盒推理",不仅答案更准,而且错了也能发现哪步错了。

DataAgent的Plan对象就是思维链的具象化:

// 文件路径:DataAgent-main/data-agent-management/.../dto/planner/Plan.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Plan {

    @JsonProperty("thought_process")
    @JsonPropertyDescription("简要描述你的分析思路。必须明确提到你检查了哪些表和字段")
    private String thoughtProcess;

    @JsonProperty("execution_plan")
    @JsonPropertyDescription("执行计划的步骤列表")
    private List<ExecutionStep> executionPlan;

    // ...其他方法
}

thoughtProcess字段就是Agent的"内心独白",记录了它为什么要这样规划。

6.2.2 ReAct(推理+行动):边想边做

ReAct(Reasoning + Acting)模式不一次性制定完整计划,而是每一步都先"思考"再"行动",根据行动结果调整下一步。

思考1:用户要查销售额,我应该先找销售表
   │
   ▼
行动1:查询数据库表名
   │
   ▼
观察1:找到表`orders`,有amount字段
   │
   ▼
思考2:好的,我可以查这个表的amount总和
   │
   ▼
行动2:执行SQL查询
   │
   ▼
观察2:结果为150万
   │
   ▼
思考3:任务完成,可以回复用户了
   │
   ▼
最终答案:上月销售额为150万元

ReAct更灵活,适合开放性探索任务;DataAgent的"规划-执行"模式更可控,适合企业级数据分析。两者没有绝对优劣,看场景选择。

6.2.3 Plan-and-Execute(先规划再执行)

这是DataAgent采用的模式:先一次性制定完整计划,再按步骤执行

优点:

  • • 计划全局可见,便于人工审核
  • • 执行路径清晰,便于调试和优化
  • • 适合有明确目标的任务(如数据分析)

缺点:

  • • 灵活性不如ReAct
  • • 如果前期规划有误,后期需要重新规划

DataAgent通过动态修复机制弥补了这个缺点——执行失败时可以重新规划。

6.3 DataAgent的规划系统

6.3.1 规划的数据结构:Plan + ExecutionStep

DataAgent把规划结果抽象为两个核心类:

Plan(计划)

// 文件路径:DataAgent-main/data-agent-management/.../dto/planner/Plan.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Plan {

    @JsonProperty("thought_process")
    @JsonPropertyDescription("简要描述你的分析思路。必须明确提到你检查了哪些表和字段")
    private String thoughtProcess;

    @JsonProperty("execution_plan")
    @JsonPropertyDescription("执行计划的步骤列表")
    private List<ExecutionStep> executionPlan;

    // 为NL2SQL模式准备的简单计划(直接查询,不做复杂分析)
    public static String nl2SqlPlan() {
        return NL2SQL_PLAN_JSON;
    }
}

ExecutionStep(执行步骤)

// 文件路径:DataAgent-main/data-agent-management/.../dto/planner/ExecutionStep.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ExecutionStep {

    @JsonProperty("step")
    @JsonPropertyDescription("步骤顺序号")
    private int step;

    @JsonProperty("tool_to_use")
    @JsonPropertyDescription("工具名称")
    private String toolToUse;

    @JsonProperty("tool_parameters")
    @JsonPropertyDescription("工具参数")
    private ToolParameters toolParameters;

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonInclude(JsonInclude.Include.NON_NULL)  // 序列化时忽略null值
    public static class ToolParameters {

        @JsonProperty("instruction")
        @JsonPropertyDescription("SQL_GENERATE_NODE时:详细的SQL需求;PYTHON_GENERATE_NODE时:详细的编程需求")
        private String instruction;

        @JsonProperty("summary_and_recommendations")
        @JsonPropertyDescription("REPORT_GENERATOR_NODE专用,报告的大纲")
        private String summaryAndRecommendations;

        @JsonProperty("sql_query")
        @JsonPropertyDescription("SQL_GENERATE_NODE运行完后,会把生成的SQL塞进来")
        private String sqlQuery;
    }
}

这个设计非常清晰:

  • • thoughtProcess:Agent的"内心独白",解释为什么要这样规划
  • • executionPlan:具体的任务清单,每一步都有序号、工具和参数
  • • ExecutionStep:原子操作,不可再分的基本任务单元
  • • ToolParameters:根据工具类型携带不同参数(SQL指令、Python指令、报告大纲)

6.3.2 规划器:PlannerNode

PlannerNode是DataAgent的"项目经理"。它接收用户的需求,分析现状,制定执行计划。

用户Query + 历史上下文 + 数据库Schema + 检索到的证据
                    │
                    ▼
            ┌───────────────┐
            │  PlannerNode  │
            │   (规划器)   │
            └───────────────┘
                    │
                    ▼
            Plan(JSON格式)
    thoughtProcess + executionPlan[]

PlannerNode的实现

// 文件路径:DataAgent-main/data-agent-management/.../workflow/node/PlannerNode.java

@Slf4j
@Component
@AllArgsConstructor
public class PlannerNode implements NodeAction {

    private final LlmService llmService;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 是否为NL2SQL模式(直接翻译SQL,不做复杂分析)
        Boolean onlyNl2sql = state.value(IS_ONLY_NL2SQL, false);

        // NL2SQL模式走简化流程,否则走标准规划流程
        Flux<ChatResponse> flux = onlyNl2sql ? handleNl2SqlOnly() : handlePlanGenerate(state);
        // ...流式输出处理
    }

    private Flux<ChatResponse> handlePlanGenerate(OverAllState state) {
        // 获取查询增强节点的输出(经过改写和扩展的用户问题)
        String canonicalQuery = StateUtil.getCanonicalQuery(state);
        log.info("Using processed query for planning: {}", canonicalQuery);

        // 检查是否为修复模式(用户之前否决了计划,需要重新生成)
        String validationError = StateUtil.getStringValue(state, PLAN_VALIDATION_ERROR, null);
        if (validationError != null) {
            log.info("Regenerating plan with user feedback: {}", validationError);
        } else {
            log.info("Generating initial plan");
        }

        // 构建提示参数
        String semanticModel = (String) state.value(GENEGRATED_SEMANTIC_MODEL_PROMPT).orElse("");
        SchemaDTO schemaDTO = StateUtil.getObjectValue(state, TABLE_RELATION_OUTPUT, SchemaDTO.class);
        String schemaStr = PromptHelper.buildMixMacSqlDbPrompt(schemaDTO, true);

        // 构建用户提示(如果是修复模式,会包含错误信息)
        String userPrompt = buildUserPrompt(canonicalQuery, validationError, state);
        String evidence = StateUtil.getStringValue(state, EVIDENCE);

        // 构建模板参数,传入LLM
        BeanOutputConverter<Plan> beanOutputConverter = new BeanOutputConverter<>(Plan.class);
        Map<String, Object> params = Map.of(
            "user_question", userPrompt,
            "schema", schemaStr,
            "evidence", evidence,
            "semantic_model", semanticModel,
            "plan_validation_error", formatValidationError(validationError),
            "format", beanOutputConverter.getFormat()
        );
        
        // 生成计划:渲染Prompt模板 + 调用LLM
        String plannerPrompt = PromptConstant.getPlannerPromptTemplate().render(params);
        return llmService.callUser(plannerPrompt);
    }

    private Flux<ChatResponse> handleNl2SqlOnly() {
        // NL2SQL模式直接返回预置的简单计划
        return Flux.just(ChatResponseUtil.createPureResponse(Plan.nl2SqlPlan()));
    }
}

这里有几个关键点:

  1. 1. 上下文整合:规划器不是凭空规划,而是基于Schema(数据库结构)、Evidence(业务知识)、Context(对话历史)综合判断
  2. 2. 结构化输出:通过BeanOutputConverter强制LLM输出JSON,而不是自由文本
  3. 3. 修复模式:如果PLAN_VALIDATION_ERROR存在,说明之前的计划被否决了,会重新规划
  4. 4. NL2SQL简化:对于简单的SQL查询,直接返回预置的单步计划,提高效率

6.3.3 规划提示词模板

好的提示词决定了规划质量。DataAgent的Planner提示词非常详细:

# 角色:高级数据分析智能体 (Senior Data Analysis Agent)

你是一位拥有深厚业务洞察力的高级数据分析专家。你的核心职责是解析用户的业务问题,并基于给定的数据库 Schema,制定一个严谨、可执行的分步执行计划(Plan)。

**重要原则:你必须且只能输出一个合法的 JSON 对象。严禁包含 markdown 标记(如 ```json)、注释或任何 JSON 结构之外的文本。**

# 用户反馈处理 (最高优先级)

{plan_validation_error}

**如果上方存在用户反馈信息:**
1. **强制执行**:反馈中包含的要求必须在新的计划中得到满足。
2. **绝对合规**:用户反馈的各个方面都必须纳入你的计划中
3. **无例外情况**:若用户提及 "需要用 Python",你必须包含 Python 生成节点(PYTHON_GENERATE_NODE)相关步骤
4. **优先级覆盖**:用户反馈中的要求优先于任何默认的分析方法

# 核心任务流程

1.  **解构需求**:深入分析用户问题,明确核心业务目标、所需指标(如转化率、GMV)、维度(如按地区、按渠道)和时间范围。
2.  **Schema 验证 (至关重要)**:**必须首先分析 `AVAILABLE DATA CONTEXT` 中的 Schema**。确认你计划查询的字段(列名)在表中真实存在。整个计划必须完全基于此schema构建,严禁臆造字段。
3.   **制定策略**:创建一个逻辑清晰的多步计划。标准策略通常包括:
        *   **数据提取 (SQL)**:编写针对性的指令来提取原始数据。如果逻辑复杂,请拆分为多个 SQL 步骤。
        *   **深度分析 (Python)**:对于 SQL 难以实现的复杂计算(如环比/同比计算、统计分析、预测、相关性分析)或图表绘制,必须使用 Python。
        *   **总结汇报 (Report)**:最后一步必须是生成报告,总结发现并给出建议。
4.  **生成计划**:输出 JSON 对象。

# 可用工具 (Available Tools)

## 1. SQL_GENERATE_NODE
*   **核心用途**:执行 SQL 查询以提取或聚合数据。这是分析的基础。
*   **参数**:
    *   `instruction` (string): **[下游指令]** 将直接作为 SQL 生成专家的 Prompt。
*   **严格约束**:
    *   **禁止**使用"SQL生成"、"查询数据"、"执行查询"等模糊词语。
    *   **必须**包含:目标表名(或业务实体名)、具体的聚合维度(如按月、按地区)、过滤条件(如2023年)。
    *   **正例**:"从 orders 表查询 2023 年北京地区的销售总额,并按月份分组"。
    *   **反例**:"SQL生成"(**绝对禁止**,这将导致任务失败)。

## 2. PYTHON_GENERATE_NODE
*   **核心用途**:当 SQL 难以满足需求时使用。适用于:复杂逻辑计算、数据清洗、高级统计分析、图表绘制。
*   **参数**:
    *   `instruction` (string): 给 Python 解释器的具体编程指令。

## 3. REPORT_GENERATOR_NODE
*   **核心用途**:流程的最后一步。用于整合所有步骤的输出,回答用户最初的问题,并提供商业建议。
*   **参数**:
    *   `summary_and_recommendations` (string): 报告的大纲、需要回答的关键问题和建议方向。

# 可用数据上下文 (Available Data Context)

基于用户问题,系统已召回以下数据库 Schema。**你的计划必须且只能基于这些表结构。**

```sql
{schema}

【Reference information】
{evidence}

思考路径指南 (用于填充 thought_process 字段)

在生成 JSON 之前,请遵循以下步骤进行思考,并将思考的摘要写入 JSON 的 thought_process 字段中:

1、理解目标:用户的核心疑问是什么?(例如:"分析线索转化质量")。
2、核对 Schema:检查 {schema}。我有 region, channel 字段吗?我有代表转化节点的字段吗?如果没有相关字段,我不能制定查询该字段的计划。
3、拆解步骤:思考如何将大问题拆解为 SQL 查询和 Python 计算。例如:
    3.1、需要按渠道看转化率? -> Step 1: SQL_GENERATE_NODE (描述:按渠道分组查询各阶段人数)。
    3.2、需要按地区看转化率? -> Step 2: SQL_GENERATE_NODE (描述:按地区分组查询各阶段人数)。
    3.3、需要计算复杂的比率或排名? -> Step 3: PYTHON_GENERATE_NODE (指令:读取前两步数据,计算转化率并排名)。
    3.4 需要总结? -> Step 4: REPORT_GENERATOR_NODE。
4、撰写指令:确保 SQL_GENERATE_NODE 的 instruction 足够详细,让写 SQL 的同事(下游节点)一看就懂,不需要再问用户。
5、构建 JSON:组装最终结果。

输出格式 (必须是合法的 JSON)

请注意:tool_parameters 对象是动态的,请根据 tool_to_use 仅填充必要的字段,不要输出值为 null 的字段
{format}


示例(EXAMPLE)

User Input: "分析极曜汽车近一年的购车线索转化质量,尤其是不同地区的线索质量情况"

Your Output:
{
  "thought_process": "用户想要分析'极曜汽车'近一年的线索转化质量...",
  "execution_plan": [
    {
      "step": 1,
      "tool_to_use": "SQL_GENERATE_NODE",
      "tool_parameters": {
        "instruction": "按渠道来源分组,查询近一年的线索转化漏斗核心指标。"
      }
    },
    {
      "step": 2,
      "tool_to_use": "SQL_GENERATE_NODE",
      "tool_parameters": {
        "instruction": "按地理区域(省份、城市)分组,查询近一年的线索转化漏斗核心指标。"
      }
    },
    {
      "step": 3,
      "tool_to_use": "PYTHON_GENERATE_NODE",
      "tool_parameters": {
        "instruction": "基于步骤1(渠道数据)和步骤2(区域数据)的结果,进行深入分析..."
      }
    },
    {
      "step": 4,
      "tool_to_use": "REPORT_GENERATOR_NODE",
      "tool_parameters": {
        "summary_and_recommendations": "综合以上分析结果,总结出高转化率渠道和区域的共同特征..."
      }
    }
  ]
}


User's Current Request

User Input: "{user_question}"


这个提示词模板非常详细,包含:

- **角色设定**:"高级数据分析专家"
- **重要原则**:只能输出合法JSON
- **用户反馈处理**:最高优先级,必须满足用户反馈
- **核心任务流程**:解构需求→Schema验证→制定策略→生成计划
- **可用工具说明**:每个工具的用途、参数、约束条件
- **思考路径指南**:教LLM如何一步步思考
- **输出格式要求**:JSON结构、字段说明
- **示例**:完整的输入输出示例

这就像给实习生一份极其详细的"作业指导书",说得越清楚,他做得越到位。

## 6.4 计划的执行与调度

### 6.4.1 执行器:PlanExecutorNode

制定计划只是第一步,更重要的是执行。`PlanExecutorNode`就是DataAgent的"执行经理",负责按计划推进任务。

Plan(计划)
   │
   ▼
PlanExecutorNode(执行器)
   │
   ├──► 验证计划合法性
   │
   ├──► 获取当前步骤
   │
   ├──► 路由到对应节点
   │       ├──► SQL_GENERATE_NODE → 生成SQL → 执行SQL
   │       ├──► PYTHON_GENERATE_NODE → 生成Python → 执行Python
   │       └──► REPORT_GENERATOR_NODE → 生成报告
   │
   └──► 检查是否还有下一步
           ├──► 有 → 继续执行
           └──► 无 → 结束


**PlanExecutorNode的实现**:

```java
// 文件路径:DataAgent-main/data-agent-management/.../workflow/node/PlanExecutorNode.java

@Slf4j
@Component
public class PlanExecutorNode implements NodeAction {

    // 支持的工具节点类型
    private static final Set<String> SUPPORTED_NODES = Set.of(
        SQL_GENERATE_NODE, 
        PYTHON_GENERATE_NODE,
        REPORT_GENERATOR_NODE
    );

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 1. 验证计划结构
        Plan plan;
        try {
            plan = PlanProcessUtil.getPlan(state);
        } catch (Exception e) {
            log.error("Plan validation failed due to a parsing error.", e);
            return buildValidationResult(state, false, 
                "Validation failed: The plan is not a valid JSON structure.");
        }

        // 验证执行计划是否为空
        if (!validateExecutionPlanStructure(plan)) {
            return buildValidationResult(state, false,
                "Validation failed: The generated plan is empty or has no execution steps.");
        }

        // 2. 验证每个执行步骤
        for (ExecutionStep step : plan.getExecutionPlan()) {
            String validationResult = validateExecutionStep(step);
            if (validationResult != null) {
                return buildValidationResult(state, false, validationResult);
            }
        }

        log.info("Plan validation successful.");

        // 3. 如果开启人工复核,则在执行前暂停,跳转到human_feedback节点
        Boolean humanReviewEnabled = state.value(HUMAN_REVIEW_ENABLED, false);
        if (Boolean.TRUE.equals(humanReviewEnabled)) {
            log.info("Human review enabled: routing to human_feedback node");
            return Map.of(PLAN_VALIDATION_STATUS, true, PLAN_NEXT_NODE, HUMAN_FEEDBACK_NODE);
        }

        // 4. 获取当前步骤编号
        int currentStep = PlanProcessUtil.getCurrentStepNumber(state);
        List<ExecutionStep> executionPlan = plan.getExecutionPlan();

        // 5. 检查计划是否已完成
        if (currentStep > executionPlan.size()) {
            log.info("Plan completed, current step: {}, total steps: {}", currentStep, executionPlan.size());
            return Map.of(
                PLAN_CURRENT_STEP, 1,
                PLAN_NEXT_NODE, isOnlyNl2Sql ? StateGraph.END : REPORT_GENERATOR_NODE,
                PLAN_VALIDATION_STATUS, true
            );
        }

        // 6. 获取当前步骤,确定下一个执行节点
        ExecutionStep executionStep = executionPlan.get(currentStep - 1);
        String toolToUse = executionStep.getToolToUse();

        return determineNextNode(toolToUse);
    }

    /**
     * 确定下一个执行节点
     */
    private Map<String, Object> determineNextNode(String toolToUse) {
        if (SUPPORTED_NODES.contains(toolToUse)) {
            log.info("Determined next execution node: {}", toolToUse);
            return Map.of(PLAN_NEXT_NODE, toolToUse, PLAN_VALIDATION_STATUS, true);
        } else if (HUMAN_FEEDBACK_NODE.equals(toolToUse)) {
            return Map.of(PLAN_NEXT_NODE, toolToUse, PLAN_VALIDATION_STATUS, true);
        } else {
            return Map.of(PLAN_VALIDATION_STATUS, false, 
                PLAN_VALIDATION_ERROR, "Unsupported node type: " + toolToUse);
        }
    }

    /**
     * 验证单个执行步骤
     */
    private String validateExecutionStep(ExecutionStep step) {
        // 验证工具名称是否合法
        if (step.getToolToUse() == null || !SUPPORTED_NODES.contains(step.getToolToUse())) {
            return "Validation failed: Plan contains an invalid tool name: '" 
                + step.getToolToUse() + "' in step " + step.getStep();
        }

        // 验证工具参数是否存在
        if (step.getToolParameters() == null) {
            return "Validation failed: Tool parameters are missing for step " + step.getStep();
        }

        // 根据节点类型验证特定参数
        switch (step.getToolToUse()) {
            case SQL_GENERATE_NODE:
                if (!StringUtils.hasText(step.getToolParameters().getInstruction())) {
                    return "Validation failed: SQL generation node is missing description in step " + step.getStep();
                }
                break;

            case PYTHON_GENERATE_NODE:
                if (!StringUtils.hasText(step.getToolParameters().getInstruction())) {
                    return "Validation failed: Python generation node is missing instruction in step " + step.getStep();
                }
                break;

            case REPORT_GENERATOR_NODE:
                if (!StringUtils.hasText(step.getToolParameters().getSummaryAndRecommendations())) {
                    return "Validation failed: Report generation node is missing summary_and_recommendations in step " + step.getStep();
                }
                break;
        }

        return null; // 验证通过
    }
}

执行器的逻辑很清晰:

  1. 1. 验证计划:确保计划不是空的,JSON格式正确
  2. 2. 验证步骤:逐个检查每个步骤的工具名称、参数完整性
  3. 3. 人工审核:如果开启了人工复核,暂停执行等待用户确认
  4. 4. 获取步骤:按序号取出当前要执行的步骤
  5. 5. 路由跳转:返回下一个节点的名称,StateGraph负责调度

6.4.2 调度器:PlanExecutorDispatcher

PlanExecutorDispatcher是"交通警察",负责根据执行结果决定下一步去哪。

// 文件路径:DataAgent-main/data-agent-management/.../workflow/dispatcher/PlanExecutorDispatcher.java

@Slf4j
public class PlanExecutorDispatcher implements EdgeAction {

    private static final int MAX_REPAIR_ATTEMPTS = 2;

    @Override
    public String apply(OverAllState state) {
        // 1. 检查计划验证是否通过
        boolean validationPassed = StateUtil.getObjectValue(state, PLAN_VALIDATION_STATUS, Boolean.class, false);

        if (validationPassed) {
            log.info("Plan validation passed. Proceeding to next step.");
            String nextNode = state.value(PLAN_NEXT_NODE, END);
            if ("END".equals(nextNode)) {
                log.info("Plan execution completed successfully.");
                return END;
            }
            return nextNode;
        } else {
            // 2. 计划验证失败,检查修复次数
            int repairCount = StateUtil.getObjectValue(state, PLAN_REPAIR_COUNT, Integer.class, 0);

            if (repairCount > MAX_REPAIR_ATTEMPTS) {
                log.error("Plan repair attempts exceeded the limit of {}. Terminating execution.", MAX_REPAIR_ATTEMPTS);
                return END;
            }

            log.warn("Plan validation failed. Routing back to PlannerNode for repair. Attempt count: {}.", repairCount);
            return PLANNER_NODE;
        }
    }
}

这个调度器很聪明:

  1. 1. 验证通过:继续执行下一步(SQL生成、Python生成或报告生成)
  2. 2. 验证失败:检查已修复次数
    • • 如果未超过最大次数(2次),回到PlannerNode重新规划
    • • 如果超过最大次数,结束执行,避免无限循环

6.5 计划的动态调整:执行失败时重新规划

6.5.1 现实世界的意外

再好的计划也可能遇到意外:

  • • SQL执行失败:表不存在、权限不足、语法错误
  • • 数据不符合预期:查询结果为空、格式不对
  • • LLM生成错误:生成的SQL有逻辑bug
  • • 用户反馈:用户说"不对,我要的不是这个"

没有容错机制,Agent就会"一崩到底"。

6.5.2 DataAgent的修复机制

DataAgent通过PlanExecutorDispatcher实现计划修复:

用户:分析一下销售情况
  │
  ▼
Planner:制定4步计划
  │
  ▼
PlanExecutorNode:验证计划 → 通过
  │
  ▼
执行步骤1(查数据)→ 成功
  │
  ▼
执行步骤2(做分析)→ 失败:数据格式不对
  │
  ▼
PlanExecutorDispatcher:验证失败,repairCount=0 < 2
  │
  ▼
回到 PlannerNode,传入错误信息
  │
  ▼
Planner:调整计划,增加数据清洗步骤
  │
  ▼
新计划:查数据 → 清洗数据 → 做分析 → 生成报告
  │
  ▼
继续执行...

修复模式的PlannerNode处理

// PlannerNode.java 中的修复逻辑

private String buildUserPrompt(String input, String validationError, OverAllState state) {
    if (validationError == null) {
        return input;  // 正常模式,直接返回用户输入
    }

    // 修复模式:构建包含错误信息的提示词
    String previousPlan = StateUtil.getStringValue(state, PLANNER_NODE_OUTPUT, "");
    return String.format(
        "IMPORTANT: User rejected previous plan with feedback: \"%s\"\n\n"
        + "Original question: %s\n\n"
        + "Previous rejected plan:\n%s\n\n"
        + "CRITICAL: Generate new plan incorporating user feedback (\"%s\")",
        validationError, input, previousPlan, validationError
    );
}

这就是"试错学习"——Agent从错误中调整策略,最终完成任务。

6.5.3 修复的实战场景

假设用户问:"查一下去年的销售额"

第一次规划

步骤1:SQL_GENERATE_NODE → 生成SQL查询去年销售额

执行失败

错误:表sales不存在

第一次修复(回到PlannerNode):

Planner看到错误信息,重新规划:
步骤1:SQL_GENERATE_NODE → 查询所有表名,找到销售相关表
步骤2:SQL_GENERATE_NODE → 根据找到的表,查询去年销售额

第二次执行

步骤1成功:找到表名为orders
步骤2成功:查询到去年销售额为150万

6.6 从规划到执行:DataAgent的完整计划生命周期

让我们把整个过程串起来,看看DataAgent的完整计划生命周期:

用户提问:"分析近三个月各产品线的销售趋势"
    │
    ▼
[PlannerNode] 制定计划
    │
    ├── 分析Schema:确认有sales表、product表、date字段
    ├── 拆解任务:查数据 → 画图 → 写报告
    └── 生成Plan(JSON格式)
    │
    ▼
[PlanExecutorNode] 验证计划
    │
    ├── 检查JSON格式是否正确
    ├── 检查每一步的工具名称是否合法
    ├── 检查每一步的参数是否完整
    └── 验证通过 ✓
    │
    ▼
[PlanExecutorDispatcher] 路由到第一步
    │
    ▼
[SQL_GENERATE_NODE] 生成SQL
    │
    ├── "查询近三个月各产品线销售额,按月份和产品线分组"
    └── 生成SQL:SELECT ...
    │
    ▼
[SQL_EXECUTE_NODE] 执行SQL
    │
    ├── 连接数据库,执行查询
    └── 返回结果数据
    │
    ▼
[PlanExecutorDispatcher] 检查是否还有下一步
    │
    ├── 有下一步 → 继续
    │
    ▼
[PYTHON_GENERATE_NODE] 生成Python代码
    │
    ├── "根据SQL结果,绘制各产品线近三个月销售趋势折线图"
    └── 生成Python代码
    │
    ▼
[PYTHON_EXECUTE_NODE] 执行Python
    │
    ├── 在Docker沙箱中运行代码
    └── 返回图表数据
    │
    ▼
[PlanExecutorDispatcher] 检查是否还有下一步
    │
    ├── 有下一步 → 继续
    │
    ▼
[REPORT_GENERATOR_NODE] 生成报告
    │
    ├── 整合所有结果(数据 + 图表)
    └── 生成结构化分析报告
    │
    ▼
[PlanExecutorDispatcher] 计划执行完毕
    │
    └── 返回 END
    │
    ▼
用户收到:数据表格 + 趋势图 + 分析结论

这个流程体现了DataAgent规划系统的几个核心设计:

  1. 1. 计划先行:PlannerNode先制定完整计划,而不是边做边想
  2. 2. 验证严格:PlanExecutorNode每一步都验证合法性
  3. 3. 调度灵活:PlanExecutorDispatcher根据状态决定下一步去哪
  4. 4. 容错修复:失败时自动回到PlannerNode重新规划(最多2次)
  5. 5. 人工介入:支持人工审核模式,重要操作需用户确认

6.7 本章小结

本章我们探讨了Agent的规划能力,这是让Agent从"被动应答"升级为"主动解决"的关键。

核心概念回顾:

  1. 1. 规划的必要性:复杂任务必须拆解为步骤,否则Agent会混乱、遗漏、重复。
  2. 2. 三种规划模式
    • • Chain-of-Thought:让Agent"逐步思考",把黑盒推理变成白盒推理
    • • ReAct:边想边做,适合开放性探索任务
    • • Plan-and-Execute:先规划再执行,适合企业级数据分析(DataAgent采用)
  3. 3. DataAgent的规划体系
    • • PlannerNode:项目经理,基于Schema和证据制定计划
    • • Plan + ExecutionStep:结构化的任务清单(thoughtProcess + executionPlan)
    • • PlanExecutorNode:执行经理,验证计划并路由到对应节点
    • • PlanExecutorDispatcher:交通警察,根据验证结果决定下一步
  4. 4. 动态修复:执行失败时自动重新规划(最多2次),避免一崩到底。
  5. 5. 提示词工程:Planner的提示词模板非常详细,包含角色设定、工具说明、约束条件、示例等,确保LLM生成高质量计划。

DataAgent的实践亮点:

  • • BeanOutputConverter强制LLM输出结构化JSON计划
  • • ExecutionStep的原子化设计,每步明确工具+参数
  • • 状态机驱动,通过OverAllState在节点间传递计划和进度
  • • 最大2次的修复机制,平衡容错性和效率
  • • 支持人工审核,重要操作需用户确认

思考题

  1. 1. 场景设计:假设用户问"对比一下我们和竞争对手的优劣势"。你会如何设计规划流程?需要哪些步骤?可能遇到什么意外?DataAgent的PlannerNode会如何处理?
  2. 2. 模式选择:DataAgent使用"先规划后执行"模式,ReAct使用"边想边做"模式。如果你要开发一个"旅游规划Agent",帮用户制定旅行计划,你会选择哪种模式?为什么?
  3. 3. 容错设计:DataAgent的修复次数限制为2次。如果增加到10次,会带来什么问题?如果不限制,又会有什么风险?你会如何设计一个"智能"的修复策略?

 


第七章:多Agent协作——从单打独斗到团队作战

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

一个厨房里的协作艺术

想象你走进一家高档餐厅的后厨:

主厨站在中央,审视着今天的订单:"三号桌要一份招牌牛排、一份海鲜意面。"

切配师傅立刻开始处理蔬菜和肉类,把切好的食材按顺序摆在盘子里。

炒锅师傅接过食材,点火、倒油、翻炒,精准控制着火候和时间。

甜点师 meanwhile 正在准备提拉米苏,完全不需要别人催促。

服务员在门口等候,菜一出锅立刻端走,还不忘提醒:"三号桌对坚果过敏,甜点不要放杏仁片。"

这五个人的团队配合默契、各司其职,最终让客人享用到了完美的晚餐。

如果只有一个人呢?主厨既要切菜又要炒菜还要做甜点,等他把牛排煎好,意面早就凉了。单打独斗的效率,永远比不上团队协作。

在AI的世界里,这个道理同样适用。


7.1 从单Agent到多Agent系统

7.1.1 单Agent的局限

回顾前面几章,我们讲的DataAgent本质上是一个单Agent系统——一个"超级助手"独立完成从意图识别到报告生成的全部工作。

单Agent的优点是简单、集中、好管理。但它也有明显的天花板:

局限说明
任务复杂度过高当一个问题涉及多个专业领域(数据分析+法律审查+市场调研),单个Agent难以同时精通所有领域
并行效率低单Agent只能一步一步执行,无法像人类团队那样并行推进
容易"钻牛角尖"单个Agent可能陷入某种思维定式,缺乏外部视角的纠偏
责任边界模糊如果出错了,很难定位是哪个环节的问题

7.1.2 什么是多Agent系统

多Agent系统(Multi-Agent System,MAS) 是由多个Agent组成的协作网络。每个Agent有自己的专长、角色和职责,它们通过通信协调来共同完成一个宏大目标。

用餐厅的例子来类比:

餐厅角色Agent角色职责
主厨协调者Agent(Orchestrator)分配任务、把控整体进度
切配师傅数据准备Agent清洗数据、提取特征
炒锅师傅分析Agent执行核心算法、生成洞察
甜点师可视化Agent制作图表、设计报告样式
服务员交互Agent与用户沟通、收集反馈

一句话总结:单Agent是"全能型个人选手",多Agent是"专业化团队作战"。


7.2 多Agent协作的三种经典模式

学术界和工业界总结出了三种最常见的多Agent协作模式:

7.2.1 分工协作模式(Division of Labor)

每个Agent负责流水线上的一个环节,像工厂流水线一样接力完成任务。

用户提问
    ↓
[意图识别Agent] → 明确需求
    ↓
[数据检索Agent] → 查找相关数据
    ↓
[分析Agent] → 执行计算和分析
    ↓
[报告Agent] → 生成可读报告
    ↓
返回给用户

DataAgent的启示:虽然DataAgent运行在单个进程中,但它的16个节点本质上就是在模拟分工协作——IntentRecognitionNode负责"理解需求",SqlGenerateNode负责"查数据",PythonAnalyzeNode负责"做分析",ReportGeneratorNode负责"写报告"。每个节点只专注于自己的任务,通过OverAllState传递"接力棒"。

7.2.2 协商讨论模式(Discussion & Debate)

多个Agent围绕同一个问题各抒己见,通过讨论达成共识或最优解。

想象一个投资委员会:

  • • 基本面分析Agent:"这家公司财务很健康,建议买入。"
  • • 技术分析Agent:"但股价已经突破阻力位,短期有回调风险。"
  • • 风险控制Agent:"行业政策最近有变化,建议观望。"
  • • 协调者Agent:"综合三方意见,建议分批建仓,先买入30%仓位。"

这种模式特别适合需要多维度权衡的决策场景。

7.2.3 层级汇报模式(Hierarchical)

Agent之间形成上下级关系,上级Agent分配任务,下级Agent执行并汇报。

                    [总指挥官Agent]
                         |
        +----------------+----------------+
        |                |                |
[数据分析小队]      [市场调研小队]      [文案撰写小队]
   |   |   |            |   |              |   |
[SQL] [Python] [图表]  [爬虫] [访谈]     [标题] [正文]

优点:结构清晰,便于管理和追责。哪个环节出错,直接找对应的Agent。

缺点:层级过多会导致信息传递损耗,就像"传话游戏"一样,最下层的Agent可能误解了最初的意图。


7.3 DataAgent中的"协作思想"

虽然DataAgent不是严格意义上的多Agent系统(它运行在一个Java进程里),但它的设计处处体现了多Agent协作的思想

7.3.1 节点即"微Agent"

DataAgent的每个节点都可以看作一个微型的专用Agent

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/PlannerNode.java

public class PlannerNode implements NodeAction {
    // 这个"Agent"只负责一件事:制定计划
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 读取当前状态(接收其他"Agent"传来的信息)
        String canonicalQuery = StateUtil.getCanonicalQuery(state);
        SchemaDTO schemaDTO = StateUtil.getObjectValue(state, TABLE_RELATION_OUTPUT, SchemaDTO.class);
        
        // 专注完成自己的任务
        String plannerPrompt = PromptConstant.getPlannerPromptTemplate().render(params);
        return llmService.callUser(plannerPrompt);
    }
}
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/SqlGenerateNode.java

public class SqlGenerateNode implements NodeAction {
    // 这个"Agent"只负责一件事:写SQL
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 接收PlannerNode传来的执行计划
        String instruction = getCurrentExecutionStepInstruction(state);
        
        // 专注完成SQL生成
        // ...
    }
}

PlannerNode和SqlGenerateNode互不干涉:

  • • PlannerNode不需要知道SQL怎么写
  • • SqlGenerateNode不需要知道计划是怎么制定的
  • • 它们只通过OverAllState交换信息

这不正是分工协作的精髓吗?

7.3.2 Dispatcher:协作的"调度员"

在DataAgent中,**Dispatcher(调度器)**扮演了"协调者"的角色,决定下一步该哪个"Agent"干活:

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/dispatcher/PlanExecutorDispatcher.java

@Slf4j
public class PlanExecutorDispatcher implements EdgeAction {

    private static final int MAX_REPAIR_ATTEMPTS = 2;

    @Override
    public String apply(OverAllState state) {
        boolean validationPassed = StateUtil.getObjectValue(state, PLAN_VALIDATION_STATUS, Boolean.class, false);

        if (validationPassed) {
            // 验证通过,安排下一个执行节点去干活
            String nextNode = state.value(PLAN_NEXT_NODE, END);
            return nextNode;
        }
        else {
            // 验证失败,安排PlannerNode返工(最多2次)
            int repairCount = StateUtil.getObjectValue(state, PLAN_REPAIR_COUNT, Integer.class, 0);
            if (repairCount > MAX_REPAIR_ATTEMPTS) {
                log.error("修复次数超过限制,终止执行");
                return END; // 让流程结束
            }
            return PLANNER_NODE; // 安排PlannerNode重新制定计划
        }
    }
}

这个Dispatcher就像餐厅里的主厨

  • • 切配完成了?→ 安排炒锅师傅上
  • • 炒锅出问题了?→ 让切配重新准备(最多重试2次)
  • • 所有人都干完了?→ 宣布收工(END)

7.3.3 State:协作的"共享白板"

多Agent协作最大的挑战是信息共享。DataAgent用OverAllState解决了这个问题。

想象一个团队会议室里的共享白板

  • • PlannerNode在白板上写下"第一步:查询sales表"
  • • SqlGenerateNode看到白板,执行后把SQL结果写上去
  • • PythonAnalyzeNode看到SQL结果,把分析结论写上去
  • • 所有Agent都能看到白板上的最新内容
// StateGraph中的每个节点都可以读写State
public Map<String, Object> apply(OverAllState state) {
    // 读取其他节点留下的信息
    String previousResult = StateUtil.getStringValue(state, SQL_EXECUTE_NODE_OUTPUT);
    
    // 处理完成后,把自己的结果写回State
    return Map.of(PYTHON_ANALYSIS_NODE_OUTPUT, analysisResult);
}

7.4 什么时候需要真正的多Agent系统

DataAgent用"单进程多节点"的方式模拟了多Agent协作,那什么时候需要真正独立运行的多个Agent呢?

场景1:跨系统协作

当不同的Agent运行在不同的服务器上,甚至由不同的团队维护时:

  • • 客服Agent:处理用户投诉(部署在客服部门)
  • • 库存Agent:查询仓库库存(部署在供应链部门)
  • • 物流Agent:跟踪快递状态(部署在物流公司)

三个Agent属于不同的组织,必须通过API或消息队列通信。

场景2:安全隔离

当某些任务必须在隔离环境中执行时:

  • • 主Agent:接收用户请求、制定计划
  • • 沙箱Agent:在Docker容器中执行不可信的Python代码
  • • 审计Agent:监控所有操作是否符合合规要求

如果都放在一个进程里,恶意代码可能危及整个系统。

场景3:负载均衡

当任务量巨大时,多个相同的Agent并行工作:

  • • 1000份报表需要生成
  • • 启动10个"报表Agent"同时处理
  • • 协调者Agent分配任务、汇总结果

7.5 多Agent协作的设计原则

如果你要设计一个多Agent系统,记住以下原则:

原则1:职责单一

每个Agent只做一件事,并做好它。就像DataAgent的节点一样,PlannerNode不碰SQL,SqlGenerateNode不碰Python。

原则2:接口明确

Agent之间通过明确的接口通信,而不是直接访问对方的内部状态。

// 好的设计:通过State交换信息
Map<String, Object> result = Map.of(SQL_GENERATE_OUTPUT, sql);

// 坏的设计:直接调用另一个Agent的内部方法
sqlGenerateNode.internalQueryParser(); // 不要这样做!

原则3:容错设计

一个Agent失败了,不应该拖垮整个系统。DataAgent的Dispatcher会在验证失败时安排重试,而不是直接崩溃。

原则4:人类可介入

在多Agent协作的的关键节点,应该预留"人类接管"的接口。DataAgent的HumanFeedbackNode就是一个很好的例子(我们将在第九章详细讲解)。


本章小结

  1. 1. 单Agent的局限:面对复杂、跨领域、高并发的任务时,单打独斗效率低下。
  2. 2. 多Agent系统的三种协作模式
    • • 分工协作:流水线接力,各专其职
    • • 协商讨论:多维度辩论,综合决策
    • • 层级汇报:树形结构,上级指挥下级
  3. 3. DataAgent的协作思想:虽然DataAgent是单进程系统,但它的节点设计(PlannerNode、SqlGenerateNode等)和Dispatcher调度机制,体现了多Agent协作的核心思想。
  4. 4. 共享状态(State)是多Agent协作的关键OverAllState就像团队白板,让所有参与者看到最新进展。
  5. 5. 真正多Agent系统的适用场景:跨组织协作、安全隔离、负载均衡。

思考题

  1. 1. 场景设计:假设你要设计一个"智能旅行社"多Agent系统,需要完成"根据用户预算和喜好制定旅行计划+预订机票酒店+生成行程攻略"的任务。请设计3个专门化的Agent,说明每个Agent的职责和它们之间的协作流程。
  2. 2. 模式辨析:DataAgent的16个节点更像是"分工协作模式"还是"层级汇报模式"?为什么?PlannerNode和PlanExecutorNode之间是什么关系?
  3. 3. 扩展思考:如果把DataAgent改造为真正的多Agent系统(每个节点是一个独立的微服务),会带来什么好处和风险?从通信延迟、容错性、部署复杂度三个角度分析。

 



第八章:工作流编排——Agent的"神经系统"

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

医院看病的工作流

想象你去医院看病的全过程:

  1. 1. 挂号处:出示身份证,拿到挂号单
  2. 2. 门诊室:医生问诊、开检查单
  3. 3. 收费处:缴费
  4. 4. 检验科:抽血、拍片
  5. 5. 回门诊室:医生查看检验结果
  6. 6. 药房:凭处方取药
  7. 7. 离院

这个流程有几个特点:

  • • 有明确的顺序:不能先取药再看医生
  • • 有分支判断:如果检查结果正常,直接取药;如果异常,可能需要住院
  • • 有循环回退:检验结果不清楚,医生可能要求重新检查
  • • 有并行可能:抽血和拍片可以在不同楼层同时进行

这就是工作流(Workflow)——一组按照特定规则编排的活动,共同完成一个业务目标。

Agent要完成复杂任务,同样需要一个精心设计的"工作流"来协调各个环节。本章将学习DataAgent如何用StateGraph来编排它的16+个节点。


8.1 什么是工作流编排

8.1.1 从剧本到即兴表演

在第六章中,我们把Agent比作"即兴演员"——没有固定剧本,根据现场情况灵活发挥。

但即使是即兴演员,也需要一个舞台框架

  • • 幕什么时候拉开
  • • 灯光怎么打
  • • 场景怎么切换
  • • 什么时候谢幕

工作流编排就是这个"舞台框架"。它不负责写台词(那是LLM的工作),但负责:

  • • 哪个节点先上场
  • • 哪个节点后上场
  • • 什么条件下切换场景
  • • 出错时怎么处理

8.1.2 工作流的核心要素

任何工作流系统都离不开四个核心要素:


要素类比说明
节点(Node)演员执行具体任务的单元
边(Edge)舞台通道连接节点的路径
状态(State)共享道具箱节点之间传递数据的载体
条件(Condition)导演指令决定走哪条边的规则

8.2 StateGraph:DataAgent的工作流引擎

DataAgent使用 Spring AI Alibaba Graph 框架中的 StateGraph 来编排工作流。

8.2.1 什么是StateGraph

StateGraph(状态图) 是一种有向图结构:

  • • 节点(Node) = 处理步骤(如意图识别、SQL生成)
  • • 边(Edge) = 节点之间的连接
  • • 状态(State) = 流经整个图的数据对象
        START
          |
          v
   [IntentRecognition]
          |
    +-----+-----+
    |           |
    v           v
[EvidenceRecall] END
    |
    v
[QueryEnhance]
    |
    ...

数据(State)像流水一样在图中流动,每经过一个节点,就会被加工一次,直到到达终点(END)。

8.2.2 DataAgent的工作流全景图

DataAgent的工作流包含16+个节点,是一个相当复杂的图。让我们看看它的"蓝图":

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/config/DataAgentConfiguration.java

@Bean
public StateGraph nl2sqlGraph(NodeBeanUtil nodeBeanUtil, CodeExecutorProperties codeExecutorProperties)
        throws GraphStateException {

    StateGraph stateGraph = new StateGraph(NL2SQL_GRAPH_NAME, keyStrategyFactory)
        // 添加所有节点
        .addNode(INTENT_RECOGNITION_NODE, nodeBeanUtil.getNodeBeanAsync(IntentRecognitionNode.class))
        .addNode(EVIDENCE_RECALL_NODE, nodeBeanUtil.getNodeBeanAsync(EvidenceRecallNode.class))
        .addNode(QUERY_ENHANCE_NODE, nodeBeanUtil.getNodeBeanAsync(QueryEnhanceNode.class))
        .addNode(SCHEMA_RECALL_NODE, nodeBeanUtil.getNodeBeanAsync(SchemaRecallNode.class))
        .addNode(TABLE_RELATION_NODE, nodeBeanUtil.getNodeBeanAsync(TableRelationNode.class))
        .addNode(FEASIBILITY_ASSESSMENT_NODE, nodeBeanUtil.getNodeBeanAsync(FeasibilityAssessmentNode.class))
        .addNode(SQL_GENERATE_NODE, nodeBeanUtil.getNodeBeanAsync(SqlGenerateNode.class))
        .addNode(PLANNER_NODE, nodeBeanUtil.getNodeBeanAsync(PlannerNode.class))
        .addNode(PLAN_EXECUTOR_NODE, nodeBeanUtil.getNodeBeanAsync(PlanExecutorNode.class))
        .addNode(SQL_EXECUTE_NODE, nodeBeanUtil.getNodeBeanAsync(SqlExecuteNode.class))
        .addNode(PYTHON_GENERATE_NODE, nodeBeanUtil.getNodeBeanAsync(PythonGenerateNode.class))
        .addNode(PYTHON_EXECUTE_NODE, nodeBeanUtil.getNodeBeanAsync(PythonExecuteNode.class))
        .addNode(PYTHON_ANALYZE_NODE, nodeBeanUtil.getNodeBeanAsync(PythonAnalyzeNode.class))
        .addNode(REPORT_GENERATOR_NODE, nodeBeanUtil.getNodeBeanAsync(ReportGeneratorNode.class))
        .addNode(SEMANTIC_CONSISTENCY_NODE, nodeBeanUtil.getNodeBeanAsync(SemanticConsistencyNode.class))
        .addNode(HUMAN_FEEDBACK_NODE, nodeBeanUtil.getNodeBeanAsync(HumanFeedbackNode.class));
    
    // ... 连接边(见下文)
}

这段代码就像剧组的演员表——把16个"演员"(节点)全部登记在册,但还没有告诉他们上场顺序。

8.2.3 连接节点:普通边与条件边

节点注册完后,下一步是连接它们

普通边(addEdge):一对一的固定连接

// 从START开始,先到意图识别节点
stateGraph.addEdge(START, INTENT_RECOGNITION_NODE)
    
    // 证据召回完成后,必然进入查询增强
    .addEdge(EVIDENCE_RECALL_NODE, QUERY_ENHANCE_NODE)
    
    // Python代码生成后,必然进入Python执行
    .addEdge(PYTHON_GENERATE_NODE, PYTHON_EXECUTE_NODE)
    
    // 报告生成后,流程结束
    .addEdge(REPORT_GENERATOR_NODE, END)
    
    // 计划生成后,进入计划执行器
    .addEdge(PLANNER_NODE, PLAN_EXECUTOR_NODE)
    
    // Python分析完成后,回到计划执行器继续下一步
    .addEdge(PYTHON_ANALYZE_NODE, PLAN_EXECUTOR_NODE);

普通边就像工厂流水线的传送带:A做完后必然传给B,没有任何选择余地。

条件边(addConditionalEdges):根据运行时的状态动态决定走向

// 意图识别后,可能去证据召回,也可能直接结束(如果是闲聊)
stateGraph.addConditionalEdges(INTENT_RECOGNITION_NODE, 
        edge_async(new IntentRecognitionDispatcher()),
        Map.of(EVIDENCE_RECALL_NODE, EVIDENCE_RECALL_NODE, END, END))

// Schema召回后,可能去表关系分析,也可能结束
.addConditionalEdges(SCHEMA_RECALL_NODE, edge_async(new SchemaRecallDispatcher()),
        Map.of(TABLE_RELATION_NODE, TABLE_RELATION_NODE, END, END))

// 表关系分析后,可能成功进入可行性评估,也可能失败重试,还可能结束
.addConditionalEdges(TABLE_RELATION_NODE, edge_async(new TableRelationDispatcher()),
        Map.of(FEASIBILITY_ASSESSMENT_NODE, FEASIBILITY_ASSESSMENT_NODE, 
               END, END, 
               TABLE_RELATION_NODE, TABLE_RELATION_NODE)); // retry

条件边就像铁路道岔:列车开到岔路口,根据信号(运行时状态)决定走哪条轨道。

8.2.4 条件边的"大脑":Dispatcher

条件边之所以能"做选择",是因为有一个**Dispatcher(调度器)**在幕后决策。

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/dispatcher/IntentRecognitionDispatcher.java

public class IntentRecognitionDispatcher implements EdgeAction {
    @Override
    public String apply(OverAllState state) throws Exception {
        IntentRecognitionOutputDTO intent = StateUtil.getObjectValue(state, 
            INTENT_RECOGNITION_NODE_OUTPUT, IntentRecognitionOutputDTO.class);
        
        // 如果用户意图是"数据分析",走分析流程
        if (intent != null && intent.isDataAnalysis()) {
            return EVIDENCE_RECALL_NODE;
        }
        
        // 如果是闲聊,直接结束
        return END;
    }
}

这个Dispatcher就像一个智能道岔工

  • • 它查看当前状态(State)中的意图识别结果
  • • 如果是数据分析请求 → 扳向"证据召回"轨道
  • • 如果是闲聊 → 扳向"结束"轨道

8.3 状态管理:工作流的"血液"

如果说节点是工作流的"器官",边是"血管",那么**State(状态)**就是流动的"血液"——携带养分(数据)输送到各个器官。

8.3.1 OverAllState:全局状态容器

DataAgent的所有节点共享一个OverAllState对象。每个节点可以读取自己需要的数据,也可以写入自己的输出结果。

public Map<String, Object> apply(OverAllState state) throws Exception {
    // 读取:从"血液"中获取养分
    String userInput = StateUtil.getStringValue(state, INPUT_KEY);
    
    // 处理...
    
    // 写入:把代谢产物排回"血液"
    return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, result);
}

8.3.2 KeyStrategy:状态合并的策略

多个节点可能对同一个key写入数据,怎么合并?DataAgent通过KeyStrategy来定义合并规则:

KeyStrategyFactory keyStrategyFactory = () -> {
    HashMap<String, KeyStrategy> keyStrategyHashMap = new HashMap<>();
    
    // 用户输入:新值直接替换旧值
    keyStrategyHashMap.put(INPUT_KEY, KeyStrategy.REPLACE);
    
    // 意图识别结果:新值直接替换旧值
    keyStrategyHashMap.put(INTENT_RECOGNITION_NODE_OUTPUT, KeyStrategy.REPLACE);
    
    // 计划当前步骤:新值直接替换旧值
    keyStrategyHashMap.put(PLAN_CURRENT_STEP, KeyStrategy.REPLACE);
    
    // SQL执行结果:新值直接替换旧值
    keyStrategyHashMap.put(SQL_EXECUTE_NODE_OUTPUT, KeyStrategy.REPLACE);
    
    return keyStrategyHashMap;
};

目前DataAgent主要使用REPLACE策略——新值覆盖旧值。这符合数据分析场景的特点:每个节点的输出都是最新结论,不需要保留历史版本。

类比:就像白板上的内容,写新的就把旧的擦掉,始终只保留最新信息。


8.4 工作流的执行:编译与运行

StateGraph定义好了"蓝图",但它还不能直接运行。需要编译成可执行的图。

8.4.1 编译图(Compile)

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java

public GraphServiceImpl(StateGraph stateGraph, ExecutorService executorService,
        MultiTurnContextManager multiTurnContextManager, LangfuseService langfuseReporter)
        throws GraphStateException {
    
    // 编译图,并在人工反馈节点前设置"中断点"
    this.compiledGraph = stateGraph.compile(
        CompileConfig.builder()
            .interruptBefore(HUMAN_FEEDBACK_NODE)
            .build()
    );
    // ...
}

编译会做两件事:

  1. 1. 校验图的合法性:检查是否有孤立的节点、是否有循环依赖
  2. 2. 生成执行计划:把图结构转换成可执行的运行时结构

interruptBefore(HUMAN_FEEDBACK_NODE)是一个关键配置——它告诉系统:"运行到人工反馈节点之前,先暂停下来,等待外部信号。"(我们将在第九章详细讲解这个机制。)

8.4.2 流式执行(Stream)

DataAgent采用流式执行模式,用户可以实时看到每个节点的输出:

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java

private void handleNewProcess(GraphRequest graphRequest) {
    // 构建初始状态
    Flux<NodeOutput> nodeOutputFlux = compiledGraph.stream(
        Map.of(IS_ONLY_NL2SQL, nl2sqlOnly, 
               INPUT_KEY, query, 
               AGENT_ID, agentId, 
               HUMAN_REVIEW_ENABLED, humanReviewEnabled),
        RunnableConfig.builder().threadId(threadId).build()
    );
    
    // 订阅流,处理每个节点的输出
    nodeOutputFlux.subscribe(
        output -> handleNodeOutput(graphRequest, output),     // 正常输出
        error -> handleStreamError(agentId, threadId, error), // 错误处理
        () -> handleStreamComplete(agentId, threadId)         // 完成处理
    );
}

流式执行的好处:

  • • 即时反馈:用户不需要等整个流程跑完,每经过一个节点就能看到输出
  • • 可中断:用户随时可以喊"停"
  • • 资源友好:不需要等所有节点都完成才返回结果

8.5 工作流的容错设计

一个健壮的工作流必须能处理"意外情况"。DataAgent在工作流层面设计了多处容错机制:

8.5.1 循环重试

// 表关系分析失败后,可以回到自身重新执行
.addConditionalEdges(TABLE_RELATION_NODE, edge_async(new TableRelationDispatcher()),
        Map.of(FEASIBILITY_ASSESSMENT_NODE, FEASIBILITY_ASSESSMENT_NODE, 
               END, END, 
               TABLE_RELATION_NODE, TABLE_RELATION_NODE)) // 失败后重试

这就像工厂流水线上设置了"返工区"——质检不合格的产品会被送回前面重新加工。

8.5.2 最大重试次数

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/dispatcher/PlanExecutorDispatcher.java

private static final int MAX_REPAIR_ATTEMPTS = 2;

@Override
public String apply(OverAllState state) {
    int repairCount = StateUtil.getObjectValue(state, PLAN_REPAIR_COUNT, Integer.class, 0);
    if (repairCount > MAX_REPAIR_ATTEMPTS) {
        log.error("修复次数超过限制,终止执行");
        return END; // 不再重试,直接结束
    }
    return PLANNER_NODE; // 安排重试
}

无限重试可能导致系统卡死。设置上限就像给电梯装"防坠落安全钳"——最多下坠一段距离就会被强制制动。

8.5.3 错误传播与终止

// 如果任何节点抛出异常,流式订阅的错误回调会处理
nodeOutputFlux.subscribe(
    output -> handleNodeOutput(graphRequest, output),
    error -> handleStreamError(agentId, threadId, error), // 捕获异常
    () -> handleStreamComplete(agentId, threadId)
);

private void handleStreamError(String agentId, String threadId, Throwable error) {
    log.error("流处理出错,threadId: {}", threadId, error);
    // 清理资源、通知用户、结束流程
    stopStreamProcessing(threadId);
}

本章小结

  1. 1. 工作流编排是Agent的"舞台框架",负责协调各个节点何时上场、如何衔接。
  2. 2. StateGraph的四大要素
    • • 节点(Node):执行任务的单元
    • • 边(Edge):连接节点的路径
    • • 状态(State):节点间传递数据的载体
    • • 条件(Condition):动态决定走向的规则
  3. 3. DataAgent的工作流:16+个节点通过普通边和条件边连接成一张有向图,由Dispatcher根据运行时状态动态调度。
  4. 4. 状态管理OverAllState是全局共享状态容器,KeyStrategy.REPLACE定义了数据合并规则。
  5. 5. 编译与执行:StateGraph需要先编译成CompiledGraph,支持流式执行和断点暂停。
  6. 6. 容错设计:循环重试、最大重试次数限制、错误传播与优雅终止。

思考题

  1. 1. 流程设计题:假设你要设计一个"智能招聘Agent"的工作流,包含"简历解析→技能匹配→HR初筛→部门面试→发放Offer"等环节。请画出StateGraph的节点和边,并标注哪些用普通边、哪些用条件边。
  2. 2. 分析题:DataAgent中PYTHON_ANALYZE_NODE执行完成后,为什么会回到PLAN_EXECUTOR_NODE而不是直接到REPORT_GENERATOR_NODE?这个设计有什么好处?
  3. 3. 扩展思考:如果DataAgent的工作流中某个节点(比如SQL_EXECUTE_NODE)执行时间特别长(比如查询了上亿行数据),会阻塞整个工作流吗?为什么?(提示:思考dbOperationExecutor线程池的作用)

 



第九章:人在回路——当Agent需要"请示领导"

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

开篇:自动驾驶的"手动接管"按钮

想象你正在驾驶一辆具备自动驾驶功能的汽车:

高速公路上,车辆平稳地自动行驶。突然,前方出现了一起严重交通事故,车道被杂物堵塞,导航地图也没有更新这条信息。

这时,车内响起提示音:"前方路况复杂,请驾驶员接管方向盘。"

你立刻握住方向盘,观察情况后决定减速变道,安全通过了事故路段。

这个场景里,人类驾驶员在关键时刻介入了决策过程。这不是因为自动驾驶技术不够好,而是因为:

  • • 有些场景超出算法的训练范围
  • • 安全攸关的决策需要人类把关
  • • 人类拥有算法不具备的常识和判断力

在AI Agent的世界里,这种"关键时刻让人类介入"的机制叫做 Human-in-the-Loop(人在回路)


9.1 为什么需要人在回路

9.1.1 Agent不是万能的

尽管DataAgent能自动完成意图识别、SQL生成、数据分析等复杂任务,但它在某些场景下仍然需要人类的智慧:

场景为什么需要人类
计划审批Agent制定的分析计划可能不符合业务逻辑,需要业务专家确认
敏感数据涉及财务、人事、客户隐私的查询,需要权限审批
高成本操作生成大规模报表、调用付费API,需要确认预算
模糊需求用户的问题本身不够清晰,需要人工澄清
异常结果Agent返回的结果明显不合理,需要人工判断是数据问题还是逻辑问题

9.1.2 "全自动化"的风险

假设DataAgent没有任何人工审核环节:

用户问:"删除上个月所有测试订单。"

Agent理解有误,生成了 DELETE FROM orders WHERE month = '2024-01'——删除了所有1月份的订单,包括正式订单!

如果没有人在回路,这个错误会立即执行,造成不可逆的损失。

有了人在回路:

Agent生成SQL后暂停,把计划展示给用户:"我将执行以下操作:删除orders表中2024年1月的所有记录,预计影响15,234条数据。请确认。"

用户一看:"等等,我只想让删测试订单!"——灾难避免了。


9.2 DataAgent的人在回路设计

DataAgent在计划执行前设置了人工审核环节,让用户有机会查看和修改Agent制定的分析计划。

9.2.1 整体流程

用户提问
    ↓
[意图识别] → [证据召回] → [查询增强] → ... → [计划生成]
    ↓
[计划执行器] 检查:是否启用了人工复核?
    ↓
    是 ──────────────────────────────→ [人工反馈节点] 暂停,等待用户
    │                                        ↓
    │                              用户查看计划,选择:
    │                              ├─ 同意 → 继续执行
    │                              ├─ 拒绝+反馈 → 重新生成计划
    │                              └─ 不处理 → 保持暂停
    │                                        ↓
    └──────────────────────────────────── [SQL生成] → ...

9.2.2 启用人工复核

用户在发起请求时,可以通过参数开启人工复核:

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java

private void handleNewProcess(GraphRequest graphRequest) {
    boolean nl2sqlOnly = graphRequest.isNl2sqlOnly();
    // 只有当不是纯SQL生成模式时,才允许人工复核
    boolean humanReviewEnabled = graphRequest.isHumanFeedback() && !(nl2sqlOnly);
    
    Flux<NodeOutput> nodeOutputFlux = compiledGraph.stream(
        Map.of(
            // ... 其他参数
            HUMAN_REVIEW_ENABLED, humanReviewEnabled  // 把用户的选择传入State
        ),
        RunnableConfig.builder().threadId(threadId).build()
    );
}

9.2.3 计划执行器的"分岔口"

PlanExecutorNode在运行时会检查HUMAN_REVIEW_ENABLED标志:

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/PlanExecutorNode.java

@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
    // ... 验证计划合法性 ...
    
    // 关键判断:是否开启人工复核?
    Boolean humanReviewEnabled = state.value(HUMAN_REVIEW_ENABLED, false);
    if (Boolean.TRUE.equals(humanReviewEnabled)) {
        log.info("人工复核已启用:路由到人工反馈节点");
        return Map.of(PLAN_VALIDATION_STATUS, true, 
                      PLAN_NEXT_NODE, HUMAN_FEEDBACK_NODE);
    }
    
    // 未启用复核,直接继续执行计划
    // ...
}

这就像高速公路上的检查站

  • • 普通车辆(未开启复核)→ 直接放行
  • • 危险品运输车(开启复核)→ 必须停车检查,确认安全后才能继续

9.3 人工反馈节点:HumanFeedbackNode

当流程走到HumanFeedbackNode时,整个工作流会暂停,等待用户的输入。

9.3.1 节点的核心逻辑

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/HumanFeedbackNode.java

@Slf4j
@Component
public class HumanFeedbackNode implements NodeAction {

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        Map<String, Object> updated = new HashMap<>();

        // 检查最大修复次数,防止无限循环
        int repairCount = StateUtil.getObjectValue(state, PLAN_REPAIR_COUNT, Integer.class, 0);
        if (repairCount >= 3) {
            log.warn("最大修复次数(3次)已达,结束流程");
            updated.put("human_next_node", "END");
            return updated;
        }

        // 检查是否有用户反馈数据
        Map<String, Object> feedbackData = StateUtil.getObjectValue(
            state, HUMAN_FEEDBACK_DATA, Map.class, Map.of());
        
        if (feedbackData.isEmpty()) {
            // 没有反馈 → 等待用户
            updated.put("human_next_node", "WAIT_FOR_FEEDBACK");
            return updated;
        }

        // 处理反馈结果
        Object approvedValue = feedbackData.getOrDefault("feedback", true);
        boolean approved = approvedValue instanceof Boolean approvedBoolean 
            ? approvedBoolean 
            : Boolean.parseBoolean(approvedValue.toString());

        if (approved) {
            // 用户同意 → 继续执行
            log.info("计划已批准 → 执行");
            updated.put("human_next_node", PLAN_EXECUTOR_NODE);
            updated.put(HUMAN_REVIEW_ENABLED, false); // 关闭复核,避免再次暂停
        }
        else {
            // 用户拒绝 → 重新生成计划
            log.info("计划被拒绝 → 重新生成(第{}次)", repairCount + 1);
            updated.put("human_next_node", PLANNER_NODE);
            updated.put(PLAN_REPAIR_COUNT, repairCount + 1);
            updated.put(PLAN_CURRENT_STEP, 1);
            updated.put(HUMAN_REVIEW_ENABLED, true);

            // 保存用户的反馈内容,供PlannerNode参考
            String feedbackContent = feedbackData.getOrDefault("feedback_content", "").toString();
            updated.put(PLAN_VALIDATION_ERROR, 
                StringUtils.hasLength(feedbackContent) ? feedbackContent : "计划被用户拒绝");
            // 清空旧的计划输出
            updated.put(PLANNER_NODE_OUTPUT, "");
        }

        return updated;
    }
}

这段代码处理了三种状态:

状态处理逻辑
首次到达,无反馈返回WAIT_FOR_FEEDBACK,工作流暂停
用户批准关闭复核标志,路由回计划执行器继续执行
用户拒绝增加修复计数,清空旧计划,路由回计划生成器重新制定

9.3.2 中断与恢复机制

DataAgent使用CompiledGraph.interruptBefore()来实现"暂停":

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java

this.compiledGraph = stateGraph.compile(
    CompileConfig.builder()
        .interruptBefore(HUMAN_FEEDBACK_NODE)  // 在人工反馈节点前设置断点
        .build()
);

这就像在乐谱上标记了一个休止符:演奏到这里暂停,等待指挥家给出继续的手势。

9.3.3 用户反馈的传递

当用户在前端看到计划并做出选择后,系统通过handleHumanFeedback方法恢复工作流:

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java

private void handleHumanFeedback(GraphRequest graphRequest) {
    String threadId = graphRequest.getThreadId();
    String feedbackContent = graphRequest.getHumanFeedbackContent();
    
    // 构建反馈数据
    Map<String, Object> feedbackData = Map.of(
        "feedback", !graphRequest.isRejectedPlan(),      // 是否批准
        "feedback_content", feedbackContent              // 用户的具体意见
    );
    
    // 更新State
    Map<String, Object> stateUpdate = new HashMap<>();
    stateUpdate.put(HUMAN_FEEDBACK_DATA, feedbackData);
    
    // 把反馈写入图的当前状态
    RunnableConfig baseConfig = RunnableConfig.builder().threadId(threadId).build();
    RunnableConfig updatedConfig = compiledGraph.updateState(baseConfig, stateUpdate);
    
    // 恢复执行
    RunnableConfig resumeConfig = RunnableConfig.builder(updatedConfig)
        .addMetadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY, feedbackData)
        .build();
    
    Flux<NodeOutput> nodeOutputFlux = compiledGraph.stream(null, resumeConfig);
    // ... 订阅流,继续处理
}

这个过程就像:

  1. 1. 暂停:导演喊"卡",演员停在原地
  2. 2. 沟通:导演走过去告诉演员哪里需要调整
  3. 3. 恢复:导演喊"Action",演员从刚才暂停的地方继续

关键点是threadId——它确保恢复的是同一个"会话",而不是新开一个流程。


9.4 HumanFeedbackDispatcher:反馈后的路由

HumanFeedbackNode处理完用户的反馈后,还需要一个Dispatcher来决定下一步去哪:

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/dispatcher/HumanFeedbackDispatcher.java

public class HumanFeedbackDispatcher implements EdgeAction {

    @Override
    public String apply(OverAllState state) throws Exception {
        String nextNode = (String) state.value("human_next_node", END);

        // 如果是等待反馈状态,返回END让图暂停
        if ("WAIT_FOR_FEEDBACK".equals(nextNode)) {
            return END;
        }

        return nextNode;
    }
}

这个Dispatcher很简单:

  • • 如果还在等反馈 → 返回END(实际上工作流已经暂停,这里的END只是优雅地结束当前步骤)
  • • 如果用户已批准 → 返回PLAN_EXECUTOR_NODE,继续执行
  • • 如果用户已拒绝 → 返回PLANNER_NODE,重新生成计划

9.5 人在回路的设计原则

通过DataAgent的实现,我们可以总结出人在回路设计的几个关键原则:

原则1:选择合适的介入点

不是所有环节都需要人类介入。DataAgent选择在计划执行前暂停,因为:

  • • 太早(如意图识别后):用户还没看到Agent要做什么,无法有效判断
  • • 太晚(如SQL执行后):错误已经造成,无法挽回
  • • 刚刚好(计划生成后):用户能完整看到Agent的"作战方案",决定要不要执行

原则2:提供清晰的上下文

让人类做决策时,必须提供足够的上下文信息。DataAgent在暂停时会通过SSE流把计划内容推送给前端,让用户看到:

  • • Agent打算执行哪些步骤
  • • 每个步骤用什么工具
  • • 预计会产生什么结果

原则3:支持"拒绝+反馈"

人类说"不"的时候,通常伴随着"为什么"。DataAgent允许用户:

  • • 拒绝计划
  • • 填写具体反馈(如"不要查2023年的数据,只看2024年")
  • • Agent根据反馈重新生成计划

这比简单的"通过/拒绝"更有价值。

原则4:防止无限循环

if (repairCount >= 3) {
    log.warn("最大修复次数(3次)已达,结束流程");
    updated.put("human_next_node", "END");
    return updated;
}

如果用户和Agent"杠上了"——用户一直拒绝,Agent一直重新生成——系统会在3次后强制结束,避免无限循环。


本章小结

  1. 1. 人在回路(Human-in-the-Loop) 是在关键环节引入人类判断的机制,用于弥补AI的局限性、规避安全风险。
  2. 2. DataAgent的HITL设计:在计划执行前设置暂停点,让用户审核Agent制定的分析计划。
  3. 3. 核心组件
    • • GraphRequest.humanFeedback:用户决定是否启用复核
    • • PlanExecutorNode:检查复核标志,决定是否路由到人工节点
    • • HumanFeedbackNode:处理三种状态(等待反馈/批准/拒绝)
    • • interruptBefore():实现工作流的暂停与恢复
    • • threadId:确保恢复的是同一个会话
  4. 4. 设计原则:选择合适介入点、提供清晰上下文、支持拒绝+反馈、防止无限循环。

思考题

  1. 1. 场景分析:假设一个医疗诊断Agent,在哪些环节应该设置人在回路?为什么?(提示:考虑误诊风险、患者隐私、治疗方案选择等因素)
  2. 2. 代码分析:在HumanFeedbackNode中,如果用户拒绝了计划但feedback_content为空,系统会怎么处理?PlannerNode重新生成计划时,能利用到什么信息?
  3. 3. 设计题:DataAgent目前只在"计划执行前"设置了一个暂停点。如果你是产品经理,还会在哪里增加人在回路?请说明理由和具体实现思路。

 



第十章:安全与容错——让Agent"稳中求胜"

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

黑箱 vs 白箱——你信任一个你看不见的系统吗?

想象一下,你走进一家餐厅,点了一道菜。厨师在封闭的厨房里忙碌,你看不见他用了什么食材、怎么烹饪、花了多长时间。最后端上来一盘菜——好吃,但你完全不知道为什么好吃。如果下次再来,味道变了,你也不知道问题出在哪里。

这就是黑箱系统:输入进去,输出出来,中间发生了什么,一无所知。

再想象另一家餐厅,厨房是透明的玻璃房。你能看见厨师切菜、炒菜、调味,甚至能闻到香味。如果哪道菜咸了,你一眼就能看出是盐放多了。

这就是白箱系统:整个过程透明可见,每个环节都可追溯、可理解、可调试。

Agent系统,尤其是基于大语言模型(LLM)的Agent,天然带有"黑箱"属性:

  • • LLM的输出有随机性,同样的输入可能得到不同的结果
  • • Agent的决策链条很长,涉及多个节点、多次工具调用
  • • 出了问题很难定位:是Prompt写错了?是工具返回了错误数据?还是LLM" hallucination"(幻觉)了?

可观测性(Observability) 就是让黑箱变白箱的能力。它不是可有可无的"锦上添花",而是生产级Agent系统的必备基础设施

本章将带你理解可观测性的三大支柱,看看DataAgent是如何实现全方位观测的,以及如何在实际开发中调试Agent系统。


为什么Agent特别需要可观测性

1. LLM的随机性让行为难以预测

传统软件是确定性的:输入A,永远输出B。但LLM是概率性的:输入A,可能输出B、C或D,每次都有细微差别。

这种随机性带来几个挑战:

  • • 难以复现:用户报告了一个bug,你用同样的输入测试,可能无法复现
  • • 难以调试:不知道LLM为什么会给出某个答案
  • • 难以优化:不清楚哪个环节是性能瓶颈

2. Agent的决策链条长且复杂

一个典型的Agent执行流程可能涉及:

  1. 1. 意图识别(Intent Recognition)
  2. 2. 知识检索(RAG)
  3. 3. 计划生成(Planning)
  4. 4. 多轮工具调用(Tool Use)
  5. 5. 结果汇总与报告生成

每个环节都可能出错。如果没有观测手段,你只能在最后看到"答案错了",却不知道错在哪一步。

3. 生产环境需要快速定位问题

当Agent系统上线后,可能遇到各种问题:

  • • 响应变慢了,是LLM API延迟增加?还是某个工具执行超时?
  • • 答案质量下降,是检索召回率降低?还是Prompt被污染了?
  • • 某个用户反馈Agent"疯了",你需要快速查看完整的执行轨迹

没有观测,就没有诊断。没有诊断,就没有修复。


可观测性的三大支柱

业界将可观测性总结为三大支柱(Three Pillars of Observability):

第一支柱:日志(Logging)——记录发生了什么

日志是系统运行过程中产生的文本记录,就像飞机的"黑匣子",记录了每个关键时刻发生了什么。

日志的核心作用是:

  • • 事后追溯:出问题后查看日志,还原现场
  • • 状态记录:记录关键变量的值、决策结果
  • • 异常捕获:记录错误堆栈和异常信息

DataAgent中的日志示例:

// GraphServiceImpl.java
log.info("Stream processing completed successfully for threadId: {}", threadId);
log.error("Error in stream processing for threadId: {}: ", threadId, error);
log.debug("Received Stream output: {}", chunk);

这三行日志分别对应:

  • • INFO:正常流程的关键节点(流处理完成)
  • • ERROR:异常和错误(流处理出错)
  • • DEBUG:详细调试信息(收到流式输出片段)

日志级别小贴士:生产环境通常开启INFO级别,开发环境可以开启DEBUG级别。ERROR级别必须记录足够的信息用于排查问题。

第二支柱:指标(Metrics)——量化性能表现

指标是将系统行为量化为数字,用于监控和告警。

DataAgent通过Langfuse记录了详细的LLM指标:

// LangfuseService.java
private static final AttributeKey<Long> GEN_AI_PROMPT_TOKENS = 
    AttributeKey.longKey("gen_ai.usage.prompt_tokens");
private static final AttributeKey<Long> GEN_AI_COMPLETION_TOKENS = 
    AttributeKey.longKey("gen_ai.usage.completion_tokens");
private static final AttributeKey<Long> GEN_AI_TOTAL_TOKENS = 
    AttributeKey.longKey("gen_ai.usage.total_tokens");

这些指标会被自动上报到Langfuse平台,你可以:

  • • 查看每次请求的Token消耗
  • • 分析哪个节点最"费Token"
  • • 监控成本趋势,及时发现异常

第三支柱:追踪(Tracing)——跟踪请求流转路径

追踪是记录一个请求在系统中完整的流转路径,就像快递的物流轨迹。

一个用户请求进入Agent系统后,可能经过:

用户请求 → GraphController → GraphService → IntentRecognitionNode → 
EvidenceRecallNode → PlannerNode → SqlGenerateNode → SqlExecuteNode → 
ReportGeneratorNode → 返回用户

分布式追踪能记录每个环节的:

  • • 开始时间和结束时间(计算耗时)
  • • 输入和输出(查看数据变化)
  • • 是否出错(错误类型和堆栈)

DataAgent使用OpenTelemetry实现分布式追踪:

// OpenTelemetryConfig.java
@Bean
public OpenTelemetry openTelemetry() {
    // 创建 OTLP HTTP 导出器,将追踪数据发送到 Langfuse
    OtlpHttpSpanExporter spanExporter = OtlpHttpSpanExporter.builder()
        .setEndpoint(host + "/api/public/otel/v1/traces")
        .addHeader("Authorization", "Basic " + encodedAuth)
        .setTimeout(10, TimeUnit.SECONDS)
        .build();

    // 配置资源信息(服务名称等)
    Resource resource = Resource.getDefault()
        .merge(Resource.create(Attributes.of(
            AttributeKey.stringKey("service.name"), SERVICE_NAME)));

    // 创建TracerProvider并注册Span处理器
    tracerProvider = SdkTracerProvider.builder()
        .addSpanProcessor(BatchSpanProcessor.builder(spanExporter)
            .setScheduleDelay(1, TimeUnit.SECONDS)
            .setMaxExportBatchSize(100)
            .build())
        .setResource(resource)
        .build();

    return OpenTelemetrySdk.builder()
        .setTracerProvider(tracerProvider)
        .build();
}

这段代码做了三件事:

  1. 1. 配置导出器:将追踪数据发送到Langfuse的OTLP接收端点
  2. 2. 配置资源:标识这是"data-agent"服务的数据
  3. 3. 批处理:每1秒或每100个Span批量发送,减少网络开销

DataAgent的可观测性设计

DataAgent的可观测性体系可以用一张图概括:

┌─────────────────────────────────────────────────────────────┐
│                      DataAgent 可观测性体系                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   用户请求                                                    │
│      │                                                       │
│      ▼                                                       │
│   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│   │   日志系统   │    │   指标系统   │    │   追踪系统   │     │
│   │  (Slf4j)    │    │ (Langfuse)  │    │(OpenTelemetry)│    │
│   └──────┬──────┘    └──────┬──────┘    └──────┬──────┘     │
│          │                  │                  │            │
│          ▼                  ▼                  ▼            │
│   ┌─────────────────────────────────────────────────────┐   │
│   │              Langfuse 可视化平台                      │   │
│   │  ┌─────────┐  ┌─────────┐  ┌─────────┐             │   │
│   │  │ 追踪链路 │  │ Token统计│  │ 延迟分析 │             │   │
│   │  │ (Traces)│  │ (Usage) │  │(Latency)│             │   │
│   │  └─────────┘  └─────────┘  └─────────┘             │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Langfuse集成:追踪每次LLM调用

Langfuse是一个开源的LLM可观测性平台,DataAgent通过OpenTelemetry协议与其集成。

LangfuseService是DataAgent的核心观测组件:

// LangfuseService.java(简化版)
@Component
public class LangfuseService {
    
    private final Tracer tracer;
    private final boolean enabled;
    
    // 按 threadId 隔离的 Token 累计器
    private static final ConcurrentHashMap<String, long[]> TOKEN_ACCUMULATOR = 
        new ConcurrentHashMap<>();

    /**
     * 开始一个追踪 Span
     */
    public Span startLLMSpan(String spanName, GraphRequest request) {
        Span span = tracer.spanBuilder(spanName)
            .setSpanKind(SpanKind.CLIENT)
            .setParent(Context.current())
            .startSpan();
        
        // 记录请求上下文
        span.setAttribute("input.value", request.getQuery());
        span.setAttribute("data_agent.agent_id", request.getAgentId());
        span.setAttribute("data_agent.thread_id", request.getThreadId());
        
        // 初始化 Token 累计器
        TOKEN_ACCUMULATOR.put(request.getThreadId(), new long[] { 0, 0 });
        
        return span;
    }

    /**
     * 累计 Token 用量(线程安全)
     */
    public static void accumulateTokens(Object threadId, long promptTokens, long completionTokens) {
        long[] tokens = TOKEN_ACCUMULATOR.get(threadId);
        if (tokens != null) {
            synchronized (tokens) {
                tokens[0] += promptTokens;      // Prompt Tokens
                tokens[1] += completionTokens;  // Completion Tokens
            }
        }
    }

    /**
     * 结束 Span(成功)
     */
    public void endSpanSuccess(Span span, String threadId, String output) {
        span.setAttribute("output.value", output);
        // 写入累计的 Token 数据
        applyAccumulatedTokens(span, threadId);
        span.setStatus(StatusCode.OK);
        span.end();
    }
}

这段代码展示了几个关键设计:

  1. 1. Span生命周期管理startLLMSpan创建追踪,endSpanSuccess/endSpanError结束追踪
  2. 2. 线程安全的Token累计:使用ConcurrentHashMap + synchronized数组,支持多线程并发
  3. 3. 丰富的上下文信息:记录query、agentId、threadId等,方便后续检索

FluxUtil中的Token累积统计

DataAgent的流式处理中,每次LLM响应都会经过FluxUtil,这里会提取并累计Token:

// FluxUtil.java(关键片段)
private static void extractAndAccumulateTokens(Object threadId, ChatResponse response) {
    if (threadId == null || response.getMetadata() == null) {
        return;
    }
    Usage usage = response.getMetadata().getUsage();
    if (usage != null && (usage.getPromptTokens() > 0 || usage.getCompletionTokens() > 0)) {
        // 将本次调用的 Token 用量累计到 Langfuse
        LangfuseService.accumulateTokens(
            threadId, 
            usage.getPromptTokens(), 
            usage.getCompletionTokens()
        );
    }
}

这个方法在每次收到LLM响应时自动调用,实现了无侵入式的指标采集——业务代码不需要关心Token统计,FluxUtil自动完成。

StreamContext:流式处理的上下文管理

每个用户请求在DataAgent中都有一个独立的StreamContext,封装了所有相关状态:

// StreamContext.java(简化版)
@Data
public class StreamContext {
    private Disposable disposable;        // 用于取消订阅
    private Sinks.Many<ServerSentEvent<GraphNodeResponse>> sink;  // 流式输出通道
    private Span span;                    // OpenTelemetry 追踪 Span
    private TextType textType;            // 当前文本类型
    
    // 收集流式输出内容,用于 Langfuse 上报
    private final StringBuilder outputCollector = new StringBuilder();
    
    // 线程安全的清理标记
    private final AtomicBoolean cleaned = new AtomicBoolean(false);

    public void appendOutput(String chunk) {
        outputCollector.append(chunk);
    }

    public String getCollectedOutput() {
        return outputCollector.toString();
    }

    /**
     * 线程安全的资源清理
     */
    public void cleanup() {
        // 使用 CAS 确保只执行一次
        if (!cleaned.compareAndSet(false, true)) {
            return;
        }
        // 清理 Disposable 和 Sink...
    }
}

StreamContext的设计亮点:

  • • 按threadId隔离:每个对话有独立的上下文,互不干扰
  • • 自动收集输出outputCollector累积所有输出,用于最终上报
  • • 线程安全清理AtomicBoolean + compareAndSet确保资源只释放一次,避免重复清理导致的异常

流式输出与实时反馈

SSE技术:让用户"看见"Agent思考

传统的HTTP请求是"一问一答":客户端发送请求,服务端处理完成后一次性返回结果。但对于Agent系统,用户可能需要等待几秒甚至几十秒,这段时间页面一片空白,体验很差。

SSE(Server-Sent Events,服务器发送事件) 解决了这个问题。它允许服务端在请求处理过程中,实时推送中间结果给客户端。

DataAgent的SSE端点实现:

// GraphController.java(简化版)
@RestController
public class GraphController {

    @GetMapping(value = "/api/stream/search", 
                produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<GraphNodeResponse>> streamSearch(
            @RequestParam("agentId") String agentId,
            @RequestParam("query") String query,
            ServerHttpResponse response) {
        
        // 设置 SSE 相关的 HTTP 头
        response.getHeaders().add("Cache-Control", "no-cache");
        response.getHeaders().add("Connection", "keep-alive");
        
        // 创建 Sink(流式数据通道)
        Sinks.Many<ServerSentEvent<GraphNodeResponse>> sink = 
            Sinks.many().unicast().onBackpressureBuffer();
        
        // 构建请求并启动图处理
        GraphRequest request = GraphRequest.builder()
            .agentId(agentId)
            .query(query)
            .build();
        graphService.graphStreamProcess(sink, request);
        
        // 返回 Flux,Spring 自动处理 SSE 格式
        return sink.asFlux()
            .doOnSubscribe(subscription -> 
                log.info("Client subscribed to stream"))
            .doOnCancel(() -> {
                log.info("Client disconnected");
                graphService.stopStreamProcessing(threadId);
            })
            .doOnError(error -> {
                log.error("Stream error", error);
                graphService.stopStreamProcessing(threadId);
            })
            .doOnComplete(() -> 
                log.info("Stream completed successfully"));
    }
}

这段代码的关键点:

  1. 1. produces = MediaType.TEXT_EVENT_STREAM_VALUE:声明这是一个SSE端点
  2. 2. Sinks.Many:Project Reactor的流式数据通道,支持多生产者单消费者
  3. 3. 生命周期钩子doOnSubscribe(客户端连接)、doOnCancel(客户端断开)、doOnError(出错)、doOnComplete(完成)
  4. 4. 自动清理:客户端断开时,自动停止对应的流处理,释放资源

GraphNodeResponse:流式数据的封装

每个推送给前端的数据包都封装为GraphNodeResponse

// GraphNodeResponse.java
@Data
@Builder
public class GraphNodeResponse {
    private String agentId;      // Agent 标识
    private String threadId;     // 对话线程标识
    private String nodeName;     // 当前节点名称(如"PlannerNode")
    private TextType textType;   // 文本类型(如思考过程、SQL代码、最终结果)
    private String text;         // 实际内容
    
    @Builder.Default
    private boolean error = false;    // 是否出错
    
    @Builder.Default
    private boolean complete = false; // 是否完成

    // 便捷工厂方法
    public static GraphNodeResponse error(String agentId, String threadId, String text) { ... }
    public static GraphNodeResponse complete(String agentId, String threadId) { ... }
}

前端收到这样的数据流:

event: message
data: {"nodeName":"IntentRecognitionNode","text":"用户想查询销售额","textType":"THINKING"}

event: message
data: {"nodeName":"PlannerNode","text":"计划:1.生成SQL 2.执行查询","textType":"PLAN"}

event: message
data: {"nodeName":"SqlGenerateNode","text":"SELECT SUM(amount) FROM orders","textType":"SQL"}

event: message
data: {"nodeName":"SqlExecuteNode","text":"查询结果:1,234,567元","textType":"RESULT"}

event: complete
data: {"complete":true}

用户能实时看到Agent在每个节点做了什么,而不是干等最后结果。这种透明感极大提升了用户体验和信任度。


调试技巧实战

技巧一:查看每个Node的输入输出状态

DataAgent的日志系统记录了每个节点的执行:

// GraphServiceImpl.java
private void handleNodeOutput(GraphRequest request, NodeOutput output) {
    log.debug("Received output: {}", output.getClass().getSimpleName());
    if (output instanceof StreamingOutput streamingOutput) {
        String node = output.node();
        String chunk = output.chunk();
        log.debug("Node: {}, Chunk: {}", node, chunk);
        // ...
    }
}

调试方法:开启DEBUG级别日志,查看完整的节点输出流。

技巧二:分析Prompt和LLM响应

在Langfuse平台上,你可以查看每次LLM调用的完整信息:

  • • 输入Prompt:实际发送给LLM的内容
  • • 输出响应:LLM返回的原始文本
  • • Token用量:Prompt Token数、Completion Token数
  • • 延迟:从请求到响应的时间

如果Agent输出异常,首先检查:

  1. 1. Prompt是否正确拼接?变量是否被正确替换?
  2. 2. LLM响应是否包含异常内容?
  3. 3. Token用量是否超限?

技巧三:追踪执行路径

通过OpenTelemetry的Trace ID,你可以在Langfuse中查看完整的执行链路:

Trace: graph-stream (threadId: abc-123)
├── Span: IntentRecognitionNode (200ms)
├── Span: EvidenceRecallNode (350ms)
├── Span: PlannerNode (800ms)
├── Span: SqlGenerateNode (1200ms)
├── Span: SqlExecuteNode (150ms)
└── Span: ReportGeneratorNode (600ms)

一眼就能看出:SqlGenerateNode耗时最长(1.2秒),是性能瓶颈,可能需要优化Prompt或更换更快的模型。

技巧四:本地快速验证

DataAgent支持nl2sqlOnly模式,只执行核心链路,方便快速验证:

// GraphServiceImpl.java
@Override
public String nl2sql(String naturalQuery, String agentId) {
    OverAllState state = compiledGraph
        .invoke(Map.of(IS_ONLY_NL2SQL, true, INPUT_KEY, naturalQuery, AGENT_ID, agentId),
                RunnableConfig.builder().build())
        .orElseThrow();
    return state.value(SQL_GENERATE_OUTPUT, "");
}

这个模式跳过了计划生成、报告生成等复杂节点,直接测试NL2SQL核心能力,适合本地调试。


本章小结

本章我们学习了Agent系统的可观测性设计:

  1. 1. 为什么需要可观测性:LLM的随机性、Agent决策链的复杂性、生产环境快速定位问题的需求
  2. 2. 三大支柱
    • • 日志:记录发生了什么(Slf4j)
    • • 指标:量化性能表现(Token、延迟、成功率)
    • • 追踪:跟踪请求流转路径(OpenTelemetry + Langfuse)
  3. 3. DataAgent的实践
    • • LangfuseService实现LLM调用追踪和Token统计
    • • FluxUtil自动提取并累计Token用量
    • • StreamContext管理每个请求的完整生命周期
    • • SSE流式输出让用户实时看到Agent思考过程
  4. 4. 调试技巧:查看节点输入输出、分析Prompt和响应、追踪执行路径、本地快速验证

记住:可观测性不是事后补丁,而是应该从项目第一天就开始设计的基础设施。


思考题

  1. 1. 场景题:你收到用户反馈说"Agent今天回答特别慢",你会如何通过DataAgent的可观测性体系定位问题?请按步骤说明你会查看哪些日志、指标和追踪信息。
  2. 2. 设计题:假设你要为一个客服Agent设计可观测性方案,除了本章提到的三大支柱,你认为还需要监控哪些业务指标?如何设计一个"答案质量评分"的指标?

 



第十一章:观测与调试——让Agent透明可控

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

电梯里的安全哲学

想象你走进一栋摩天大楼的电梯:

你按下100层,电梯平稳上升。突然,钢缆发出一声异响,电梯轻微晃动。

但下一秒,电梯并没有坠落——它稳稳地停在了最近的楼层,门自动打开,报警铃声响起,通风系统启动。

你安全地走了出来。虽然这次乘梯体验不算愉快,但你毫发无伤。

这背后是一套精密的安全系统在运作:

  • • 限速器:检测到速度异常立即制动
  • • 安全钳:钢缆断裂时卡住轨道
  • • 缓冲器:万一坠落,底部弹簧吸收冲击
  • • 备用电源:停电时维持照明和通风

安全系统的设计哲学是:假设最坏的情况一定会发生,然后提前准备好应对措施。

Agent系统也是如此。LLM可能生成错误的SQL,数据库可能连接失败,Python代码可能包含死循环。一个成熟的Agent必须在"出问题"时保护自己、保护用户、保护系统。


10.1 Agent系统的风险全景

在深入DataAgent的安全设计之前,让我们先看看Agent系统面临的主要风险:

风险类型具体表现危害程度
数据风险LLM生成DELETEDROP语句,误删数据🔴 极高
代码风险Python代码包含os.system("rm -rf /")🔴 极高
资源风险SQL查询全表扫描,拖垮数据库🟠 高
连接风险数据库密码错误、网络中断🟡 中
逻辑风险LLM理解错误,生成与意图不符的SQL🟡 中
服务风险节点无限重试,导致系统卡死🟠 高


DataAgent针对这些风险,设计了一套多层次的防御体系


10.2 第一层防御:输入验证与计划校验

10.2.1 PlanExecutorNode的计划验证

在计划执行前,DataAgent会严格检查计划是否合法:

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/PlanExecutorNode.java

@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
    // 1. 验证Plan是否能正确解析
    Plan plan;
    try {
        plan = PlanProcessUtil.getPlan(state);
    }
    catch (Exception e) {
        log.error("计划验证失败:JSON解析错误", e);
        return buildValidationResult(state, false,
            "验证失败:计划不是有效的JSON结构。错误:" + e.getMessage());
    }

    // 2. 验证计划结构是否完整
    if (!validateExecutionPlanStructure(plan)) {
        return buildValidationResult(state, false,
            "验证失败:生成的计划为空或没有执行步骤。");
    }

    // 3. 逐一验证每个执行步骤
    for (ExecutionStep step : plan.getExecutionPlan()) {
        String validationResult = validateExecutionStep(step);
        if (validationResult != null) {
            return buildValidationResult(state, false, validationResult);
        }
    }

    log.info("计划验证成功。");
    // ... 继续执行
}

这个验证过程就像机场安检

  • • 第一步:检查证件是否有效(JSON能否解析)
  • • 第二步:检查行李是否为空(计划是否有内容)
  • • 第三步:逐件检查行李内容(每个步骤的参数是否合法)

10.2.2 步骤级别的参数校验

private String validateExecutionStep(ExecutionStep step) {
    // 验证工具名称是否在白名单中
    if (step.getToolToUse() == null || !SUPPORTED_NODES.contains(step.getToolToUse())) {
        return "验证失败:计划包含非法工具名:'" + step.getToolToUse() + "'";
    }

    // 验证工具参数是否存在
    if (step.getToolParameters() == null) {
        return "验证失败:第" + step.getStep() + "步缺少工具参数";
    }

    // 根据节点类型,验证特定必填参数
    switch (step.getToolToUse()) {
        case SQL_GENERATE_NODE:
            if (!StringUtils.hasText(step.getToolParameters().getInstruction())) {
                return "验证失败:SQL生成节点缺少描述";
            }
            break;
        // ... 其他节点类似
    }

    return null; // 验证通过
}

白名单机制是关键:SUPPORTED_NODES只包含预定义的合法节点类型(SQL_GENERATE_NODEPYTHON_GENERATE_NODEREPORT_GENERATOR_NODE)。即使LLM"突发奇想"生成了一个不存在的节点名,也会被拦截。


10.3 第二层防御:语义一致性校验

10.3.1 SQL的"二次确认"

LLM生成的SQL语法上可能正确,但语义上可能与用户的真实意图不符。DataAgent设置了**SemanticConsistencyNode(语义一致性节点)**来做"二次确认"。

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/SemanticConsistencyNode.java

@Slf4j
@Component
public class SemanticConsistencyNode implements NodeAction {

    private final Nl2SqlService nl2SqlService;

    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 获取用户原始问题、生成的SQL、数据库Schema
        String sql = StateUtil.getStringValue(state, SQL_GENERATE_OUTPUT);
        String userQuery = StateUtil.getCanonicalQuery(state);
        SchemaDTO schemaDTO = StateUtil.getObjectValue(state, TABLE_RELATION_OUTPUT, SchemaDTO.class);
        String evidence = StateUtil.getStringValue(state, EVIDENCE);

        // 构建语义校验请求
        SemanticConsistencyDTO dto = SemanticConsistencyDTO.builder()
            .dialect(dialect)
            .sql(sql)
            .executionDescription(getCurrentExecutionStepInstruction(state))
            .schemaInfo(buildMixMacSqlDbPrompt(schemaDTO, true))
            .userQuery(userQuery)
            .evidence(evidence)
            .build();

        // 调用LLM进行语义校验
        Flux<ChatResponse> validationResultFlux = nl2SqlService.performSemanticConsistency(dto);

        // 处理结果
        return FluxUtil.createStreamingGeneratorWithMessages(this.getClass(), state,
            "开始语义一致性校验", "语义一致性校验完成",
            validationResult -> {
                boolean isPassed = !validationResult.startsWith("不通过");
                return buildValidationResult(isPassed, validationResult);
            }, validationResultFlux);
    }
}

这个节点就像翻译的"回译校验"

  • • 用户说中文(业务问题)
  • • Agent翻译成SQL(第一轮翻译)
  • • SemanticConsistencyNode把SQL"回译"成业务语言
  • • 对比"回译结果"和"用户的原始问题"是否一致

如果不一致,说明SQL可能偏离了用户意图。

10.3.2 失败时的重试机制

当语义校验不通过时,DataAgent不会直接报错结束,而是记录原因、返回重试

private Map<String, Object> buildValidationResult(boolean passed, String validationResult) {
    if (passed) {
        return Map.of(SEMANTIC_CONSISTENCY_NODE_OUTPUT, true);
    }
    else {
        // 不通过时,记录失败原因,供SQL生成节点重新生成时使用
        return Map.of(
            SEMANTIC_CONSISTENCY_NODE_OUTPUT, false,
            SQL_REGENERATE_REASON, SqlRetryDto.semantic(validationResult)
        );
    }
}

SqlRetryDto是一个专门用于记录重试原因的数据结构:

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/dto/datasource/SqlRetryDto.java

public record SqlRetryDto(String reason, boolean semanticFail, boolean sqlExecuteFail) {

    public static SqlRetryDto semantic(String reason) {
        return new SqlRetryDto(reason, true, false);  // 语义校验失败
    }

    public static SqlRetryDto sqlExecute(String reason) {
        return new SqlRetryDto(reason, false, true);  // SQL执行失败
    }

    public static SqlRetryDto empty() {
        return new SqlRetryDto("", false, false);     // 无失败
    }
}

这个设计很精巧:

  • • 语义失败semanticFail=true):LLM理解有误,需要重新生成SQL
  • • 执行失败sqlExecuteFail=true):SQL语法错误 or 数据库问题,需要修复SQL
  • • 原因分类:让后续节点知道"该怎么修"

10.4 第三层防御:代码沙箱

10.4.1 为什么需要沙箱

DataAgent允许LLM生成并执行Python代码。这功能很强大,但也极其危险:

# LLM"不小心"生成了这样的代码?
import os
os.system("rm -rf /")  # 删除服务器上所有文件!

# 或者
while True:
    pass  # 死循环,耗尽CPU

如果没有隔离机制,Agent就变成了黑客的帮凶

10.4.2 Docker沙箱:把危险关在笼子里

DataAgent使用Docker容器来执行Python代码,实现严格的资源隔离:

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/code/impls/DockerCodePoolExecutorService.java

public class DockerCodePoolExecutorService extends AbstractCodePoolExecutorService {

    protected String createNewContainer() throws Exception {
        String containerName = this.generateContainerName();

        // 创建容器的主机配置(限制资源、挂载目录)
        HostConfig hostConfig = this.createHostConfig(tempDir);

        // 创建容器
        CreateContainerResponse container = dockerClient.createContainerCmd(properties.getImageName())
            .withName(containerName)
            .withWorkingDir("/app")
            .withHostConfig(hostConfig)
            .withCmd("sh", "-c", cmd)
            .exec();
        
        return container.getId();
    }
}

10.4.3 资源限制

private HostConfig createHostConfig(Path tempDir) {
    return newHostConfig()
        // 限制内存使用(防止内存耗尽)
        .withMemory(this.properties.getLimitMemory() * 1024L * 1024L)
        // 限制CPU核心数(防止CPU占满)
        .withCpuCount(this.properties.getCpuCore())
        // 丢弃所有Linux Capability(最小权限原则)
        .withCapDrop(Capability.ALL)
        // 使用临时文件系统,不持久化
        .withTmpFs(Map.of("/tmp", ""))
        // 网络隔离(可选)
        .withNetworkMode(this.properties.getNetworkMode());
}

这套限制就像监狱的安保系统

  • • 内存限制:囚犯(代码)只能使用指定的床铺大小
  • • CPU限制:囚犯只能占用指定的活动区域
  • • 权限剥离:囚犯没有任何特权(Capability.ALL全部丢弃)
  • • 网络隔离:囚犯不能与外界通信(根据配置)

10.4.4 执行超时

private String buildExecutionCommand(Path tempDir) {
    return String.format(
        "... timeout -s SIGKILL %s python3 -u script.py < input_data.txt",
        properties.getCodeTimeout()  // 超时时间(秒)
    );
}

即使代码陷入了死循环,timeout命令也会在指定时间后强制终止进程。

10.4.5 日志大小限制

private void appendWithLimit(StringBuilder builder, String payload, int limit) {
    if (builder.length() < limit) {
        builder.append(payload);
    }
    else if (builder.length() == limit) {
        builder.append("\n...[Output truncated due to size limit]...");
        builder.append(" "); // 防止重复进入
    }
}

即使代码疯狂输出日志(比如while True: print("ha")),日志大小也会被限制在5MB以内,防止耗尽磁盘空间。


10.5 第四层防御:错误码与优雅降级

10.5.1 数据库错误标准化

DataAgent定义了一套完整的错误码体系,把数据库层面的原始错误翻译成用户友好的提示:

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/enums/ErrorCodeEnum.java

public enum ErrorCodeEnum {

    SUCCESS("0", "操作成功"),
    
    INVALID_PARAM("10", "无效参数"),
    
    // 连接类错误
    DATASOURCE_CONNECTION_FAILURE_08001("08001", 
        "无法建立数据库连接,请检查配置的IP/域名是否正确"),
    CONNECTION_LOST_08002("08002", "连接丢失:服务器关闭"),
    
    // 权限类错误
    INSUFFICIENT_PRIVILEGE_42501("42501", "权限不足"),
    PASSWORD_ERROR_28P01("28P01", "密码错误"),
    
    // 数据类错误
    DATABASE_NOT_EXIST_3D000("3D000", "数据库不存在"),
    SCHEMA_NOT_EXIST_3D070("3D070", "模式不存在"),
    
    // 兜底错误
    OTHERS("100", "未知错误,请联系技术人员排查");

    // 根据SQLState错误码查找对应的中文提示
    public static ErrorCodeEnum fromCode(String code) {
        for (ErrorCodeEnum rc : values()) {
            if (code.equals(rc.getCode())) {
                return rc;
            }
        }
        return OTHERS; // 找不到就返回兜底错误
    }
}

这套机制的好处:

  • • 用户友好:用户看到"密码错误"而不是一大串Java异常堆栈
  • • 问题定位:"08001"这样的错误码方便开发人员快速定位问题类型
  • • 统一处理:所有数据库错误都走同一套处理逻辑

10.5.2 线程池与优雅关闭

DataAgent使用专用线程池处理数据库操作,并在应用关闭时优雅地释放资源

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/config/DataAgentConfiguration.java

@Bean(name = "dbOperationExecutor")
public ExecutorService dbOperationExecutor() {
    int corePoolSize = Math.max(4, 
        Math.min(Runtime.getRuntime().availableProcessors() * 2, 16));
    
    return new ThreadPoolExecutor(
        corePoolSize, corePoolSize, 
        60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(500),  // 队列上限500个任务
        threadFactory, 
        new ThreadPoolExecutor.CallerRunsPolicy()  // 队列满时,调用者线程自己执行
    );
}

@Override
public void destroy() {
    if (dbOperationExecutor != null && !dbOperationExecutor.isShutdown()) {
        dbOperationExecutor.shutdown();  // 1. 停止接收新任务
        
        if (!dbOperationExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
            dbOperationExecutor.shutdownNow();  // 2. 超时后强制关闭
        }
    }
}

这套机制确保:

  • • 不会无限堆积任务:队列上限500,超过后由调用者线程自己执行(自然降速)
  • • 应用关闭不丢任务:先等60秒让已有任务完成,实在不行才强制关闭
  • • 资源不泄漏:线程池被正确关闭,不会导致内存泄漏

10.6 第五层防御:重试与熔断

10.6.1 计划修复的"三次法则"

当计划验证失败时,DataAgent允许最多2次修复(加上初始尝试,共3次):

// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/dispatcher/PlanExecutorDispatcher.java

private static final int MAX_REPAIR_ATTEMPTS = 2;

@Override
public String apply(OverAllState state) {
    boolean validationPassed = StateUtil.getObjectValue(state, PLAN_VALIDATION_STATUS, Boolean.class, false);

    if (!validationPassed) {
        int repairCount = StateUtil.getObjectValue(state, PLAN_REPAIR_COUNT, Integer.class, 0);
        
        if (repairCount > MAX_REPAIR_ATTEMPTS) {
            log.error("计划修复次数超过{}次限制,终止执行", MAX_REPAIR_ATTEMPTS);
            return END;  // 不再重试,结束流程
        }
        
        return PLANNER_NODE;  // 安排重试
    }
}

这遵循了工程界的**"三次法则"**:

  • • 第一次失败:可能是偶然,再试一次
  • • 第二次失败:可能有问题,再给它最后一次机会
  • • 第三次失败:系统性地不行,放弃并报告

10.6.2 表关系分析的循环重试

.addConditionalEdges(TABLE_RELATION_NODE, edge_async(new TableRelationDispatcher()),
        Map.of(FEASIBILITY_ASSESSMENT_NODE, FEASIBILITY_ASSESSMENT_NODE, 
               END, END, 
               TABLE_RELATION_NODE, TABLE_RELATION_NODE)) // 失败后回到自身重试

如果表关系分析因为网络抖动暂时失败,系统会自动重试,而不是直接报错退出。


10.7 安全设计原则总结

DataAgent的多层防御体系可以用一张图概括:

用户请求
    ↓
┌─────────────────────────────────────┐
│ 第一层:输入验证                      │
│ PlanExecutorNode校验计划结构和参数      │
└─────────────────────────────────────┘
    ↓
┌─────────────────────────────────────┐
│ 第二层:语义校验                      │
│ SemanticConsistencyNode验证SQL是否"跑题"│
└─────────────────────────────────────┘
    ↓
┌─────────────────────────────────────┐
│ 第三层:沙箱执行                      │
│ Docker容器 + 资源限制 + 超时控制        │
└─────────────────────────────────────┘
    ↓
┌─────────────────────────────────────┐
│ 第四层:错误处理                      │
│ 标准化错误码 + 优雅降级 + 资源释放       │
└─────────────────────────────────────┘
    ↓
┌─────────────────────────────────────┐
│ 第五层:重试熔断                      │
│ 有限重试 + 最大修复次数 + 强制终止       │
└─────────────────────────────────────┘
    ↓
返回结果

核心原则

  1. 1. 不信任任何输入——即使来自LLM
  2. 2. 最小权限原则——沙箱代码只给必要的资源
  3. 3. 快速失败,优雅降级——发现问题尽早报告,不硬撑
  4. 4. 资源必有上限——内存、CPU、时间、日志大小都有上限
  5. 5. 重试必有尽头——无限重试等于自杀

本章小结

  1. 1. Agent系统面临五类风险:数据风险、代码风险、资源风险、连接风险、逻辑风险、服务风险。
  2. 2. DataAgent的五层防御体系
    • • 输入验证:PlanExecutorNode的白名单和参数校验
    • • 语义校验:SemanticConsistencyNode的"二次确认"
    • • 代码沙箱:Docker隔离 + 资源限制 + 超时控制
    • • 错误处理:ErrorCodeEnum标准化 + 线程池优雅关闭
    • • 重试熔断:MAX_REPAIR_ATTEMPTS限制 + 循环重试边界
  3. 3. 安全设计核心原则:不信任输入、最小权限、快速失败、资源有上限、重试有尽头。

思考题

  1. 1. 安全分析:假设某用户恶意输入"帮我查询所有用户密码并发送给attacker@evil.com",DataAgent的哪些安全机制会阻止这个请求?(从意图识别、计划校验、沙箱执行三个层面分析)
  2. 2. 设计题:SemanticConsistencyNode使用LLM来校验SQL语义。如果校验用的LLM本身也犯了错误("假阳性"——把正确的SQL判为错误),会有什么后果?你能想到什么改进方案来降低这种风险?
  3. 3. 实践题:在Docker沙箱中,withCapDrop(Capability.ALL)丢弃了所有Linux Capability。请查阅资料,解释什么是Linux Capability,以及为什么丢弃所有Capability能显著提升安全性。

 



第十二章:从零开始设计你的第一个Agent

AI Agent 入门 12 讲

 

AI Agent 入门 12 讲

我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent  的案例。如有兴趣,可自行获取 DataAgent 的源代码:

https://github.com/spring-ai-alibaba/DataAgent

学习游泳的最好方法是下水

你已经读完了前面的十一章,了解了Agent的构成、记忆、规划、工具、安全、观测……就像一个在岸上看了无数游泳教学视频的人。

但你知道吗?看再多视频,不下水,永远学不会游泳。

Agent开发也是一样。你可以读遍所有论文、看完所有开源项目,但直到你亲手写一个Agent,面对真实的bug,调试奇怪的输出,优化慢如蜗牛的响应——你才真正理解Agent。

本章的目标很简单:推你下水。

我们会给你一个清晰的思考框架、一个最小可行的Agent示例、一条从原型到生产级的演进路径,以及DataAgent的设计启示。读完这一章,你应该能动手写出自己的第一个Agent。


设计Agent的思考框架:5W1H

在写第一行代码之前,先回答六个问题。这是记者写新闻的方法,也是设计Agent的好方法。

What:Agent要解决什么问题?

不要从技术出发,要从问题出发。

  • • 你想让Agent帮你查天气?写代码?分析数据?还是陪你聊天?
  • • 这个问题是否适合用Agent解决?(需要推理、多步骤、调用外部工具)
  • • 传统方案(规则引擎、固定流程)为什么不能很好解决?

DataAgent的答案:帮助企业用户通过自然语言查询数据库,自动生成SQL、执行分析、生成报告。传统BI工具需要学习成本,Agent让"说话就能分析"成为可能。

Who:目标用户是谁?

  • • 技术背景:用户是程序员还是业务人员?
  • • 使用场景:是日常办公辅助,还是关键业务决策?
  • • 容忍度:用户对错误答案的容忍度如何?(医疗Agent vs 天气Agent)

DataAgent的答案:企业中的数据分析师和业务人员。他们懂业务但可能不懂SQL,需要Agent bridge the gap(弥合鸿沟)。

When:什么时候需要人类介入?

这是人机协作设计的关键:

  • • 哪些决策Agent可以自主做?
  • • 哪些必须人工确认?(如删除数据、转账、发布内容)
  • • 什么情况下Agent应该"求助"人类?

DataAgent的答案:在计划生成后、执行前,设置HumanFeedbackNode。用户可以看到Agent的执行计划,确认或修改后再继续。这避免了Agent"一意孤行"执行危险操作。

Where:运行在什么环境?

  • • 部署方式:本地运行、私有服务器、公有云?
  • • 资源限制:有多少GPU/CPU?内存多大?
  • • 合规要求:数据能否出域?是否需要审计日志?

DataAgent的答案:企业私有部署,支持多种LLM(本地/云端)。数据不出域,所有操作有Langfuse追踪记录。

Why:为什么用Agent而不是传统方案?

诚实地回答这个问题。Agent不是银弹,有些场景传统方案更好

场景传统方案Agent方案
固定流程的审批工作流引擎过度设计
简单的数据查询SQL界面可以,但非必须
复杂的多步骤分析需要人工拆解Agent擅长
开放式问题回答搜索+规则Agent擅长

DataAgent的答案:自然语言到SQL的转换涉及意图理解、Schema匹配、SQL生成、结果解释——这是一个典型的多步骤推理任务,Agent比固定规则更灵活。

How:技术方案是什么?

最后才考虑技术。基于前面的答案,选择合适的技术栈:

  • • LLM选择:GPT-4、Claude、DeepSeek、本地模型?
  • • 框架选择:LangChain、Spring AI、原生API?
  • • 记忆方案:简单变量、数据库、向量数据库?
  • • 工具生态:需要哪些外部工具?如何集成?

DataAgent的答案:Spring AI Alibaba Graph(与Java生态集成)、支持动态切换LLM(热插拔)、H2/MySQL存储、向量检索+关键词检索混合。


最小可行Agent(MVP)

1个LLM + 1个工具 + 1个记忆 = 最简单的Agent

让我们从一个极简Agent开始。这个Agent能查天气,并根据天气给出穿衣建议。

# weather_agent.py - 极简天气Agent
import os
import json
from datetime import datetime

# 模拟LLM(实际项目中调用OpenAI/DeepSeek API)
class MockLLM:
    def chat(self, prompt):
        # 这里应该是实际的API调用
        # 为了演示,我们模拟一个简单推理
        if "rain" in prompt.lower() or "雨" in prompt:
            return "今天有雨,建议带伞,穿防水外套。"
        elif "cold" in prompt.lower() or "冷" in prompt:
            return "今天气温低,建议穿羽绒服。"
        else:
            return "今天天气不错,穿轻便衣物即可。"

# 工具:天气查询(模拟)
class WeatherTool:
    def get_weather(self, city):
        # 实际项目中调用天气API(如OpenWeatherMap、和风天气)
        mock_data = {
            "北京": {"temp": 5, "condition": "cold", "wind": "north 3级"},
            "上海": {"temp": 18, "condition": "rain", "wind": "east 2级"},
            "深圳": {"temp": 28, "condition": "sunny", "wind": "south 2级"}
        }
        return mock_data.get(city, {"temp": 20, "condition": "unknown", "wind": "calm"})

# 记忆:简单对话历史
class SimpleMemory:
    def __init__(self, max_turns=5):
        self.history = []
        self.max_turns = max_turns
    
    def add(self, role, content):
        self.history.append({"role": role, "content": content, "time": datetime.now().isoformat()})
        # 只保留最近N轮
        if len(self.history) > self.max_turns * 2:
            self.history = self.history[-self.max_turns * 2:]
    
    def get_context(self):
        return "\n".join([f"{h['role']}: {h['content']}" for h in self.history])

# Agent核心
class WeatherAgent:
    def __init__(self):
        self.llm = MockLLM()
        self.weather_tool = WeatherTool()
        self.memory = SimpleMemory()
    
    def run(self, user_input):
        # Step 1: 记录用户输入
        self.memory.add("user", user_input)
        
        # Step 2: 提取城市(简化版,实际可用NER或让LLM提取)
        city = self._extract_city(user_input)
        
        # Step 3: 调用工具获取天气
        weather = self.weather_tool.get_weather(city)
        
        # Step 4: 构建Prompt(包含记忆和工具结果)
        prompt = f"""
你是一位贴心的天气助手。根据以下信息给出穿衣建议:

城市:{city}
天气:{weather['condition']}
温度:{weather['temp']}°C
风力:{weather['wind']}

对话历史:
{self.memory.get_context()}

请给出简洁的穿衣建议:
"""
        
        # Step 5: 调用LLM生成回答
        response = self.llm.chat(prompt)
        
        # Step 6: 记录回答并返回
        self.memory.add("assistant", response)
        return response
    
    def _extract_city(self, text):
        # 简化版城市提取
        cities = ["北京", "上海", "深圳", "广州", "杭州"]
        for city in cities:
            if city in text:
                return city
        return "北京"  # 默认

# 运行Agent
if __name__ == "__main__":
    agent = WeatherAgent()
    
    print("=== 天气Agent演示 ===")
    print("用户:北京今天天气怎么样?")
    print("Agent:", agent.run("北京今天天气怎么样?"))
    print()
    print("用户:上海呢?")
    print("Agent:", agent.run("上海呢?"))  # 测试记忆:"呢"指代上海
    print()
    print("用户:深圳明天穿什么?")
    print("Agent:", agent.run("深圳明天穿什么?"))

这个极简Agent包含了Agent的四个核心要素:

组件实现说明
LLM(大脑)MockLLM实际项目中替换为真实API
工具(手脚)WeatherTool查询天气数据
记忆SimpleMemory保留最近N轮对话
规划run()方法固定的执行步骤(提取→查询→生成)

运行结果

=== 天气Agent演示 ===
用户:北京今天天气怎么样?
Agent:今天气温低,建议穿羽绒服。

用户:上海呢?
Agent:今天有雨,建议带伞,穿防水外套。

用户:深圳明天穿什么?
Agent:今天天气不错,穿轻便衣物即可。

这个Agent虽然简单,但它展示了Agent的基本工作模式:感知(用户输入)→ 决策(提取城市)→ 行动(查询天气)→ 生成(LLM回答)


从MVP到生产级的演进路径

一个玩具Agent和一个生产级Agent之间,隔着十万八千里。但不要被吓倒——所有复杂的系统,都是从简单的原型一步步长出来的。

阶段一:增加更多工具

从1个工具扩展到多个工具:

class ToolRegistry:
    def __init__(self):
        self.tools = {}
    
    def register(self, name, tool):
        self.tools[name] = tool
    
    def execute(self, name, **params):
        if name not in self.tools:
            raise ValueError(f"Tool {name} not found")
        return self.tools[name].run(**params)

# 注册多个工具
registry = ToolRegistry()
registry.register("weather", WeatherTool())
registry.register("stock", StockQueryTool())      # 新增:股票查询
registry.register("calendar", CalendarTool())     # 新增:日历管理
registry.register("email", EmailTool())           # 新增:发送邮件

DataAgent有16+个节点,每个节点本质上是一个"工具"或"工具组合":SQL生成、SQL执行、Python代码执行、报告生成……

阶段二:引入规划能力

MVP的Agent执行固定流程,但复杂任务需要动态规划。DataAgent的PlannerNode就是为此而生:

用户:"分析一下上季度各地区的销售额,找出增长最快的前三个地区,
      并预测下季度的趋势。"

PlannerNode生成计划:
1. SchemaRecallNode - 获取sales表结构
2. SqlGenerateNode - 生成查询上季度销售额的SQL
3. SqlExecuteNode - 执行SQL获取数据
4. PythonAnalyzeNode - 用Python计算增长率并排序
5. PythonAnalyzeNode - 用Python做趋势预测
6. ReportGeneratorNode - 生成分析报告

规划能力让Agent从"按固定剧本演戏"升级为"根据任务自主编排"。

阶段三:添加记忆系统

从简单的对话历史,升级到长期记忆

  • • 短期记忆:当前对话的上下文(已具备)
  • • 长期记忆:用户偏好、历史查询、常见问题的答案
  • • 向量记忆:用Embedding将知识存入向量数据库,支持语义检索

DataAgent的MultiTurnContextManager管理多轮上下文,AgentVectorStoreService提供RAG检索能力。

阶段四:工作流编排

当节点多了以后,需要工作流引擎来管理执行顺序、条件分支、并行执行、错误处理。

DataAgent使用StateGraph模式:

// 伪代码:StateGraph的工作流定义
StateGraph graph = new StateGraph();
graph.addNode("intent", new IntentRecognitionNode());
graph.addNode("plan", new PlannerNode());
graph.addNode("sql_generate", new SqlGenerateNode());
graph.addNode("sql_execute", new SqlExecuteNode());
graph.addNode("report", new ReportGeneratorNode());
graph.addNode("human_feedback", new HumanFeedbackNode());

// 定义边(执行顺序)
graph.addEdge("intent", "plan");
graph.addEdge("plan", "human_feedback");
graph.addConditionalEdge("human_feedback", 
    state -> state.getHumanFeedback() ? "execute" : "revise");
graph.addEdge("execute", "report");

阶段五:人机协作

生产级Agent必须考虑人在回路(Human-in-the-loop)

  • • 计划确认:Agent生成计划后,人类确认或修改
  • • 危险操作拦截:删除数据、修改配置前必须人工确认
  • • 异常求助:Agent遇到不确定的情况,主动询问人类

DataAgent在PlannerNode后设置HumanFeedbackNode,使用interruptBefore机制暂停执行,等待人类反馈。

阶段六:安全与观测

最后但最重要的是安全可观测性(我们在第十章和第十一章详细讲过):

  • • 输入校验:防止Prompt注入、恶意输入
  • • 输出过滤:敏感信息脱敏、有害内容拦截
  • • 权限控制:不同用户能访问不同数据和工具
  • • 全链路追踪:每个请求的可追溯、可审计

DataAgent的设计启示

DataAgent是一个拥有16+节点的复杂StateGraph,但它不是一天建成的。

启示一:从核心功能开始

DataAgent的起点是NL2SQL(自然语言转SQL)。这是最有价值、最核心的功能:

// DataAgentApplication.java - 简洁的入口
@SpringBootApplication
@EnableScheduling
public class DataAgentApplication {
    public static void main(String[] args) {
        SpringApplication.run(DataAgentApplication.class, args);
    }
}

整个应用的入口只有这几行。但背后是一个逐步扩展的过程:

  1. 1. V0.1:NL2SQL(意图识别 → SQL生成 → SQL执行 → 结果返回)
  2. 2. V0.2:增加Schema召回(解决表结构理解问题)
  3. 3. V0.3:增加RAG检索(解决业务术语映射问题)
  4. 4. V0.4:增加Planner(支持多步骤复杂查询)
  5. 5. V0.5:增加Python分析(支持深度数据挖掘)
  6. 6. V0.6:增加报告生成(输出美观的图表和文档)
  7. 7. V1.0:增加人机协作、安全控制、可观测性

启示二:每个Node都是独立可测试的

DataAgent的每个节点都实现了NodeAction接口,可以独立开发和单元测试:

// 伪代码:Node的标准接口
public interface NodeAction {
    Map<String, Object> execute(OverAllState state);
}

// SqlGenerateNode可以独立测试
@Test
public void testSqlGenerateNode() {
    SqlGenerateNode node = new SqlGenerateNode(llmService);
    OverAllState state = new OverAllState();
    state.put("query", "查询上个月的销售额");
    state.put("schema", "table: sales(columns: amount, date, region)");
    
    Map<String, Object> result = node.execute(state);
    
    assertNotNull(result.get("sql"));
    assertTrue(result.get("sql").toString().contains("SELECT"));
}

这种设计的好处:

  • • 并行开发:不同开发者负责不同节点,互不干扰
  • • 独立测试:每个节点可以单独验证正确性
  • • 灵活替换:想换一个新的SQL生成策略?直接替换SqlGenerateNode

启示三:动态LLM切换的设计巧思

DataAgent的AiModelRegistry实现了运行时热切换LLM,这是一个非常实用的设计:

// AiModelRegistry.java(简化版)
@Component
public class AiModelRegistry {
    
    private final DynamicModelFactory modelFactory;
    private final ModelConfigDataService modelConfigDataService;
    
    // volatile 保证多线程可见性
    private volatile ChatClient currentChatClient;
    private volatile EmbeddingModel currentEmbeddingModel;

    /**
     * 获取 ChatClient(懒加载 + 双重检查锁)
     */
    public ChatClient getChatClient() {
        if (currentChatClient == null) {
            synchronized (this) {
                if (currentChatClient == null) {
                    ModelConfigDTO config = modelConfigDataService
                        .getActiveConfigByType(ModelType.CHAT);
                    ChatModel chatModel = modelFactory.createChatModel(config);
                    currentChatClient = ChatClient.builder(chatModel).build();
                }
            }
        }
        return currentChatClient;
    }

    /**
     * 刷新缓存(用于热切换)
     */
    public void refreshChat() {
        this.currentChatClient = null;
        log.info("Chat cache cleared. Next call will create new instance.");
    }
}

这个设计的亮点:

  1. 1. 懒加载:第一次使用时才初始化,加快启动速度
  2. 2. 双重检查锁(DCL):线程安全的高效单例模式
  3. 3. volatile关键字:确保多线程环境下缓存的可见性
  4. 4. 热切换:管理员在后台切换模型配置后,调用refreshChat()清空缓存,下次请求自动使用新模型,无需重启服务

这种设计在生产环境中非常实用:

  • • 发现某个模型API不稳定?立即切换到备用模型
  • • 新模型上线了?无缝切换,用户无感知
  • • A/B测试不同模型?动态切换,对比效果

扩展方向:Agent的未来

当你掌握了基础Agent开发后,可以向这些方向探索:

多模态Agent

不仅能处理文本,还能理解图像、语音、视频:

  • • 图像理解:用户上传一张图表,Agent分析数据趋势
  • • 语音交互:用户用语音提问,Agent语音回答
  • • 文档解析:自动读取PDF、Word、Excel,提取关键信息

Agent生态:MCP协议

MCP(Model Context Protocol) 是Anthropic提出的开放协议,让Agent可以无缝连接各种外部工具和数据源。

DataAgent已经支持MCP Server:

// McpServerService.java(概念示意)
@Service
public class McpServerService {
    // 提供 nl2SqlToolCallback 和 listAgentsToolCallback
    // 外部Agent(如Claude Desktop)可以通过MCP协议调用DataAgent的能力
}

这意味着DataAgent不仅能独立运行,还能作为"工具"被其他Agent调用,形成Agent生态。

自主Agent(AutoGPT类)

更进一步的Agent可以自主设定目标、分解任务、循环执行、自我纠错

用户:"帮我调研一下新能源汽车市场"

Agent自主执行:
1. 搜索最新销量数据
2. 分析主要品牌市场份额
3. 查找政策新闻
4. 生成对比表格
5. 发现数据缺失,自动补充搜索
6. 生成完整报告
7. 询问用户是否需要深入某个方面

这类Agent更智能,但也更难控制,需要更强的安全机制和人在回路设计。


学习资源推荐

官方文档

  • • Spring AI Alibaba:https://github.com/alibaba/spring-ai-alibaba(DataAgent使用的框架)
  • • LangChain:https://python.langchain.com(Python生态最流行的Agent框架)
  • • OpenAI API:https://platform.openai.com/docs(理解LLM API的最佳起点)

开源项目

  • • DataAgent:企业级数据分析Agent(本书案例)
  • • AutoGPT:自主Agent的探索性项目
  • • MetaGPT:多Agent协作框架

论文与博客

  • • ReAct(Reasoning + Acting):LLM推理与行动结合的经典论文
  • • LangGraph:基于图的Agent工作流框架
  • • Toolformer:LLM学习使用工具的论文

全书总结

让我们用一张图总结全书的核心内容:

┌─────────────────────────────────────────────────────────────┐
│                      Agent 的完整拼图                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│    ┌─────────┐                                              │
│    │  LLM    │  ← 大脑:推理、理解、生成                       │
│    │ (大脑)  │                                              │
│    └────┬────┘                                              │
│         │                                                   │
│    ┌────┴────┬─────────┬─────────┬─────────┐               │
│    ▼         ▼         ▼         ▼         ▼               │
│ ┌──────┐ ┌──────┐ ┌────────┐ ┌──────┐ ┌────────┐          │
│ │Tools │ │Memory│ │Planning│ │Safety│ │Human   │          │
│ │(手脚)│ │(记忆)│ │(规划)  │ │(安全)│ │(协作)  │          │
│ └──────┘ └──────┘ └────────┘ └──────┘ └────────┘          │
│    │         │         │         │         │               │
│    └─────────┴─────────┴─────────┴─────────┘               │
│                      │                                      │
│                      ▼                                      │
│               ┌────────────┐                                │
│               │ Workflow   │  ← 编排:StateGraph / 工作流引擎 │
│               │ (编排)     │                                │
│               └─────┬──────┘                                │
│                     │                                       │
│                     ▼                                       │
│               ┌────────────┐                                │
│               │Observability│ ← 观测:日志 / 指标 / 追踪       │
│               │ (观测)      │                                │
│               └────────────┘                                │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Agent = LLM(大脑) + Tools(手脚) + Memory(记忆) + Planning(规划) + Workflow(编排) + Human(协作) + Safety(安全) + Observability(观测)

这八个要素构成了生产级Agent系统的完整拼图。缺少任何一个,都可能在某个时刻出问题:

  • • 没有Tools,Agent是" paralysis by analysis"(只会分析,不会行动)
  • • 没有Memory,Agent是"金鱼"(每次对话都从零开始)
  • • 没有Planning,Agent是"无头苍蝇"(复杂任务无从下手)
  • • 没有Workflow,Agent是"一盘散沙"(节点各自为战)
  • • 没有Human,Agent是"失控的列车"(可能做出危险决策)
  • • 没有Safety,Agent是"定时炸弹"(容易被攻击、泄露数据)
  • • 没有Observability,Agent是"黑箱"(出问题无法定位)

本章小结

  1. 1. 设计框架(5W1H):What、Who、When、Where、Why、How——从问题出发,而非技术出发
  2. 2. MVP Agent:1个LLM + 1个工具 + 1个记忆 = 最小可行Agent,先跑起来再优化
  3. 3. 演进路径:增加工具 → 引入规划 → 添加记忆 → 工作流编排 → 人机协作 → 安全与观测
  4. 4. DataAgent启示:从核心功能(NL2SQL)开始,每个Node独立可测试,支持动态LLM热切换
  5. 5. 未来方向:多模态Agent、MCP生态、自主Agent

最后的话:Agent开发没有终点,只有持续的迭代和优化。今天写的第一个Agent可能很简陋,但它是你成为Agent工程师的第一步。动手吧,现在就开始写你的第一个Agent!


思考题

  1. 1. 设计题:用本章的5W1H框架,设计一个"智能客服Agent"。请回答六个问题,并画出它的MVP架构图(可以用文字描述)。
  2. 2. 代码题:在天气Agent的示例基础上,增加一个"日历工具",让Agent能根据天气和用户的日程给出建议(例如:"今天有雨,而且你下午有户外会议,建议改到室内或带伞")。
  3. 3. 开放题:DataAgent从V0.1到V1.0经历了七个阶段的演进。如果你要设计一个"个人知识管理Agent"(帮助用户整理笔记、提取知识点、回答基于笔记的问题),你会如何规划它的演进路线?请列出前三个版本的核心功能。