第一章:什么是Agent?——从生活例子到AI Agent
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent开篇:你早就见过"Agent"
想象一下这样的场景:
你走进一家房产中介公司,对经纪人小王说:"我想在市中心买一套三居室,预算300万,附近有地铁,最好带学区。"
小王会怎么做?
- 1. 接收需求:他拿出笔记本,记录下你的预算、地段、户型要求
- 2. 分析问题:他在脑海中盘算:"市中心的学区房很抢手,300万可能有点紧张,需要看看周边新盘或者二手房"
- 3. 调用资源:他打开房源系统,筛选符合条件的楼盘;打电话给熟悉的开发商销售;查询最新的学区划分政策
- 4. 返回结果:三天后,他给你整理了一份详细的看房清单,附带每套房源的优缺点分析
在这个过程中,小王就是一个Agent(代理/中介)——他接收你的需求,独立思考,调动各种资源,最终给你一个完整的解决方案。
类似的例子还有很多:
- • 保险经纪人:了解你的家庭情况后,帮你对比十几家保险公司的产品,推荐最适合的方案
- • 律师:听完你的案情,查阅法律条文、检索判例、分析证据,给出专业的法律意见
- • 旅行社定制师:根据你的预算和喜好,设计一条独一无二的旅行路线,预订机票酒店
这些职业的共同点是:他们不是简单地传递信息,而是主动帮你解决问题。
AI Agent的定义:会思考的数字助手
现在,我们把"Agent"这个概念搬到人工智能领域。
AI Agent(智能体)是能够自主感知环境、做出决策、执行行动的智能系统。
听起来有点抽象?让我们拆解一下:
| 能力 | 房产中介小王 | AI Agent |
|---|---|---|
| 感知环境 | 听你说话、看你的表情 | 读取用户输入、监控系统状态、获取外部数据 |
| 做出决策 | 分析预算和需求是否匹配 | 判断用户意图、规划执行步骤 |
| 执行行动 | 打电话、查系统、约看房 | 调用API、执行代码、生成报告 |
一个AI Agent就像数字世界里的"小王"——你告诉它一个问题,它会自己思考怎么解决,然后动手去做。
Agent的三个关键词
学术界给Agent下了很多定义,但核心离不开这三个特性:
1. 自主性(Autonomy)
Agent能独立运行,不需要人类一步一步地指导。
就像你委托小王找房后,不需要每天催他"今天查了哪几个小区"——他会主动推进。AI Agent也是如此:你给它一个目标("分析一下上个月的销售数据"),它会自己决定怎么做,不需要你教它"先打开数据库,再写SQL,最后做图表"。
2. 反应性(Reactivity)
Agent能感知环境变化并做出反应。
小王带你看房时,如果你说"这个小区太旧了",他会立刻调整方向,推荐新楼盘。AI Agent也一样:如果执行SQL时数据库连接断了,它会感知到这个错误,尝试重连或者换一种方式获取数据。
3. 主动性(Pro-activeness)
Agent不只会被动响应,还会主动采取行动。
优秀的小王不会等你问"学区政策变了吗",他会主动提醒你:"最近学区划分有调整,我帮你确认了一下,这个小区还在学区范围内。"AI Agent也能如此:发现数据异常时主动预警,或者在分析完销售数据后主动建议"下个月可以重点推广A产品"。
一句话总结:Agent = 自主性 + 反应性 + 主动性
Agent vs 传统程序:即兴演员 vs 按剧本演戏
你可能想问:Agent和普通程序有什么区别?我写一个脚本也能自动执行任务啊。
好问题!让我们用一个比喻来说明:
传统程序 = 按剧本演戏的演员
剧本写好了每一步:
- 1. 走到舞台中央
- 2. 说台词A
- 3. 转身
- 4. 说台词B
演员(程序)严格按剧本执行,不会变通。如果舞台上突然多了把椅子,演员会愣住——剧本里没写怎么处理椅子。
Agent = 即兴演员
即兴演员没有固定剧本,只有一个目标("演一场关于友情的戏")。他会根据现场情况灵活发挥:
- • 看到椅子,可能把它当成道具坐上去
- • 搭档忘词了,他自然地接话圆场
- • 观众笑了,他顺势加一段互动
代码层面的对比:
// 传统程序:固定的执行流程
public class TraditionalProgram {
public void analyzeSales() {
// 步骤1:连接数据库(写死的配置)
Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/sales");
// 步骤2:执行固定SQL
String sql = "SELECT * FROM monthly_sales WHERE month = '2024-01'";
ResultSet rs = conn.createStatement().executeQuery(sql);
// 步骤3:输出固定格式报告
System.out.println("月度销售报告:");
while (rs.next()) {
System.out.println(rs.getString("product") + ": " + rs.getDouble("amount"));
}
}
}传统程序的问题是:一切都预先写死了。如果用户问"对比2023年和2024年的数据",或者"只看华东区的销售",程序就无能为力了——因为它只会执行那一条固定的SQL。
而Agent的做法完全不同:
// DataAgent的意图识别节点:先理解用户要什么
public class IntentRecognitionNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 1. 获取用户输入(感知)
String userInput = StateUtil.getStringValue(state, INPUT_KEY);
log.info("User input for intent recognition: {}", userInput);
// 2. 构建意图识别提示词(思考)
String prompt = PromptHelper.buildIntentRecognitionPrompt(multiTurn, userInput);
// 3. 调用大模型判断用户意图(决策)
Flux<ChatResponse> responseFlux = llmService.callUser(prompt);
// 4. 返回识别结果(行动)
return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, generator);
}
}看到区别了吗?Agent不会预先写死SQL,而是:
- 1. 先理解用户问的是什么("分析一下上个月的销售数据,按产品分类")
- 2. 再决定需要做什么(需要查sales表、按product分组、计算amount总和)
- 3. 最后生成对应的SQL并执行
如果用户换个问法:"华东区Q3的销售趋势如何?"Agent会重新理解意图,生成完全不同的执行计划。
Agent vs Chatbot:你派我办 vs 你问我答
还有一个容易混淆的概念:Agent和Chatbot(聊天机器人)有什么区别?
| 维度 | Chatbot | Agent |
|---|---|---|
| 交互模式 | 你问我答 | 你派我办 |
| 能力范围 | 只说话 | 能动手 |
| 记忆能力 | 当前对话 | 跨会话记忆 |
| 主动性 | 被动等待提问 | 主动推进任务 |
| 典型例子 | 客服机器人、Siri | 自动驾驶、数据分析Agent |
Chatbot是"百科全书":你问它"什么是SQL",它给你解释;你再问"怎么写JOIN",它给你示例。但它不会真的帮你去数据库里查数据。
Agent是"办事员":你说"帮我分析一下用户增长趋势",它会:
- 1. 理解你要分析什么指标(DAU?新增用户?留存率?)
- 2. 去数据库里找到相关表
- 3. 写SQL查询数据
- 4. 用Python画图
- 5. 生成一份完整的分析报告
整个过程它自己推进,不需要你一步步指导。
类比:Chatbot像电话客服——能解答问题,但办不了事;Agent像你的私人助理——你交代任务,他搞定一切。
DataAgent:一个真实的企业级AI Agent
说了这么多概念,让我们看看真实的AI Agent长什么样。
DataAgent是阿里云开发的企业级智能数据分析Agent。它能听懂你的数据问题,自动写SQL、画图、出报告。
DataAgent的工作流程
当你问DataAgent:"近三个月各产品线的销售额趋势如何?"
DataAgent内部会经历16+个节点的处理:
用户提问
↓
[意图识别] → 这是数据分析请求,不是闲聊
↓
[证据召回] → 查询相关知识库,了解"销售额"对应哪些表和字段
↓
[查询增强] → 把"近三个月"翻译成具体的日期范围
↓
[Schema召回] → 找到sales、product、date等相关表
↓
[表关系] → 分析这些表怎么关联(sales.product_id = product.id)
↓
[可行性评估] → 判断这个问题能不能用现有数据回答
↓
[规划] → 制定执行计划:Step1查数据 → Step2画图 → Step3写报告
↓
[人机反馈] → (可选)把计划给用户确认
↓
[计划执行] → 按步骤执行...
↓
[SQL生成] → 自动生成SQL查询语句
↓
[语义一致性] → 检查SQL是否符合用户意图
↓
[SQL执行] → 连接数据库执行查询
↓
[Python生成] → 根据查询结果生成画图代码
↓
[Python执行] → 运行代码生成图表
↓
[Python分析] → 对数据做深度分析(趋势、异常等)
↓
[报告生成] → 整合所有结果,生成最终报告
↓
返回给用户:数据表格 + 趋势图 + 分析结论整个流程完全自动化,你只需要说一句话,剩下的交给Agent。
看看DataAgent的"大脑"——意图识别
DataAgent的第一个节点是IntentRecognitionNode(意图识别节点),它的作用是判断用户想干什么。
/**
* 意图识别节点,用于识别用户输入是闲聊还是数据分析请求
*/
@Slf4j
@Component
@AllArgsConstructor
public class IntentRecognitionNode implements NodeAction {
private final LlmService llmService;
private final JsonParseUtil jsonParseUtil;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 第1步:获取用户输入(感知)
String userInput = StateUtil.getStringValue(state, INPUT_KEY);
log.info("User input for intent recognition: {}", userInput);
// 获取多轮对话上下文(如果有历史对话)
String multiTurn = StateUtil.getStringValue(state, MULTI_TURN_CONTEXT, "(无)");
// 第2步:构建意图识别提示词(思考准备)
String prompt = PromptHelper.buildIntentRecognitionPrompt(multiTurn, userInput);
log.debug("Built intent recognition prompt as follows \n {} \n", prompt);
// 第3步:调用大模型进行意图识别(决策)
Flux<ChatResponse> responseFlux = llmService.callUser(prompt);
// 第4步:处理模型返回的结果(行动)
Flux<GraphResponse<StreamingOutput>> generator = FluxUtil.createStreamingGenerator(this.getClass(), state,
responseFlux,
Flux.just(ChatResponseUtil.createResponse("正在进行意图识别..."),
ChatResponseUtil.createPureResponse(TextType.JSON.getStartSign())),
Flux.just(ChatResponseUtil.createPureResponse(TextType.JSON.getEndSign()),
ChatResponseUtil.createResponse("\n意图识别完成!")),
result -> {
// 使用JsonParseUtil解析JSON并转换为IntentRecognitionOutputDTO对象
IntentRecognitionOutputDTO intentRecognitionOutput = jsonParseUtil.tryConvertToObject(result,
IntentRecognitionOutputDTO.class);
return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, intentRecognitionOutput);
});
return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, generator);
}
}这段代码做了什么?
- 1. 感知:从系统状态(
OverAllState)中读取用户输入 - 2. 思考:把用户输入包装成提示词,交给大模型(LLM)分析
- 3. 行动:把大模型的返回结果解析成结构化的意图对象,传递给下一个节点
比如用户说"你好",意图识别节点会判断这是"闲聊",直接返回友好的问候语;用户说"上个月销售额多少",它会判断这是"数据分析请求",继续走后续的数据库查询流程。
这就是Agent的自主性——它能自己判断该走哪条路,不需要人类预先写好所有分支。
本章小结
让我们回顾一下本章的核心概念:
- 1. Agent的本质:接收需求 → 分析问题 → 调用资源 → 返回结果。房产中介、保险经纪人都是生活中的Agent。
- 2. AI Agent的定义:能够自主感知环境、做出决策、执行行动的智能体。
- 3. Agent的三个关键词:
- • 自主性:独立运行,不需要逐步指导
- • 反应性:感知环境变化并做出反应
- • 主动性:主动采取行动,不止被动响应
- 4. Agent vs 传统程序:传统程序按固定剧本执行,Agent像即兴演员,能根据情况灵活应对。
- 5. Agent vs Chatbot:Chatbot是"你问我答"的百科全书,Agent是"你派我办"的办事员。
- 6. DataAgent:阿里云的企业级数据分析Agent,包含16+个节点,能自动完成从意图识别到报告生成的完整流程。
思考题
- 1. 生活中的Agent:除了本章提到的房产中介、保险经纪人、律师,你还能想到哪些生活中的Agent?它们分别体现了自主性、反应性、主动性中的哪些特性?
- 2. 场景辨析:假设你正在使用一个智能音箱。
- • 你对它说"今天天气怎么样",它回答"今天北京晴,25度"——这是Chatbot还是Agent?为什么?
- • 你对它说"明天下雨的话,帮我取消下午的户外活动,并给所有参加者发通知",它真的去检查日历、发送邮件——这是Chatbot还是Agent?为什么?
- 3. DataAgent的意图识别:在DataAgent的IntentRecognitionNode代码中,如果用户输入是"你好,请问怎么用SQL查重复数据",你觉得意图识别节点会把它归类为"闲聊"还是"数据分析请求"?为什么?(提示:思考这句话的语义重心在哪里)
第二章:Agent的核心能力——感知、思考、行动
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent餐厅服务员的启示
想象你走进一家高档餐厅,服务员小李接待了你。
迎客(感知):
- • 看到你穿着正装,判断可能是商务宴请
- • 注意到你带着生日蛋糕,意识到是生日聚餐
- • 听到你说"两位",同时看到门口还有朋友在停车
推荐菜品(思考):
- • 分析:商务宴请需要体面,生日需要惊喜,等人到齐再点菜
- • 决策:先推荐招牌菜和红酒,等朋友到了再点主菜
上菜(行动):
- • 把凉菜先上,热菜等朋友到了再下单
- • 悄悄通知后厨准备长寿面
- • 朋友到了,主动加椅子、倒茶水
这个过程中,小李展现了Agent的三个核心能力:感知(Perception)、思考(Reasoning/Planning)、行动(Action)。
AI Agent也是如此。本章我们就来深入理解这三个能力,并通过DataAgent的真实代码,看看企业级Agent是如何运转的。
感知(Perception):Agent如何接收和理解输入
什么是感知?
感知是Agent与外部世界交互的"感官系统"。就像人类通过眼睛、耳朵、皮肤接收信息,Agent通过各种"接口"接收输入:
| 人类感官 | Agent的感知方式 | 例子 |
|---|---|---|
| 眼睛 | 文本输入 | 用户在对话框里打字 |
| 耳朵 | 语音输入 | 用户对着手机说话 |
| 皮肤 | 系统状态 | 数据库连接是否正常 |
| 鼻子 | 环境数据 | 当前时间、用户位置 |
| 味觉 | 多模态输入 | 上传的图片、Excel文件 |
DataAgent的感知系统
DataAgent主要接收文本输入,但它感知的内容远不止用户当前说的话。
// DataAgent的意图识别节点:从状态中获取各种输入信息
public class IntentRecognitionNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 感知1:获取用户当前的输入
String userInput = StateUtil.getStringValue(state, INPUT_KEY);
log.info("User input for intent recognition: {}", userInput);
// 感知2:获取多轮对话历史(如果有之前的对话)
String multiTurn = StateUtil.getStringValue(state, MULTI_TURN_CONTEXT, "(无)");
// 感知3:构建提示词,交给大模型分析
String prompt = PromptHelper.buildIntentRecognitionPrompt(multiTurn, userInput);
// ...后续处理
}
}看到没有?DataAgent不仅感知了用户当前的输入,还感知了对话历史。这意味着:
- • 用户第一句:"帮我查一下销售数据"
- • 用户第二句:"只看华东区的"
如果没有多轮对话感知,第二句"只看华东区的"就毫无意义——华东区的什么?但有了对话历史,Agent就知道这是在上一条查询的基础上加过滤条件。
多模态感知
除了文本,现代Agent还能感知更多类型的输入:
- • 语音:智能音箱把语音转成文字
- • 图片:用户上传一张报表截图,Agent识别其中的表格数据
- • 文件:用户上传Excel,Agent读取并分析
- • 系统状态:Agent感知数据库是否可用、网络是否通畅
关键洞察:Agent的感知能力决定了它的"视野"。感知越丰富,Agent越能理解复杂的上下文。
思考(Reasoning/Planning):Agent如何分析问题
感知到信息后,Agent需要思考。这是Agent最核心、最复杂的能力。
思考的层次
Agent的思考通常分为三个层次:
第一层:意图识别
"用户到底想干什么?"
第二层:任务拆解
"要完成这个目标,需要分几步?"
第三层:细节规划
"每一步具体怎么做?用什么工具?"让我们通过DataAgent的真实代码,看看这三个层次是如何实现的。
第一层:意图识别——这是聊天还是数据分析?
DataAgent的第一个思考节点是IntentRecognitionNode:
/**
* 意图识别节点,用于识别用户输入是闲聊还是数据分析请求
*/
@Slf4j
@Component
@AllArgsConstructor
public class IntentRecognitionNode implements NodeAction {
private final LlmService llmService;
private final JsonParseUtil jsonParseUtil;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 获取用户输入
String userInput = StateUtil.getStringValue(state, INPUT_KEY);
log.info("User input for intent recognition: {}", userInput);
String multiTurn = StateUtil.getStringValue(state, MULTI_TURN_CONTEXT, "(无)");
// 构建意图识别提示
String prompt = PromptHelper.buildIntentRecognitionPrompt(multiTurn, userInput);
log.debug("Built intent recognition prompt as follows \n {} \n", prompt);
// 调用LLM进行意图识别
Flux<ChatResponse> responseFlux = llmService.callUser(prompt);
Flux<GraphResponse<StreamingOutput>> generator = FluxUtil.createStreamingGenerator(this.getClass(), state,
responseFlux,
Flux.just(ChatResponseUtil.createResponse("正在进行意图识别..."),
ChatResponseUtil.createPureResponse(TextType.JSON.getStartSign())),
Flux.just(ChatResponseUtil.createPureResponse(TextType.JSON.getEndSign()),
ChatResponseUtil.createResponse("\n意图识别完成!")),
result -> {
// 使用JsonParseUtil解析JSON并转换为IntentRecognitionOutputDTO对象
IntentRecognitionOutputDTO intentRecognitionOutput = jsonParseUtil.tryConvertToObject(result,
IntentRecognitionOutputDTO.class);
return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, intentRecognitionOutput);
});
return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, generator);
}
}这段代码的核心是第62行:llmService.callUser(prompt)——把用户的输入交给大模型(LLM)分析。
大模型会返回一个结构化的结果,比如:
{
"intent": "data_analysis",
"confidence": 0.95,
"description": "用户想要查询销售数据"
}或者:
{
"intent": "chitchat",
"confidence": 0.88,
"description": "用户在打招呼"
}基于这个结果,DataAgent决定走哪条路:
- • 数据分析 → 继续后续的数据库查询流程
- • 闲聊 → 直接返回友好的回复,结束流程
这就是第一层思考:先搞清楚用户要什么。
第二层:任务拆解——大问题拆成小步骤
确定了意图后,DataAgent进入PlannerNode(规划节点),这是Agent的"大脑司令部"。
/**
* 规划节点:根据用户需求生成执行计划
*/
@Slf4j
@Component
@AllArgsConstructor
public class PlannerNode implements NodeAction {
private final LlmService llmService;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 是否为NL2SQL模式(直接翻译SQL,不做复杂分析)
Boolean onlyNl2sql = state.value(IS_ONLY_NL2SQL, false);
Flux<ChatResponse> flux = onlyNl2sql ? handleNl2SqlOnly() : handlePlanGenerate(state);
// ...后续处理
}
private Flux<ChatResponse> handlePlanGenerate(OverAllState state) {
// 获取查询增强节点的输出(经过改写和扩展的用户问题)
String canonicalQuery = StateUtil.getCanonicalQuery(state);
log.info("Using processed query for planning: {}", canonicalQuery);
// 检查是否为修复模式(用户之前否决了计划,需要重新生成)
String validationError = StateUtil.getStringValue(state, PLAN_VALIDATION_ERROR, null);
if (validationError != null) {
log.info("Regenerating plan with user feedback: {}", validationError);
}
else {
log.info("Generating initial plan");
}
// 构建提示参数
String semanticModel = (String) state.value(GENEGRATED_SEMANTIC_MODEL_PROMPT).orElse("");
SchemaDTO schemaDTO = StateUtil.getObjectValue(state, TABLE_RELATION_OUTPUT, SchemaDTO.class);
String schemaStr = PromptHelper.buildMixMacSqlDbPrompt(schemaDTO, true);
// 构建用户提示
String userPrompt = buildUserPrompt(canonicalQuery, validationError, state);
String evidence = StateUtil.getStringValue(state, EVIDENCE);
// 构建模板参数
BeanOutputConverter<Plan> beanOutputConverter = new BeanOutputConverter<>(Plan.class);
Map<String, Object> params = Map.of("user_question", userPrompt, "schema", schemaStr, "evidence", evidence,
"semantic_model", semanticModel, "plan_validation_error", formatValidationError(validationError),
"format", beanOutputConverter.getFormat());
// 生成计划
String plannerPrompt = PromptConstant.getPlannerPromptTemplate().render(params);
log.debug("Planner prompt: as follows \n{}\n", plannerPrompt);
// 调用LLM生成计划
return llmService.callUser(plannerPrompt);
}
}PlannerNode的核心任务是生成执行计划。它会输出一个结构化的计划,比如:
{
"thoughtProcess": "用户想了解近三个月各产品线的销售趋势。我需要:1)查询sales表获取销售数据;2)按产品线和月份分组统计;3)用Python绘制趋势图;4)生成分析报告",
"executionPlan": [
{
"step": 1,
"toolToUse": "sql_generate",
"toolParameters": {
"instruction": "查询近三个月各产品线的销售额,按月份和产品线分组"
}
},
{
"step": 2,
"toolToUse": "python_generate",
"toolParameters": {
"instruction": "根据SQL查询结果,绘制各产品线近三个月的销售趋势折线图"
}
},
{
"step": 3,
"toolToUse": "report_generator",
"toolParameters": {
"summaryAndRecommendations": "总结销售趋势,指出增长最快的产品线和潜在问题"
}
}
]
}看到没有?Agent把一个大问题拆解成了三个小步骤:
- 1. 先查数据(SQL)
- 2. 再画图(Python)
- 3. 最后写报告
这就是第二层思考:把复杂任务拆解成可执行的步骤。
第三层:查询增强——让问题更清晰
在规划之前,DataAgent还有一个重要的思考节点:QueryEnhanceNode(查询增强节点)。
/**
* 查询增强节点:根据evidence信息把业务术语翻译、查询改写、扩展
*/
@Slf4j
@Component
@AllArgsConstructor
public class QueryEnhanceNode implements NodeAction {
private final LlmService llmService;
private final JsonParseUtil jsonParseUtil;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 获取用户输入
String userInput = StateUtil.getStringValue(state, INPUT_KEY);
log.info("User input for query enhance: {}", userInput);
// 获取证据信息(从知识库召回的相关信息)
String evidence = StateUtil.getStringValue(state, EVIDENCE);
// 获取多轮对话上下文
String multiTurn = StateUtil.getStringValue(state, MULTI_TURN_CONTEXT, "(无)");
// 构建查询处理提示
String prompt = PromptHelper.buildQueryEnhancePrompt(multiTurn, userInput, evidence);
log.debug("Built query enhance prompt as follows \n {} \n", prompt);
// 调用LLM进行查询处理
Flux<ChatResponse> responseFlux = llmService.callUser(prompt);
// ...后续处理
}
}QueryEnhanceNode的作用是什么?
假设用户问:"上个月GMV怎么样?"
Agent可能不知道"GMV"是什么。但通过查询增强:
- 1. 从知识库召回证据:"GMV = Gross Merchandise Volume = 商品交易总额 = order表中的amount字段总和"
- 2. 把用户的问题改写成:"统计上个月(2024年1月)所有订单的amount字段总和"
这就是第三层思考:把模糊的用户语言翻译成精确的技术语言。
思考的本质:大模型是Agent的"大脑"
你会发现,DataAgent的每个思考节点都调用了llmService.callUser()——大模型(LLM)是Agent思考能力的核心来源。
但Agent不只是简单地调用大模型。它通过提示词工程(Prompt Engineering)和工作流编排,让大模型在合适的时机做合适的事:
| 节点 | 大模型的任务 | 输入 | 输出 |
|---|---|---|---|
| 意图识别 | 分类 | 用户问题 | 意图类型 |
| 查询增强 | 翻译/改写 | 用户问题 + 知识库证据 | 规范化查询 |
| 规划 | 拆解任务 | 规范化查询 + 数据库Schema | 执行计划 |
| SQL生成 | 写代码 | 执行步骤 + Schema | SQL语句 |
| 语义一致性 | 检查 | SQL + 用户问题 | 通过/不通过 |
| 报告生成 | 写作 | 所有执行结果 | 分析报告 |
关键洞察:Agent的思考能力 = 大模型的推理能力 + 精心设计的提示词 + 清晰的工作流分工
行动(Action):Agent如何执行和输出
思考完成后,Agent需要行动——把想法变成现实。
行动的类型
Agent的行动可以分为几类:
| 行动类型 | 说明 | DataAgent中的例子 |
|---|---|---|
| 工具调用 | 调用外部API或工具 | 查询数据库、调用大模型 |
| 代码执行 | 运行程序代码 | 执行SQL、运行Python |
| 内容生成 | 创造新的内容 | 生成报告、绘制图表 |
| 状态更新 | 修改内部状态 | 记录执行结果、更新对话历史 |
| 人机交互 | 与人类互动 | 请求确认、展示中间结果 |
DataAgent的行动节点
让我们看看DataAgent中几个典型的行动节点。
行动1:SQL执行
/**
* SQL执行节点:执行SQL查询并处理结果
*/
@Slf4j
@Component
@AllArgsConstructor
public class SqlExecuteNode implements NodeAction {
private final DatabaseUtil databaseUtil;
private final Nl2SqlService nl2SqlService;
private final LlmService llmService;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
Integer currentStep = PlanProcessUtil.getCurrentStepNumber(state);
// 获取生成的SQL
String sqlQuery = StateUtil.getStringValue(state, SQL_GENERATE_OUTPUT);
sqlQuery = nl2SqlService.sqlTrim(sqlQuery);
log.info("Executing SQL query: {}", sqlQuery);
// 获取Agent ID,动态获取数据源配置
String agentIdStr = StateUtil.getStringValue(state, Constant.AGENT_ID);
Long agentId = Long.valueOf(agentIdStr);
DbConfigBO dbConfig = databaseUtil.getAgentDbConfig(agentId);
return executeSqlQuery(state, currentStep, sqlQuery, dbConfig, agentId);
}
private Map<String, Object> executeSqlQuery(OverAllState state, Integer currentStep, String sqlQuery,
DbConfigBO dbConfig, Long agentId) {
// 准备查询参数
DbQueryParameter dbQueryParameter = new DbQueryParameter();
dbQueryParameter.setSql(sqlQuery);
dbQueryParameter.setSchema(dbConfig.getSchema());
Accessor dbAccessor = databaseUtil.getAgentAccessor(agentId);
final Map<String, Object> result = new HashMap<>();
// 执行SQL查询并获取结果
ResultSetBO resultSetBO = dbAccessor.executeSqlAndReturnObject(dbConfig, dbQueryParameter);
// 调用大模型获取图表配置信息
DisplayStyleBO displayStyleBO = enrichResultSetWithChartConfig(state, resultSetBO);
// 更新步骤结果
Map<String, String> existingResults = StateUtil.getObjectValue(state, SQL_EXECUTE_NODE_OUTPUT,
Map.class, new HashMap<>());
Map<String, String> updatedResults = PlanProcessUtil.addStepResult(existingResults, currentStep,
strResultSetJson);
// 准备返回结果
result.putAll(Map.of(SQL_EXECUTE_NODE_OUTPUT, updatedResults, SQL_REGENERATE_REASON,
SqlRetryDto.empty(), SQL_RESULT_LIST_MEMORY, resultSetBO.getData(), PLAN_CURRENT_STEP,
currentStep + 1, SQL_GENERATE_COUNT, 0));
return result;
}
}SqlExecuteNode的行动流程:
- 1. 从状态中获取要执行的SQL
- 2. 动态获取数据库连接配置(不同Agent可能连接不同数据库)
- 3. 执行SQL查询
- 4. 调用大模型分析结果,生成图表配置(比如判断用柱状图还是折线图)
- 5. 把结果存入状态,供后续节点使用
这就是工具调用 + 代码执行的行动。
行动2:Python代码执行
/**
* Python执行节点:运行Python代码获取分析结果
*/
@Slf4j
@Component
public class PythonExecuteNode implements NodeAction {
private final CodePoolExecutorService codePoolExecutor;
private final ObjectMapper objectMapper;
private final JsonParseUtil jsonParseUtil;
private final CodeExecutorProperties codeExecutorProperties;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 获取生成的Python代码
String pythonCode = StateUtil.getStringValue(state, PYTHON_GENERATE_NODE_OUTPUT);
// 获取SQL执行结果作为输入数据
List<Map<String, String>> sqlResults = StateUtil.hasValue(state, SQL_RESULT_LIST_MEMORY)
? StateUtil.getListValue(state, SQL_RESULT_LIST_MEMORY) : new ArrayList<>();
// 检查重试次数
int triesCount = StateUtil.getObjectValue(state, PYTHON_TRIES_COUNT, Integer.class, 0);
// 构建执行任务
CodePoolExecutorService.TaskRequest taskRequest = new CodePoolExecutorService.TaskRequest(pythonCode,
objectMapper.writeValueAsString(sqlResults), null);
// 运行Python代码
CodePoolExecutorService.TaskResponse taskResponse = this.codePoolExecutor.runTask(taskRequest);
if (!taskResponse.isSuccess()) {
// 执行失败,检查是否超过最大重试次数
if (triesCount >= codeExecutorProperties.getPythonMaxTriesCount()) {
// 超过重试次数,启动降级策略
return handleFallback(state, taskResponse);
}
// 抛出异常,触发重试
throw new RuntimeException("Python execution failed");
}
// 执行成功,处理输出结果
String stdout = taskResponse.stdOut();
// Python输出的JSON字符串可能有Unicode转义,解析回汉字
Object value = jsonParseUtil.tryConvertToObject(stdout, Object.class);
if (value != null) {
stdout = objectMapper.writeValueAsString(value);
}
return Map.of(PYTHON_EXECUTE_NODE_OUTPUT, stdout, PYTHON_IS_SUCCESS, true);
}
}PythonExecuteNode的行动流程:
- 1. 获取PythonGenerateNode生成的Python代码
- 2. 获取SQL执行结果作为输入数据
- 3. 在隔离环境中执行Python代码(可能是Docker容器,保证安全)
- 4. 如果失败,自动重试(最多3次)
- 5. 如果一直失败,启动降级策略(跳过Python分析,继续后续流程)
这就是代码执行 + 错误处理 + 降级策略的行动。
行动3:报告生成
/**
* 报告生成节点:创建综合分析报告
*/
@Slf4j
@Component
public class ReportGeneratorNode implements NodeAction {
private final LlmService llmService;
private final UserPromptService promptConfigService;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 获取必要输入
String plannerNodeOutput = StateUtil.getStringValue(state, PLANNER_NODE_OUTPUT);
String userInput = StateUtil.getCanonicalQuery(state);
Integer currentStep = StateUtil.getObjectValue(state, PLAN_CURRENT_STEP, Integer.class, 1);
@SuppressWarnings("unchecked")
HashMap<String, String> executionResults = StateUtil.getObjectValue(state, SQL_EXECUTE_NODE_OUTPUT,
HashMap.class, new HashMap<>());
// 解析计划,获取当前步骤
Plan plan = converter.convert(plannerNodeOutput);
ExecutionStep executionStep = getCurrentExecutionStep(plan, currentStep);
String summaryAndRecommendations = executionStep.getToolParameters().getSummaryAndRecommendations();
// 生成报告流
Flux<ChatResponse> reportGenerationFlux = generateReport(userInput, plan, executionResults,
summaryAndRecommendations, agentId);
// ...返回结果
}
private Flux<ChatResponse> generateReport(String userInput, Plan plan, HashMap<String, String> executionResults,
String summaryAndRecommendations, Long agentId) {
// 构建用户需求和计划描述
String userRequirementsAndPlan = buildUserRequirementsAndPlan(userInput, plan);
// 构建分析步骤和数据结果描述
String analysisStepsAndData = buildAnalysisStepsAndData(plan, executionResults);
// 获取优化配置
List<UserPromptConfig> optimizationConfigs = promptConfigService.getOptimizationConfigs("report-generator",
agentId);
// 构建报告生成提示词
String reportPrompt = PromptHelper.buildReportGeneratorPromptWithOptimization(userRequirementsAndPlan,
analysisStepsAndData, summaryAndRecommendations, optimizationConfigs);
// 调用大模型生成报告
return llmService.callUser(reportPrompt);
}
}ReportGeneratorNode的行动流程:
- 1. 收集所有前置节点的执行结果(SQL查询结果、Python分析结果、图表配置)
- 2. 构建完整的提示词,包含用户需求、执行计划、所有数据结果
- 3. 调用大模型生成结构化的分析报告
- 4. 清空中间状态,准备迎接下一个用户请求
这就是内容生成的行动——把所有中间结果整合成一份人类可读的报告。
三者闭环:感知→思考→行动→(反馈)→感知...
Agent的真正威力在于:感知、思考、行动不是一次性的,而是一个持续运转的闭环。
┌─────────────┐
│ 感知环境 │
│ (用户输入) │
└──────┬──────┘
│
▼
┌─────────────┐
│ 思考决策 │
│ (意图识别、 │
│ 任务规划) │
└──────┬──────┘
│
▼
┌─────────────┐
│ 执行行动 │
│ (SQL查询、 │
│ Python执行) │
└──────┬──────┘
│
▼
┌─────────────┐
│ 观察结果 │
│ (成功?失败?│
│ 需要调整?) │
└──────┬──────┘
│
└────────→ 回到"感知",继续下一轮DataAgent中的闭环体现
DataAgent的工作流完美体现了这个闭环:
第一轮循环(感知→思考→行动→观察):
- 1. 感知:用户问"近三个月销售趋势如何?"
- 2. 思考:意图识别→查询增强→规划(3个步骤:查数据、画图、写报告)
- 3. 行动:SQL生成节点写出SQL
- 4. 观察:语义一致性节点检查SQL是否正确
第二轮循环(根据反馈调整):
- • 如果SQL检查通过 → 继续执行SQL
- • 如果SQL检查不通过 → 回到SQL生成节点,重新生成
第三轮循环(执行和再调整):
- 1. 感知:SQL执行成功,拿到了数据
- 2. 思考:Python生成节点分析数据特点,决定画什么图
- 3. 行动:Python执行节点运行代码
- 4. 观察:如果Python代码报错,回到Python生成节点重试
这个闭环可以一直运转,直到任务完成或者达到最大重试次数。
关键洞察:Agent的智能不仅体现在单次思考的质量,更体现在闭环运转的鲁棒性——出错时能感知、能调整、能恢复。
本章小结
- 1. 感知(Perception):Agent通过文本、语音、系统状态等多种方式接收输入。DataAgent不仅感知用户当前输入,还感知对话历史、知识库证据、数据库Schema等上下文信息。
- 2. 思考(Reasoning/Planning):Agent的思考分为三个层次:
- • 意图识别:判断用户想干什么
- • 任务拆解:把大问题拆成小步骤
- • 细节规划:每一步具体怎么做
DataAgent通过QueryEnhanceNode、PlannerNode等节点实现分层思考。 - 3. 行动(Action):Agent的行动包括工具调用、代码执行、内容生成等。DataAgent的SqlExecuteNode执行SQL查询,PythonExecuteNode运行Python代码,ReportGeneratorNode生成最终报告。
- 4. 闭环运转:感知→思考→行动→观察反馈→再感知... Agent的智能体现在能根据执行结果不断调整,直到任务完成。
- 5. 大模型是大脑,工作流是骨架:Agent的思考能力依赖大模型,但仅靠大模型不够。DataAgent通过16+个节点的精心编排,让大模型在合适的时机做合适的事,形成完整的智能体。
思考题
- 1. 感知的边界:在DataAgent中,IntentRecognitionNode不仅读取用户输入,还读取
MULTI_TURN_CONTEXT(多轮对话历史)。假设用户连续提问:如果没有多轮对话感知,第二句会出现什么问题?Agent会怎么误解用户的意图?
- • 第一句:"查一下上个月的销售额"
- • 第二句:"再对比一下前年的"
- 2. 思考的分层:DataAgent的PlannerNode会生成一个包含多个步骤的执行计划。假设用户问:"分析近半年各产品线的销售趋势,找出增长最快的产品,并预测下个月的销量"。
你觉得PlannerNode会生成几个步骤?每个步骤分别调用什么工具(sql_generate、python_generate、report_generator)?为什么?
- 3. 闭环的鲁棒性:PythonExecuteNode在执行失败时会重试,超过最大次数后启动降级策略。假设你在使用DataAgent分析数据,Python代码因为数据格式问题一直执行失败,Agent进入降级模式。作为用户,你希望Agent如何告诉你这个情况?是直接说"Python分析失败",还是更详细地说明"由于数据包含异常值,自动分析暂时不可用,这是基础统计结果..."?哪种方式更体现Agent的"智能"?
第三章:LLM是Agent的大脑——大语言模型的角色
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgentAgent就像一个人
想象一下,你面前站着一个"数据分析助手"。他能听懂你的问题,会查数据库,会写代码,还会画图做报告。这个助手为什么这么厉害?
如果把Agent比作一个人:
- • 工具(SQL、Python) 是他的手脚——负责干活
- • 工作流(StateGraph) 是他的神经系统——负责协调
- • LLM(大语言模型) 是他的大脑皮层——负责思考、理解、决策
没有手脚,人无法行动;没有神经系统,手脚会乱成一团;但没有大脑,人就只是一具空壳。LLM在Agent中扮演的正是这个"大脑"的角色。
本章将带你理解:LLM到底是什么?它有哪些核心能力?DataAgent如何利用LLM来驱动整个数据分析流程?
3.1 什么是LLM(大语言模型)
3.1.1 简单比喻:一个读了海量书籍的"学霸"
LLM(Large Language Model,大语言模型) 本质上是一个经过特殊训练的计算机程序。你可以把它想象成一个"超级学霸":
- • 他读过互联网上数以亿计的书籍、文章、代码、对话记录
- • 在这个过程中,他学会了语言的规律、语法、逻辑,以及各行各业的知识
- • 当你问他问题时,他不是"搜索"答案,而是基于学到的知识"生成"答案
就像人类大脑通过神经元连接来存储和加工信息,LLM通过数以千亿计的参数(可以理解为虚拟的"神经元连接")来理解和生成语言。
小知识:GPT-4、Claude、通义千问、文心一言等都是LLM的具体产品。DataAgent支持接入多种LLM,通过统一接口调用。
3.1.2 LLM不是搜索引擎
很多初学者容易混淆LLM和搜索引擎的区别:
| 特性 | 搜索引擎(如百度、Google) | LLM(如ChatGPT) |
|---|---|---|
| 工作方式 | 检索已有网页,返回链接 | 基于知识生成新回答 |
| 对问题的理解 | 关键词匹配 | 语义理解,能处理复杂表达 |
| 推理能力 | 无 | 能进行多步逻辑推理 |
| 知识时效性 | 实时(最新网页) | 有截止日期(训练数据的截止时间) |
| 创造性 | 无 | 能创作文章、写代码、生成方案 |
举个例子:
- • 你问搜索引擎"2024年Q3销售额环比下降的原因",它返回的是包含这些关键词的网页链接
- • 你问LLM同样的问题,它会分析"销售额"、"环比下降"、"原因"这些概念之间的关系,给出一个结构化的分析框架
3.2 LLM的四大核心能力
LLM之所以能成为Agent的"大脑",是因为它具备四种关键能力:
3.2.1 理解能力(Understanding)
LLM能理解人类语言的真正含义,而不只是字面意思。
例子:
- • 用户说:"上个月北京卖得怎么样?"
- • LLM理解:"上个月"=上一个月,"北京"=地区维度,"卖得怎么样"=销售额/销量指标
- • 它还能理解这是口语化表达,需要转化为正式的数据查询
在DataAgent中,QueryEnhanceNode就利用LLM的理解能力,把用户的口语化问题改写成规范的数据分析查询。
3.2.2 生成能力(Generation)
LLM能根据需求生成高质量的文本、代码、结构化数据。
例子:
- • 需求:"查询2024年北京地区各月的销售额"
- • LLM生成:
SELECT month, SUM(amount) FROM sales WHERE city='北京' AND year=2024 GROUP BY month
在DataAgent中,SqlGenerateNode利用LLM的生成能力,根据自然语言描述自动编写SQL语句。
3.2.3 推理能力(Reasoning)
LLM能进行多步逻辑推理,把复杂问题拆解成可执行的步骤。
例子:
- • 用户问:"分析近一年各地区销售趋势,找出增长最快的前3个地区,并预测下季度表现"
- • LLM推理过程:
- 1. 先提取近一年的销售数据(SQL查询)
- 2. 计算各地区增长率(Python计算)
- 3. 排序找出前3名(Python排序)
- 4. 基于历史趋势做预测(Python预测模型)
- 5. 汇总成报告(文本生成)
在DataAgent中,PlannerNode利用LLM的推理能力,把用户的复杂问题拆解成一步一步的执行计划。
3.2.4 总结能力(Summarization)
LLM能从大量信息中提取关键要点,生成简洁的总结。
例子:
- • 输入:1000行销售数据 + 5个分析图表 + 统计指标
- • LLM总结:"华东地区Q3销售额同比增长23%,主要驱动力来自新客户增长(+35%),但客户留存率下降5%需关注。"
在DataAgent中,ReportGeneratorNode利用LLM的总结能力,把分析结果转化为可读的业务报告。
3.3 LLM如何驱动DataAgent
现在我们来看DataAgent中LLM的实际工作流程。想象一下用户问了一个问题:"分析上季度各产品线的利润情况"。
步骤1:理解用户意图
用户输入:"分析上季度各产品线的利润情况"
↓
IntentRecognitionNode(意图识别)调用LLM
↓
LLM判断:这是一个"数据分析"意图,需要查询数据库并生成报告LLM不是盲目执行,而是先"想清楚"用户想要什么。是查数据?还是生成报告?还是需要预测?
步骤2:生成执行计划
PlannerNode(计划生成)调用LLM
↓
LLM分析:
- 需要哪些表?products、sales、costs
- 需要几步?
Step 1: SQL查询各产品线收入和成本
Step 2: Python计算利润率和排名
Step 3: 生成分析报告
↓
输出:Plan对象(JSON格式)这就是LLM的推理能力在发挥作用——把大问题拆成小步骤。
步骤3:编写SQL/Python代码
SqlGenerateNode调用LLM
↓
LLM根据Plan中的指令生成SQL:
SELECT product_line, SUM(revenue) as total_revenue,
SUM(cost) as total_cost
FROM sales s JOIN costs c ON s.product_id = c.product_id
WHERE quarter = '2024Q3'
GROUP BY product_line这里用到的是LLM的生成能力。LLM学过大量的SQL语法和数据库知识,所以能写出正确的查询语句。
步骤4:总结分析结果
ReportGeneratorNode调用LLM
↓
输入:SQL查询结果 + Python分析结果
↓
LLM生成报告:
"上季度各产品线利润分析:
1. 电子产品线利润最高(¥1,200万,利润率28%)
2. 家居产品线利润增长最快(环比+15%)
3. 服装产品线利润率下降,需关注成本控制"这是LLM的总结能力——把冷冰冰的数据变成有洞察力的业务语言。
3.4 DataAgent中的LLM调用抽象
DataAgent没有直接硬编码某个LLM的调用方式,而是定义了一个统一的接口:LlmService。
3.4.1 LlmService接口设计
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/LlmService.java
public interface LlmService {
// 同时传入系统提示词和用户提示词
Flux<ChatResponse> call(String system, String user);
// 只传入系统提示词
Flux<ChatResponse> callSystem(String system);
// 只传入用户提示词
Flux<ChatResponse> callUser(String user);
// 将流式响应转换为字符串流
default Flux<String> toStringFlux(Flux<ChatResponse> responseFlux) {
return responseFlux.map(ChatResponseUtil::getText);
}
}设计亮点:
- • 统一接口:无论底层是通义千问、GPT-4还是其他模型,上层代码都用同样的方式调用
- • 流式返回:
Flux<ChatResponse>表示响应是流式的,用户可以实时看到LLM的输出,不用干等 - • 三种调用模式:同时传system+user、只传system、只传user,覆盖不同场景
3.4.2 系统提示词 vs 用户提示词
在调用LLM时,DataAgent区分两种提示词:
| 类型 | 作用 | 例子 |
|---|---|---|
| System Prompt(系统提示词) | 定义Agent的角色、能力、约束 | "你是一个数据分析专家,擅长SQL和Python..." |
| User Prompt(用户提示词) | 携带具体的任务内容 | "查询2024年北京地区的销售额" |
这种分离的好处是:系统提示词定义"你是谁",用户提示词定义"你要做什么"。当用户连续问多个问题时,系统提示词可以复用,只替换用户提示词即可。
3.5 Prompt Engineering基础
3.5.1 什么是Prompt
Prompt(提示词) 就是你给LLM的"任务说明书"。就像你给下属布置任务时需要说清楚要求,给LLM的Prompt也需要清晰明确。
一个差的Prompt:
"查一下销售数据"问题:太模糊。LLM不知道要查哪个表、哪些字段、什么时间段。
一个好的Prompt:
角色:你是一位数据分析专家,精通SQL查询。
任务:根据下面的数据库Schema,编写SQL查询。
Schema:
- sales表:id, product_name, amount, sale_date, region
要求:
1. 查询2024年北京地区的销售总额
2. 按月份分组
3. 返回月份和销售额两列
输出格式:只返回SQL代码,不要解释。3.5.2 好的Prompt = 清晰的角色 + 明确的任务 + 具体的格式要求
DataAgent的Prompt设计遵循这个公式:
1. 清晰的角色
你是一位拥有深厚业务洞察力的高级数据分析专家。
你的核心职责是解析用户的业务问题,并基于给定的数据库Schema,
制定一个严谨、可执行的分步执行计划。2. 明确的任务
核心任务流程:
1. 解构需求:深入分析用户问题,明确核心业务目标
2. Schema验证:确认计划查询的字段在表中真实存在
3. 制定策略:创建逻辑清晰的多步计划
4. 生成计划:输出JSON对象3. 具体的格式要求
输出格式(必须是合法的JSON):
{
"thought_process": "分析思路...",
"execution_plan": [
{"step": 1, "tool_to_use": "SQL_GENERATE_NODE", ...}
]
}3.6 DataAgent中的Prompt设计实战
3.6.1 系统Prompt定义Agent角色
DataAgent的PlannerNode使用了一个精心设计的系统Prompt模板(planner.txt),定义了Agent的角色和能力边界:
# 角色:高级数据分析智能体 (Senior Data Analysis Agent)
你是一位拥有深厚业务洞察力的高级数据分析专家。你的核心职责是解析用户的业务问题,
并基于给定的数据库Schema,制定一个严谨、可执行的分步执行计划(Plan)。
你需要决定在每一步使用什么工具,从而将模糊的业务问题转化为最终的洞察结论。
**重要原则:你必须且只能输出一个合法的JSON对象。严禁包含markdown标记、注释或任何JSON结构之外的文本。**
# 可用工具 (Available Tools)
## 1. SQL_GENERATE_NODE
- 核心用途:执行SQL查询以提取或聚合数据
- 参数:instruction(详细的SQL需求描述)
## 2. PYTHON_GENERATE_NODE
- 核心用途:当SQL难以满足需求时使用
- 适用于:复杂逻辑计算、数据清洗、高级统计分析、图表绘制
## 3. REPORT_GENERATOR_NODE
- 核心用途:流程的最后一步,整合所有输出,总结发现并给出建议这个Prompt的精妙之处:
- • 角色定位清晰:"高级数据分析专家"——LLM会调用相关知识来生成更专业的计划
- • 能力边界明确:只能用三种工具,不能天马行空
- • 输出格式强制:必须输出合法JSON,方便程序解析
3.6.2 Prompt模板文件的位置
DataAgent的所有Prompt模板都放在 resources/prompts/ 目录下:
resources/prompts/
├── planner.txt # 计划生成Prompt
├── new-sql-generate.txt # SQL生成Prompt
├── python-generator.txt # Python代码生成Prompt
├── report-generator-plain.txt # 报告生成Prompt
├── intent-recognition.txt # 意图识别Prompt
├── query-enhancement.txt # 查询增强Prompt
└── ...这种模板化设计的优势:
- • 修改Prompt不需要改代码,只需编辑文本文件
- • 不同场景(SQL生成、Python生成、报告生成)有独立的Prompt,互不干扰
- • 可以通过配置动态切换Prompt,实现A/B测试
3.6.3 PlannerNode中的Prompt构建
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/PlannerNode.java
public class PlannerNode implements NodeAction {
private final LlmService llmService;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 获取用户查询
String canonicalQuery = StateUtil.getCanonicalQuery(state);
// 获取数据库Schema信息
SchemaDTO schemaDTO = StateUtil.getObjectValue(state, TABLE_RELATION_OUTPUT, SchemaDTO.class);
String schemaStr = PromptHelper.buildMixMacSqlDbPrompt(schemaDTO, true);
// 获取证据信息(RAG检索结果)
String evidence = StateUtil.getStringValue(state, EVIDENCE);
// 构建模板参数
BeanOutputConverter<Plan> beanOutputConverter = new BeanOutputConverter<>(Plan.class);
Map<String, Object> params = Map.of(
"user_question", canonicalQuery,
"schema", schemaStr,
"evidence", evidence,
"format", beanOutputConverter.getFormat()
);
// 渲染Prompt模板
String plannerPrompt = PromptConstant.getPlannerPromptTemplate().render(params);
// 调用LLM生成计划
return llmService.callUser(plannerPrompt);
}
}关键流程:
- 1. 从State中获取上下文(用户问题、Schema、证据)
- 2. 用
BeanOutputConverter生成JSON格式说明(告诉LLM应该输出什么结构) - 3. 将变量填充到Prompt模板中
- 4. 调用
llmService.callUser()执行LLM推理
3.7 结构化输出:让LLM返回JSON
3.7.1 为什么需要结构化输出
LLM天生擅长生成自然语言,但Agent需要程序可解析的数据结构。如果LLM的输出是:
"我觉得应该先查sales表,然后算一下各地区的销售额..."程序很难从中提取"要查什么表"、"要做什么计算"。但如果LLM输出的是JSON:
{
"thought_process": "先查sales表获取原始数据,再按地区分组计算销售额",
"execution_plan": [
{"step": 1, "tool_to_use": "SQL_GENERATE_NODE", "tool_parameters": {"instruction": "查询sales表..."}}
]
}程序就能直接解析,知道第一步要调用SQL_GENERATE_NODE,参数是什么。
3.7.2 DataAgent的结构化输出设计
DataAgent定义了两个核心的结构化输出DTO:
Plan(执行计划):
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/dto/planner/Plan.java
@Data
public class Plan {
@JsonProperty("thought_process")
@JsonPropertyDescription("简要描述你的分析思路。必须明确提到你检查了哪些表和字段")
private String thoughtProcess;
@JsonProperty("execution_plan")
@JsonPropertyDescription("执行计划的步骤列表")
private List<ExecutionStep> executionPlan;
}ExecutionStep(执行步骤):
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/dto/planner/ExecutionStep.java
@Data
public class ExecutionStep {
@JsonProperty("step")
@JsonPropertyDescription("步骤顺序号")
private int step;
@JsonProperty("tool_to_use")
@JsonPropertyDescription("工具名称")
private String toolToUse;
@JsonProperty("tool_parameters")
@JsonPropertyDescription("工具参数")
private ToolParameters toolParameters;
@Data
@JsonInclude(JsonInclude.Include.NON_NULL) // 忽略null值,让JSON更干净
public static class ToolParameters {
@JsonProperty("instruction")
@JsonPropertyDescription("当工具是SQL_GENERATE_NODE时,填写详细的SQL需求")
private String instruction;
@JsonProperty("sql_query")
@JsonPropertyDescription("SQL_GENERATE_NODE运行完后,会把生成的SQL塞进来")
private String sqlQuery;
@JsonProperty("summary_and_recommendations")
@JsonPropertyDescription("REPORT_GENERATOR_NODE专用,报告的大纲")
private String summaryAndRecommendations;
}
}3.7.3 如何让LLM输出JSON
DataAgent使用了Spring AI的BeanOutputConverter来实现结构化输出:
// 在PlannerNode中
BeanOutputConverter<Plan> beanOutputConverter = new BeanOutputConverter<>(Plan.class);
// 将JSON格式说明注入Prompt
Map<String, Object> params = Map.of(
// ... 其他参数
"format", beanOutputConverter.getFormat()
);beanOutputConverter.getFormat()会生成这样的格式说明:
你的响应必须是以下JSON格式:
{
"thought_process": "string类型,简要描述你的分析思路",
"execution_plan": [
{
"step": "integer类型,步骤顺序号",
"tool_to_use": "string类型,工具名称",
"tool_parameters": {
"instruction": "string类型,详细的SQL需求"
}
}
]
}这样,LLM就知道"我要输出什么结构",而不是自由发挥写一段话。
3.7.4 一个真实的Plan示例
当用户问"分析极曜汽车近一年的购车线索转化质量"时,LLM生成的Plan可能是:
{
"thought_process": "用户想要分析线索转化质量。我已确认leads_table表包含渠道、区域、时间以及各转化阶段字段。计划:先按渠道和区域提取转化漏斗数据,再用Python计算转化率和排名,最后生成报告。",
"execution_plan": [
{
"step": 1,
"tool_to_use": "SQL_GENERATE_NODE",
"tool_parameters": {
"instruction": "按渠道来源分组,查询近一年的线索转化漏斗核心指标(留资数、到店数、试驾数、下定数、交车数)"
}
},
{
"step": 2,
"tool_to_use": "SQL_GENERATE_NODE",
"tool_parameters": {
"instruction": "按地理区域(省份、城市)分组,查询近一年的线索转化漏斗核心指标"
}
},
{
"step": 3,
"tool_to_use": "PYTHON_GENERATE_NODE",
"tool_parameters": {
"instruction": "基于步骤1和步骤2的结果,计算各地区转化率,识别Top 5高转化率城市和Top 5低转化率城市"
}
},
{
"step": 4,
"tool_to_use": "REPORT_GENERATOR_NODE",
"tool_parameters": {
"summary_and_recommendations": "总结高转化率渠道和区域的共同特征,指出转化漏斗瓶颈,提出优化建议"
}
}
]
}这个JSON被程序解析后,PlanExecutorNode就会按顺序执行这4个步骤。
3.8 本章小结
本章我们学习了LLM在Agent中的核心地位:
- 1. LLM是Agent的大脑:负责理解、推理、生成和总结
- 2. 四大核心能力:
- • 理解能力:听懂用户的真实意图
- • 生成能力:写出SQL、Python代码和报告
- • 推理能力:把复杂问题拆解成执行步骤
- • 总结能力:从数据中提取业务洞察
- 3. Prompt Engineering:好的Prompt = 清晰的角色 + 明确的任务 + 具体的格式要求
- 4. 结构化输出:通过JSON格式让LLM的输出可被程序解析,Plan和ExecutionStep是DataAgent的核心数据结构
- 5. LlmService抽象:统一接口屏蔽底层LLM差异,支持流式响应
理解LLM的角色后,下一章我们将学习Agent的"手脚"——工具系统。没有工具,LLM只能"空想";有了工具,Agent才能真正改变世界。
思考题
- 1. 场景题:假设用户输入"最近手机卖得怎么样",请尝试写一段Prompt(包含角色、任务、格式要求),让LLM生成一个包含SQL查询和Python分析的两步Plan。
- 2. 分析题:DataAgent的PlannerNode在构建Prompt时,会把数据库Schema、用户问题、证据信息都传给LLM。为什么要传Schema?如果不传Schema,LLM可能会犯什么错误?
第四章:工具是Agent的手脚——Tool Use与Function Calling
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent人之所以强大,是因为会用工具
想象一个场景:
- • 原始人徒手打不过狮子,但拿起长矛就能狩猎
- • 木匠徒手砍不断大树,但用锯子就能精准加工
- • 程序员徒手算不出千万级数据的统计,但写一段Python代码就能秒出结果
工具,是人类文明的基石。
Agent也是如此。LLM虽然聪明,但它"困在"服务器里:不能上网查最新新闻,不能连接你的数据库,不能运行代码,不能生成文件。没有工具,LLM只能给你"纸上谈兵"的建议;有了工具,Agent才能真正解决问题。
如果把Agent比作一个人:
- • LLM 是大脑,负责思考
- • 工具 是手脚,负责行动
- • Function Calling 是神经系统,负责"大脑指挥手脚"
本章将带你理解:Agent为什么需要工具?有哪些常见工具?DataAgent如何利用Function Calling机制来调用SQL、Python等工具?以及如何保证工具调用的安全性?
4.1 为什么Agent需要工具
4.1.1 LLM的三大局限
尽管LLM很聪明,但它有三个根本性的局限:
局限1:知识有截止日期
LLM的知识来自训练数据,而训练数据有截止时间。比如GPT-4的知识截止到2024年初,它不知道:
- • 今天的股价是多少
- • 明天的天气怎么样
- • 你家数据库里有什么表
类比:就像一位学识渊博但隐居山林的学者,他读过很多书,但不知道外面世界正在发生什么。
局限2:不能直接与外部世界交互
LLM运行在封闭的环境中,它不能:
- • 连接你的MySQL数据库执行查询
- • 访问你的公司内部系统
- • 发送邮件或消息
- • 浏览实时网页
它只能"说话"(生成文本),不能"动手"(执行操作)。
局限3:数学计算和代码执行需要精确性
LLM擅长"理解"数学问题,但不一定擅长"计算":
- • 问它"12345 × 67890 = ?",它可能算错
- • 让它心算一个复杂统计,结果可能不精确
- • 让它"假想"运行代码,输出可能和实际运行不一致
类比:就像心算和用计算器的区别。再聪明的人,面对复杂计算也需要工具。
4.1.2 工具弥补LLM的短板
| LLM的局限 | 对应的工具 | 作用 |
|---|---|---|
| 知识有截止日期 | 搜索工具 | 获取实时信息 |
| 不能连接数据库 | 数据库工具 | 执行SQL查询 |
| 计算不精确 | 计算工具 | 执行Python代码 |
| 不能持久化结果 | 文件工具 | 读写文件、生成报告 |
核心思想:让LLM做它擅长的事(理解、推理、决策),让工具做它们擅长的事(精确执行、外部交互)。
4.2 常见工具类型
在数据分析Agent中,常见的工具有以下几类:
4.2.1 搜索工具(获取实时信息)
作用:弥补LLM知识时效性的不足,获取最新信息。
例子:
- • 用户问"昨天特斯拉的股价是多少?"
- • Agent调用搜索工具,查询金融API或搜索引擎
- • 把搜索结果返回给LLM,LLM再生成回答
在DataAgent中,EvidenceRecallNode就是一种特殊的"搜索工具"——它在向量数据库中搜索与用户问题相关的业务知识文档,把搜索结果作为"证据"提供给LLM。
4.2.2 数据库工具(SQL查询)
作用:让Agent能够查询结构化数据。
例子:
- • 用户问"上季度各地区的销售额"
- • Agent生成SQL:
SELECT region, SUM(amount) FROM sales WHERE quarter='2024Q3' GROUP BY region - • 调用数据库工具执行SQL
- • 把查询结果返回给LLM
在DataAgent中,SqlExecuteNode就是数据库工具的执行节点。它连接MySQL等数据库,执行LLM生成的SQL,并把结果返回。
4.2.3 计算工具(Python执行)
作用:执行复杂计算、数据清洗、统计分析、图表绘制。
例子:
- • SQL查询返回了原始数据
- • 用户要求"计算环比增长率,并画出趋势图"
- • Agent生成Python代码,调用Pandas计算增长率,调用Matplotlib画图
- • 调用Python执行工具运行代码
- • 把执行结果(图表数据)返回给LLM
在DataAgent中,PythonExecuteNode就是计算工具的执行节点。它在Docker沙箱中运行Python代码,确保安全。
4.2.4 文件工具(读写文件)
作用:持久化分析结果,生成报告文件。
例子:
- • 分析完成后,Agent生成一份HTML报告
- • 调用文件工具把报告保存到磁盘
- • 用户可以通过链接下载报告
在DataAgent中,ReportGeneratorNode会生成HTML/Markdown格式的分析报告,并支持导出。
4.3 Function Calling机制
现在我们来到本章的核心:Function Calling(函数调用)。这是LLM与工具交互的标准机制。
4.3.1 Function Calling的工作流程
Function Calling的本质是:LLM决定"我要调用什么工具",程序执行工具,再把结果返回给LLM。
┌─────────────┐ 1.用户提问 ┌─────────────┐
│ 用户 │ ──────────────────> │ LLM │
│ │ │ (大脑) │
└─────────────┘ └──────┬──────┘
│
│ 2.分析:需要调用工具
▼
┌─────────────┐
│ 生成调用 │
│ 请求: │
│ tool: SQL │
│ params: {} │
└──────┬──────┘
│
│ 3.执行工具
▼
┌─────────────┐
│ 工具 │
│ (SQL执行) │
└──────┬──────┘
│
│ 4.返回结果
▼
┌─────────────┐
│ 查询结果 │
│ {数据...} │
└──────┬──────┘
│
│ 5.带着结果继续思考
▼
┌─────────────┐
│ LLM │
│ 生成最终 │
│ 回答/下一步 │
└──────┬──────┘
│
│ 6.输出给用户
▼
┌─────────────┐
│ 用户 │
│ (看到结果) │
└─────────────┘关键洞察:LLM不是直接执行工具,而是生成调用指令。程序解析指令,执行工具,再把结果喂回给LLM。这是一个"LLM → 工具 → LLM"的循环。
4.3.2 类比:Function Calling就像"开处方"
想象LLM是一位医生:
- • 病人(用户)说"我头疼、发烧"
- • 医生(LLM)诊断后,开处方:"需要做血常规检查"
- • 护士(程序)拿到处方,执行检查(抽血、化验)
- • 护士把检查报告交给医生
- • 医生根据报告,给出诊断:"你是病毒性感冒,需要休息和服药"
医生不会亲自操作化验仪器,但他知道什么时候该开什么检查。这就是Function Calling的精髓。
4.4 DataAgent中的工具系统
DataAgent的工具系统由多个**Node(节点)**组成,每个节点负责一类工具的调用。这些节点被编排在一个StateGraph工作流中,由PlanExecutorNode统一调度。
4.4.1 SQL执行工具:SqlExecuteNode
SqlExecuteNode负责执行LLM生成的SQL查询,是DataAgent最基础的工具节点。
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/SqlExecuteNode.java
@Slf4j
@Component
@AllArgsConstructor
public class SqlExecuteNode implements NodeAction {
private final DatabaseUtil databaseUtil;
private final Nl2SqlService nl2SqlService;
private final LlmService llmService;
private final DataAgentProperties properties;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 1. 获取当前步骤和SQL查询
Integer currentStep = PlanProcessUtil.getCurrentStepNumber(state);
String sqlQuery = StateUtil.getStringValue(state, SQL_GENERATE_OUTPUT);
sqlQuery = nl2SqlService.sqlTrim(sqlQuery);
log.info("Executing SQL query: {}", sqlQuery);
// 2. 获取Agent对应的数据库配置
String agentIdStr = StateUtil.getStringValue(state, Constant.AGENT_ID);
Long agentId = Long.valueOf(agentIdStr);
DbConfigBO dbConfig = databaseUtil.getAgentDbConfig(agentId);
// 3. 执行SQL查询
return executeSqlQuery(state, currentStep, sqlQuery, dbConfig, agentId);
}
}核心流程:
- 1. 从State中获取LLM生成的SQL语句
- 2. 根据Agent ID获取对应的数据库连接配置(DataAgent支持多租户,不同Agent连接不同数据库)
- 3. 创建数据库访问器,执行SQL
- 4. 处理查询结果,更新State
执行SQL的关键代码:
// 构建查询参数
DbQueryParameter dbQueryParameter = new DbQueryParameter();
dbQueryParameter.setSql(sqlQuery);
dbQueryParameter.setSchema(dbConfig.getSchema());
// 获取数据库访问器
Accessor dbAccessor = databaseUtil.getAgentAccessor(agentId);
// 执行SQL并获取结果
ResultSetBO resultSetBO = dbAccessor.executeSqlAndReturnObject(dbConfig, dbQueryParameter);
// 将结果存入State,供后续节点使用
Map<String, String> updatedResults = PlanProcessUtil.addStepResult(
existingResults, currentStep, strResultSetJson
);设计亮点:
- • 多租户支持:通过
agentId动态获取数据库配置,不同Agent可以连接不同的数据库 - • 结果丰富化:执行SQL后,还会调用LLM生成图表配置(
enrichResultSetWithChartConfig),决定数据应该用表格、柱状图还是折线图展示 - • 错误处理:SQL执行失败时,会记录错误信息,触发重试或降级逻辑
4.4.2 Python执行工具:PythonExecuteNode
PythonExecuteNode负责在沙箱环境中执行LLM生成的Python代码,用于复杂计算、数据分析和图表生成。
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/PythonExecuteNode.java
@Slf4j
@Component
public class PythonExecuteNode implements NodeAction {
private final CodePoolExecutorService codePoolExecutor;
private final ObjectMapper objectMapper;
private final JsonParseUtil jsonParseUtil;
private final CodeExecutorProperties codeExecutorProperties;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 1. 获取Python代码和SQL结果数据
String pythonCode = StateUtil.getStringValue(state, PYTHON_GENERATE_NODE_OUTPUT);
List<Map<String, String>> sqlResults = StateUtil.hasValue(state, SQL_RESULT_LIST_MEMORY)
? StateUtil.getListValue(state, SQL_RESULT_LIST_MEMORY) : new ArrayList<>();
// 2. 构建任务请求
CodePoolExecutorService.TaskRequest taskRequest = new CodePoolExecutorService.TaskRequest(
pythonCode,
objectMapper.writeValueAsString(sqlResults), // SQL结果作为输入数据
null
);
// 3. 在Docker沙箱中执行Python代码
CodePoolExecutorService.TaskResponse taskResponse = this.codePoolExecutor.runTask(taskRequest);
// 4. 处理执行结果
if (!taskResponse.isSuccess()) {
// 执行失败,检查是否需要重试
int triesCount = StateUtil.getObjectValue(state, PYTHON_TRIES_COUNT, Integer.class, 0);
if (triesCount >= codeExecutorProperties.getPythonMaxTriesCount()) {
// 超过最大重试次数,启动降级策略
log.error("Python执行失败且已超过最大重试次数,采用降级策略");
return createFallbackResult(state);
}
throw new RuntimeException("Python Execute Failed: " + taskResponse.stdErr());
}
// 5. 执行成功,返回结果
String stdout = taskResponse.stdOut();
log.info("Python Execute Success! StdOut: {}", stdout);
return Map.of(PYTHON_EXECUTE_NODE_OUTPUT, stdout, PYTHON_IS_SUCCESS, true);
}
}核心流程:
- 1. 从State中获取LLM生成的Python代码
- 2. 从State中获取上一步SQL查询的结果数据(作为Python的输入)
- 3. 调用
codePoolExecutor.runTask()在沙箱中执行代码 - 4. 如果成功,返回标准输出(stdout);如果失败,根据重试策略处理
为什么需要Python工具?
SQL擅长"查数据",但不擅长"深度分析"。比如:
- • 计算环比/同比增长率(需要跨行计算)
- • 做线性回归预测(需要统计库)
- • 绘制复杂图表(需要可视化库)
- • 数据清洗和转换(需要灵活的逻辑)
这些任务用Python的Pandas、NumPy、Matplotlib等库来做,比SQL更高效。
4.4.3 向量检索工具:EvidenceRecallNode
EvidenceRecallNode是DataAgent的"知识检索工具",负责在向量数据库中搜索与用户问题相关的业务知识。
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/EvidenceRecallNode.java
@Slf4j
@Component
public class EvidenceRecallNode implements NodeAction {
private final LlmService llmService;
private final AgentVectorStoreService vectorStoreService;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 1. 获取用户问题和Agent ID
String question = StateUtil.getStringValue(state, INPUT_KEY);
String agentId = StateUtil.getStringValue(state, AGENT_ID);
// 2. 调用LLM进行查询重写(把口语化问题改写成适合检索的查询)
String prompt = PromptHelper.buildEvidenceQueryRewritePrompt(multiTurn, question);
Flux<ChatResponse> responseFlux = llmService.callUser(prompt);
// 3. 从LLM响应中提取重写后的查询
String standaloneQuery = extractStandaloneQuery(llmOutput);
// 4. 在向量数据库中检索相关文档
List<Document> businessTermDocuments = vectorStoreService
.getDocumentsForAgent(agentId, standaloneQuery, DocumentMetadataConstant.BUSINESS_TERM);
List<Document> agentKnowledgeDocuments = vectorStoreService
.getDocumentsForAgent(agentId, standaloneQuery, DocumentMetadataConstant.AGENT_KNOWLEDGE);
// 5. 构建格式化的证据内容
String evidence = buildFormattedEvidenceContent(businessTermDocuments, agentKnowledgeDocuments);
// 6. 返回证据,供后续节点使用
return Map.of(EVIDENCE, evidence);
}
}核心流程:
- 1. 获取用户的原始问题
- 2. 调用LLM把口语化问题改写成"检索友好"的查询(比如把"最近手机卖得怎么样"改写成"手机产品销售趋势分析")
- 3. 在向量数据库中检索相关的业务知识文档(如产品定义、指标口径、业务规则)
- 4. 把检索结果格式化成"证据"文本,注入到后续Prompt中
为什么需要这个工具?
因为不同公司有不同的业务术语和规则。比如:
- • "活跃用户"在A公司定义是"7天内有登录",在B公司是"30天内有下单"
- • "GMV"是否包含退款?不同公司口径不同
- • "新客"是指首次注册还是首次下单?
这些业务知识存储在向量数据库中,EvidenceRecallNode负责把它们找出来,告诉LLM:"你分析时要按这个口径来"。
4.5 工具调用的安全性
让LLM执行SQL和Python代码,就像让一位"聪明的实习生"直接操作生产数据库——必须做好安全防护。
4.5.1 SQL执行的安全性
DataAgent在SQL执行层面做了以下防护:
1. 只读查询(默认)
DataAgent的SQL生成Prompt中明确要求"只生成SELECT查询",禁止生成INSERT、UPDATE、DELETE、DROP等修改性语句。
2. 数据库权限控制
为Agent配置的数据库账号应该只有只读权限,即使LLM生成了危险SQL,数据库也会拒绝执行。
3. Schema隔离
不同Agent连接不同的数据库/Schema,一个Agent无法访问其他Agent的数据。
4.5.2 Python执行的安全性:Docker沙箱
Python代码的危险性更高——它可以删除文件、访问网络、消耗大量资源。DataAgent使用Docker容器沙箱来隔离Python执行环境。
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/code/impls/DockerCodePoolExecutorService.java
public class DockerCodePoolExecutorService extends AbstractCodePoolExecutorService
implements CodePoolExecutorService {
private final DockerClient dockerClient;
// 创建容器的HostConfig(安全隔离配置)
private HostConfig createHostConfig(Path tempDir) {
HostConfig config = newHostConfig()
// 1. 内存限制:防止内存耗尽攻击
.withMemory(this.properties.getLimitMemory() * 1024L * 1024L)
// 2. CPU限制:防止CPU占满
.withCpuCount(this.properties.getCpuCore())
// 3. 丢弃所有Linux Capability:禁止特权操作
.withCapDrop(Capability.ALL)
// 4. 网络隔离:限制网络访问
.withNetworkMode(this.properties.getNetworkMode())
// 5. 临时文件系统:容器内/tmp是临时的
.withTmpFs(Map.of("/tmp", ""));
// 6. 只读挂载:把脚本文件挂载为只读
if (!this.isRemote) {
List<Bind> binds = new ArrayList<>();
binds.add(new Bind(
tempDir.resolve("script.py").toAbsolutePath().toString(),
new Volume("/app/script.py"),
AccessMode.ro // 只读!
));
config.withBinds(binds.toArray(new Bind[0]));
}
return config;
}
}Docker沙箱的六层安全防护:
| 安全措施 | 作用 | 防护的攻击 |
|---|---|---|
| 内存限制 | 限制容器最大内存 | 内存耗尽攻击(OOM) |
| CPU限制 | 限制可用CPU核心数 | CPU占满攻击 |
| Capability丢弃 | 禁止所有特权系统调用 | 提权、内核攻击 |
| 网络隔离 | 限制容器网络访问 | 内网探测、数据外发 |
| 只读挂载 | 脚本文件不可修改 | 篡改执行代码 |
| 临时文件系统 | /tmp目录不持久化 | 恶意文件驻留 |
4.5.3 超时控制
除了资源隔离,DataAgent还设置了严格的超时机制:
// Docker容器执行超时
dockerClient.waitContainerCmd(containerId)
.start()
.awaitCompletion(this.properties.getContainerTimeout(), TimeUnit.SECONDS);
// Python代码执行超时(通过timeout命令)
private String buildExecutionCommand(Path tempDir) {
return String.format(
"... timeout -s SIGKILL %s python3 -u script.py ...",
properties.getCodeTimeout()
);
}如果Python代码陷入死循环或执行时间过长,会被强制终止,避免占用系统资源。
4.5.4 重试与降级
当工具调用失败时,DataAgent有完善的容错机制:
// 在PythonExecuteNode中
if (!taskResponse.isSuccess()) {
int triesCount = StateUtil.getObjectValue(state, PYTHON_TRIES_COUNT, Integer.class, 0);
if (triesCount >= codeExecutorProperties.getPythonMaxTriesCount()) {
// 超过最大重试次数,启动降级策略
log.error("Python执行失败且已超过最大重试次数,采用降级策略继续处理");
String fallbackOutput = "{}";
return createFallbackResult(state);
}
// 未超过重试次数,抛出异常触发重试
throw new RuntimeException("Python Execute Failed: " + taskResponse.stdErr());
}降级策略:如果Python执行多次都失败,Agent不会直接报错退出,而是返回一个空结果({}),让工作流继续执行后续节点(如报告生成),只是分析深度会降低。
4.6 本章小结
本章我们学习了Agent的"手脚"——工具系统:
- 1. 为什么需要工具:LLM有三大局限(知识截止日期、不能外部交互、计算不精确),工具弥补这些短板
- 2. 常见工具类型:搜索工具、数据库工具、计算工具、文件工具
- 3. Function Calling机制:LLM生成调用指令 → 程序执行工具 → 结果返回LLM → LLM继续思考
- 4. DataAgent的工具系统:
- • SqlExecuteNode:执行SQL查询,连接多租户数据库
- • PythonExecuteNode:在Docker沙箱中执行Python代码
- • EvidenceRecallNode:在向量数据库中检索业务知识
- 5. 安全性:
- • SQL层:只读查询 + 数据库权限控制
- • Python层:Docker沙箱隔离(内存/CPU/Capability/网络/只读挂载/临时文件)
- • 超时控制:防止死循环和资源耗尽
- • 重试与降级:失败时不中断整个流程
理解工具系统后,你应该能回答这个问题:为什么Agent比单纯的LLM更强大? 因为Agent = LLM(大脑)+ 工具(手脚)+ 工作流(神经系统)。三者结合,才能真正解决复杂的实际问题。
思考题
- 1. 场景题:假设用户问"帮我删除销售额为0的订单记录",DataAgent的SQL生成节点会如何处理?为什么这是不安全的?请从Prompt设计、数据库权限、业务风险三个角度分析。
- 2. 设计题:如果你要为一个"智能客服Agent"设计工具系统,除了本章提到的搜索、数据库、Python、文件工具外,你还需要什么工具?请列举2-3个,并说明它们的作用和调用方式。
第五章:记忆让Agent更聪明
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent金鱼的烦恼
想象一下,你养了一条金鱼。每次你走到鱼缸前,它都会惊恐地游开——因为它完全不记得你是谁。你昨天才喂过它,今天它却像第一次见到你一样。金鱼的记忆只有7秒,这让它无法建立任何持续的关系,也无法从过去的经验中学习。
早期的AI就像这条金鱼。你问它"我叫什么名字",它回答"我不知道"。你刚告诉它"我叫小明",下一秒再问,它又忘了。每次对话都是全新的开始,没有任何记忆的积累。
但人类不一样。你能记住昨天和朋友聊了什么,能回忆起上周学过的知识,甚至能想起十年前的一次旅行。正是这种记忆能力,让你可以持续学习、不断成长。
那么,如何让AI Agent也拥有记忆呢?这就是本章要探讨的核心问题。
5.1 为什么Agent需要记忆
5.1.1 没有记忆的Agent有多笨
假设你正在用一个数据分析Agent查询销售情况:
你:"帮我查一下上个月的销售额"
Agent:"上个月销售额是150万元。"
你:"那再帮我看看利润是多少?"
Agent:"请问您要查哪个月的利润?"
你:"就是刚才说的那个月啊!"
Agent:"抱歉,我不记得刚才说的是哪个月。"
这种体验让人抓狂。没有记忆的Agent,每次交互都是孤立的,无法理解上下文,更做不到连贯的对话。
5.1.2 记忆让Agent更智能
有了记忆,Agent就能:
- 1. 保持对话连贯:记住用户之前说过的话,理解指代关系
- 2. 积累用户偏好:记住用户喜欢什么格式、关注什么指标
- 3. 避免重复劳动:记住已经查过的数据,直接复用
- 4. 持续学习进化:从每次交互中积累经验,越用越聪明
记忆是智能的基石。没有记忆,Agent永远是个"新手";有了记忆,Agent才能成长为"专家"。
5.2 短期记忆:Agent的"工作记忆"
5.2.1 什么是短期记忆
人类的短期记忆就像大脑的"工作台",能同时存放几件事。比如你在算账时,会记住"刚才加了15,现在要乘3",算完就忘了。
Agent的短期记忆也一样,它保存当前对话的上下文,让Agent知道"刚才说了什么"。对话结束后,短期记忆通常会被清空。
5.2.2 DataAgent的短期记忆实现
DataAgent使用MultiTurnContextManager来管理短期记忆。它就像Agent的"记事本",记录着对话的每一轮内容。
// 文件路径:DataAgent-main/data-agent-management/.../MultiTurnContextManager.java
@Component
public class MultiTurnContextManager {
// 用线程安全的Map存储每个对话的上下文
private final ConcurrentHashMap<String, Deque<ConversationTurn>> contextStore =
new ConcurrentHashMap<>();
@Autowired
private DataAgentProperties properties;
/**
* 开始新一轮对话
*/
public void beginTurn(String threadId, String userQuery) {
Deque<ConversationTurn> turns = contextStore.computeIfAbsent(
threadId, k -> new LinkedList<>()
);
// 如果超出最大轮数,移除最旧的记录
int maxHistory = properties.getMaxturnhistory(); // 默认5轮
while (turns.size() >= maxHistory) {
turns.pollFirst(); // 移除最早的一轮
}
// 创建新的对话轮次
ConversationTurn newTurn = new ConversationTurn();
newTurn.setUserQuery(userQuery);
newTurn.setTimestamp(System.currentTimeMillis());
turns.addLast(newTurn);
}
/**
* 追加Agent的回复片段(流式输出时逐步积累)
*/
public void appendPlannerChunk(String threadId, String chunk) {
Deque<ConversationTurn> turns = contextStore.get(threadId);
if (turns != null && !turns.isEmpty()) {
ConversationTurn currentTurn = turns.getLast();
currentTurn.appendAssistantResponse(chunk);
}
}
/**
* 完成当前轮次
*/
public void finishTurn(String threadId) {
// 可以在这里做清理或持久化操作
}
}看,这个设计很巧妙:
- • 用
ConcurrentHashMap保证线程安全,支持多用户同时对话 - • 用
Deque(双端队列)实现"先进先出",自动淘汰旧记忆 - • 每轮对话包含用户问题和Agent回复,形成完整的上下文
5.2.3 短期记忆的使用
有了存储,还要会用。DataAgent在每次处理用户请求时,都会把短期记忆注入到提示词中:
// 文件路径:DataAgent-main/data-agent-management/.../GraphServiceImpl.java
public class GraphServiceImpl implements GraphService {
@Autowired
private MultiTurnContextManager multiTurnContextManager;
@Override
public Flux<GraphNodeResponse> streamSearch(GraphRequest request) {
String threadId = request.getThreadId();
String userQuery = request.getQuery();
// 1. 开始新一轮对话
multiTurnContextManager.beginTurn(threadId, userQuery);
// 2. 构建上下文(包含历史对话)
String multiTurnContext = multiTurnContextManager.buildContext(threadId);
// 3. 将上下文放入状态,供后续节点使用
OverAllState state = new OverAllState();
state.put(Constant.MULTI_TURN_CONTEXT, multiTurnContext);
state.put(Constant.CURRENT_QUERY, userQuery);
// 4. 执行工作流...
return executeGraph(state);
}
}这样,当Agent生成SQL或Python代码时,提示词中已经包含了之前的对话内容,Agent就能理解"刚才说的那个月"指的是什么了。
5.2.4 短期记忆的边界
短期记忆不是越多越好。DataAgent通过配置控制记忆容量:
// 文件路径:DataAgent-main/data-agent-management/.../DataAgentProperties.java
@ConfigurationProperties(prefix = "spring.ai.alibaba.data-agent")
public class DataAgentProperties {
/**
* 最大保留几轮对话历史
*/
private int maxturnhistory = 5;
/**
* 单轮对话最大长度(字符数)
*/
private int maxplanlength = 2000;
// getter和setter...
}为什么限制5轮?因为:
- 1. LLM的上下文窗口有限:太多历史会挤占当前问题的空间
- 2. 太远的历史帮助不大:用户通常只关心最近几轮的内容
- 3. 成本考虑:更多token意味着更高的调用费用
配置提示:你可以在
application.yml中调整这些参数:spring:
ai:
alibaba:
data-agent:
maxturnhistory: 5
maxplanlength: 2000
5.3 长期记忆:Agent的"知识库"
5.3.1 什么是长期记忆
短期记忆像便利贴,用完就扔;长期记忆像图书馆,永久保存。
Agent的长期记忆包括:
- • 业务知识:产品定义、指标口径、业务规则
- • 用户偏好:张三喜欢看表格,李四喜欢图表
- • 历史案例:上次类似的问题是怎么解决的
- • 文档资料:产品手册、API文档、操作指南
5.3.2 向量数据库:长期记忆的"书架"
长期记忆需要专门的存储方式。DataAgent使用向量数据库(Vector Store),它就像图书馆的书架,能根据"语义相似度"快速找到相关资料。
传统数据库是精确匹配:你搜"苹果",只能找到包含"苹果"两个字的内容。
向量数据库是语义匹配:你搜"苹果",还能找到"iPhone"、"苹果手机"、"Apple公司",因为它们在语义空间中是相近的。
传统数据库 向量数据库
┌─────────┐ ┌─────────┐
│ 苹果 │ │ 苹果 │◄──┐
│ 香蕉 │ │ iPhone │ │ 语义相近
│ 橘子 │ │ Apple │◄──┘
│ 梨 │ │ 香蕉 │
└─────────┘ │ 橘子 │
精确匹配 └─────────┘
语义相似度搜索5.3.3 DataAgent的向量存储服务
DataAgent通过AgentVectorStoreService管理长期记忆:
// 文件路径:DataAgent-main/data-agent-management/.../AgentVectorStoreServiceImpl.java
@Service
public class AgentVectorStoreServiceImpl implements AgentVectorStoreService {
@Autowired
private VectorStore vectorStore; // Spring AI提供的向量存储抽象
@Autowired
private DataAgentProperties properties;
/**
* 为指定Agent搜索相关文档
*/
@Override
public List<Document> getDocumentsForAgent(String agentId, String query, VectorType vectorType) {
// 1. 构建搜索请求,带过滤条件
SearchRequest searchRequest = SearchRequest.builder()
.query(query)
.topK(properties.getVectorStore().getDefaultTopkLimit()) // 默认返回8条
.similarityThreshold(properties.getVectorStore().getDefaultSimilarityThreshold()) // 相似度阈值0.4
.filterExpression("agentId == '" + agentId + "' && vectorType == '" + vectorType.name() + "'")
.build();
// 2. 执行相似度搜索
List<Document> documents = vectorStore.similaritySearch(searchRequest);
return documents;
}
/**
* 高级搜索:支持混合搜索(向量+关键词)
*/
@Override
public List<Document> search(String agentId, String query, VectorType vectorType, boolean hybrid) {
if (hybrid) {
// 混合搜索:结合语义相似度和关键词匹配
return doHybridSearch(agentId, query, vectorType);
}
return getDocumentsForAgent(agentId, query, vectorType);
}
}这里有几个关键概念:
- • TopK:返回最相似的前K条结果(默认8条)
- • SimilarityThreshold:相似度门槛,低于这个值的会被过滤(默认0.4)
- • FilterExpression:按AgentID和类型过滤,确保不同Agent的记忆不混淆
- • HybridSearch:结合语义搜索和关键词搜索,提高召回率
5.3.4 文档的存储与检索
长期记忆不是凭空出现的,需要先把资料"录入"。DataAgent支持多种文档类型:
// 文件路径:DataAgent-main/data-agent-management/.../DataAgentProperties.java
public class DataAgentProperties {
/**
* 文本分片配置
*/
private TextSplitterProperties textSplitter = new TextSplitterProperties();
public static class TextSplitterProperties {
private int chunkSize = 1000; // 每个片段约1000字符
private int chunkOverlap = 200; // 片段间重叠200字符,避免断句
private String splitterType = "recursive"; // 分片策略
}
}为什么要把文档切小?
因为LLM的输入长度有限,一次塞不进整本手册。而且,小块内容更精准:用户问"退货政策",只需找到相关的那一段,不用把整本手册都传给LLM。
重叠200字符是为什么?
防止关键信息被切断。比如"退款条件:商品未使用且包装完整",如果不重叠,可能被切成"退款条件:商品未使用"和"且包装完整",语义就不完整了。
5.4 RAG:给Agent装上"外接大脑"
5.4.1 什么是RAG
RAG(Retrieval-Augmented Generation,检索增强生成)是Agent使用长期记忆的核心机制。
它的工作流程就像开卷考试:
- 1. 用户提问:"我们的退货政策是什么?"
- 2. 检索资料:去知识库找到相关的政策文档
- 3. 拼接提示:把问题和资料一起给LLM
- 4. 生成答案:LLM基于资料回答问题
用户提问 ──► 检索相关资料 ──► 拼接提示词 ──► LLM生成答案
│
▼
[向量数据库]
业务知识库
历史文档5.4.2 DataAgent的RAG实现:EvidenceRecallNode
DataAgent的EvidenceRecallNode专门负责RAG检索。它就像Agent的"图书管理员",根据用户问题去找相关资料。
// 文件路径:DataAgent-main/data-agent-management/.../EvidenceRecallNode.java
@Component
public class EvidenceRecallNode implements NodeAction {
@Autowired
private AgentVectorStoreService vectorStoreService;
@Autowired
private LlmService llmService;
@Override
public Map<String, Object> apply(OverAllState state) {
String query = StateUtil.getStringValue(state, Constant.CURRENT_QUERY);
String agentId = StateUtil.getStringValue(state, Constant.AGENT_ID);
// 第一步:查询改写(可选)
// 把口语化的问题改写成更适合检索的形式
String enhancedQuery = enhanceQuery(query);
// 第二步:检索业务术语(BUSINESS_TERM)
List<Document> businessTerms = vectorStoreService.getDocumentsForAgent(
agentId, enhancedQuery, VectorType.BUSINESS_TERM
);
// 第三步:检索Agent专属知识(AGENT_KNOWLEDGE)
List<Document> agentKnowledge = vectorStoreService.getDocumentsForAgent(
agentId, enhancedQuery, VectorType.AGENT_KNOWLEDGE
);
// 第四步:格式化证据内容
StringBuilder evidence = new StringBuilder();
evidence.append("=== 相关业务术语 ===\n");
for (Document doc : businessTerms) {
evidence.append("- ").append(doc.getContent()).append("\n");
}
evidence.append("\n=== 相关知识 ===\n");
for (Document doc : agentKnowledge) {
evidence.append("- ").append(doc.getContent()).append("\n");
}
// 第五步:将证据存入状态,供后续节点使用
Map<String, Object> result = new HashMap<>();
result.put(Constant.EVIDENCE_CONTENT, evidence.toString());
return result;
}
/**
* 查询改写:把"那个指标"改写成具体的指标名称
*/
private String enhanceQuery(String query) {
// 简单实现:直接返回原查询
// 高级实现:用LLM进行查询扩展和同义词替换
return query;
}
}这个节点的工作流程很清晰:
- 1. 查询改写:把用户的口语化表达改写成更规范的检索词
- 2. 分类检索:同时检索"业务术语"和"Agent知识"两类资料
- 3. 格式化输出:把检索结果整理成清晰的文本,方便后续节点使用
- 4. 状态传递:通过
OverAllState把证据传递给PlannerNode等后续节点
5.4.3 RAG的优势与局限
优势:
- • 知识实时更新:不用重新训练模型,只需更新知识库
- • 答案可溯源:能告诉用户"这个答案来自哪份文档"
- • 减少幻觉:LLM基于真实资料回答,而不是瞎编
- • 保护隐私:敏感数据留在本地,不传给大模型厂商
局限:
- • 检索质量决定答案质量:如果找错了资料,答案也会错
- • 需要维护知识库:文档要定期更新,分片策略要调优
- • 复杂推理仍有挑战:跨文档的关联推理比较困难
RAG不是万能的,但在大多数企业场景中,它是让Agent拥有专业知识的最佳方案。
5.5 记忆管理的实战策略
5.5.1 短期记忆 vs 长期记忆:如何分工
| 特性 | 短期记忆 | 长期记忆 |
|---|---|---|
| 存储内容 | 当前对话历史 | 业务知识、用户偏好 |
| 存储位置 | 内存(ConcurrentHashMap) | 向量数据库 |
| 生命周期 | 对话期间 | 永久 |
| 容量限制 | 5轮对话 | 无上限(受存储限制) |
| 使用场景 | 理解上下文指代 | 回答专业问题 |
| 实现复杂度 | 低 | 高 |
5.5.2 记忆清理与重置
DataAgent提供了记忆管理功能:
// 文件路径:DataAgent-main/data-agent-management/.../MultiTurnContextManager.java
public class MultiTurnContextManager {
/**
* 重置指定对话的所有记忆
*/
public void clearContext(String threadId) {
contextStore.remove(threadId);
}
/**
* 丢弃当前未完成的轮次(用户取消或出错时)
*/
public void discardPending(String threadId) {
Deque<ConversationTurn> turns = contextStore.get(threadId);
if (turns != null && !turns.isEmpty()) {
ConversationTurn lastTurn = turns.getLast();
if (!lastTurn.isCompleted()) {
turns.removeLast(); // 移除未完成的轮次
}
}
}
/**
* 重新开始最后一轮(用户反馈修改时)
*/
public void restartLastTurn(String threadId, String newQuery) {
Deque<ConversationTurn> turns = contextStore.get(threadId);
if (turns != null && !turns.isEmpty()) {
turns.removeLast(); // 移除旧的
beginTurn(threadId, newQuery); // 创建新的
}
}
}这些方法在以下场景很有用:
- • clearContext:用户说"我们重新开始",清空所有历史
- • discardPending:Agent生成到一半出错了,丢弃不完整的内容
- • restartLastTurn:用户说"不对,我的意思是...",基于原问题重新理解
5.5.3 记忆的持久化
短期记忆存在内存里,服务重启就丢了。如果需要持久化,可以:
- 1. 定期快照:把内存中的对话历史写入Redis或数据库
- 2. 异步持久化:对话结束后,异步保存到数据库
- 3. 分级存储:热数据在内存,温数据在Redis,冷数据在数据库
DataAgent目前使用内存存储短期记忆,这是为了简单和性能。在生产环境中,建议接入Redis实现分布式共享。
5.6 本章小结
本章我们探讨了Agent的记忆机制,这是让Agent从"工具"升级为"助手"的关键能力。
核心概念回顾:
- 1. 短期记忆:保存当前对话的上下文,让Agent理解"刚才说了什么"。DataAgent用
MultiTurnContextManager实现,默认保留5轮对话。 - 2. 长期记忆:保存业务知识和用户偏好,让Agent成为"专家"。DataAgent用向量数据库存储,通过语义相似度检索相关资料。
- 3. RAG:检索增强生成,是Agent使用长期记忆的核心机制。先检索相关资料,再基于资料生成答案,减少幻觉,提高专业性。
- 4. 记忆管理:需要控制记忆容量、提供清理重置机制、考虑持久化策略。
DataAgent的实践亮点:
- • 线程安全的
ConcurrentHashMap存储短期记忆 - • 可配置的
maxturnhistory和maxplanlength控制记忆容量 - •
AgentVectorStoreService提供语义检索和混合搜索 - •
EvidenceRecallNode实现两阶段RAG(查询改写+向量检索) - • 状态机
OverAllState在各节点间传递记忆
思考题
- 1. 场景设计:假设你在开发一个客服Agent,用户可能会说"还是刚才那个问题"、"换种方式问"、"我指的是前天说的那件事"。你会如何设计记忆机制来应对这些场景?
- 2. 技术权衡:DataAgent的短期记忆默认只保留5轮。如果增加到20轮,会带来什么好处和问题?你会如何平衡?
- 3. RAG优化:如果你发现Agent经常检索到不相关的文档,或者遗漏了关键文档,你会从哪些方面优化RAG的效果?(提示:可以从查询改写、分片策略、相似度阈值等角度思考)
第六章:规划让Agent更有条理——Planning与任务拆解
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent做一顿年夜饭
春节前夕,你决定为家人做一顿丰盛的年夜饭。走进厨房,面对冰箱里的食材,你会怎么做?
没有规划的人会手忙脚乱:先炒个青菜,发现米饭还没煮;刚腌好的鱼下了锅,发现葱姜蒜没切;等所有菜炒完,第一道菜已经凉了...
有规划的人会提前列好菜单:
- 1. 先规划菜单:冷盘4个、热菜6个、汤1个、主食饺子
- 2. 再列采购清单:缺什么食材、缺什么调料
- 3. 安排备菜顺序:先泡发干货,再腌制肉类,最后处理蔬菜
- 4. 确定烹饪顺序:汤先炖上(需要2小时),米饭煮上(需要40分钟),同时准备凉菜,最后炒热菜
- 5. 上桌时机:确保所有热菜同时出锅,汤和主食刚好配齐
这就是规划的力量——把复杂的大目标拆解成有序的小步骤,让执行有条不紊。
Agent面对复杂任务时,也需要这样的规划能力。本章我们就来看看,Agent是如何"先想后做"的。
6.1 为什么Agent需要规划
6.1.1 复杂问题不能一步到位
假设你问Agent:"分析一下上季度各产品线的销售趋势,找出下滑最严重的产品,预测下季度走势,并生成一份带图表的分析报告。"
这个任务包含多少子任务?
- 1. 查询上季度各产品线的销售数据
- 2. 计算同比、环比增长率
- 3. 识别下滑最严重的产品线
- 4. 分析下滑原因(可能需要对比历史数据)
- 5. 用统计模型预测下季度走势
- 6. 生成可视化图表(趋势图、对比图)
- 7. 整合所有结果,生成结构化报告
如果Agent没有规划能力,可能会:
- • 顺序混乱:先生成报告模板,才发现数据还没查
- • 遗漏步骤:忘了做预测分析,直接给结论
- • 重复劳动:两次查询相同的数据
- • 无法纠错:做到第6步发现第2步算错了,全盘重来
6.1.2 规划的本质:拆解与排序
规划的核心就是两件事:
- 1. 拆解(Decomposition):把大目标拆成小任务
- 2. 排序(Sequencing):确定任务的执行顺序
对人类来说,这是本能。你出门旅行,会自动想:定机票→订酒店→查攻略→收拾行李→去机场。
但对Agent来说,这种能力不是天生的,需要专门设计。这就是规划能力(Planning)——让Agent像项目经理一样,先制定计划,再逐步执行。
好的规划 = 正确的步骤 + 合理的顺序 + 可执行的粒度
6.2 常见的规划模式
在AI领域,研究者们提出了多种规划模式。我们介绍三种最常见的。
6.2.1 Chain-of-Thought(思维链):一步步思考
早期的LLM回答问题像考试蒙选择题——直接给答案,不对就错了。
直接回答模式:
问:一个农场有鸡和兔共35只,脚共94只,鸡兔各几只?
答:鸡23只,兔12只。(可能是对的,也可能是错的,不知道咋算的)
Chain-of-Thought模式(思维链):
问:一个农场有鸡和兔共35只,脚共94只,鸡兔各几只?
答:让我逐步思考:
- 1. 假设全是鸡,应该有 35×2=70 只脚
- 2. 实际多了 94-70=24 只脚
- 3. 每只兔比鸡多2只脚,所以兔有 24÷2=12 只
- 4. 鸡有 35-12=23 只
- 5. 验证:23×2 + 12×4 = 46+48=94,正确
答案是:鸡23只,兔12只。
看出区别了吗?思维链把"黑盒推理"变成了"白盒推理",不仅答案更准,而且错了也能发现哪步错了。
DataAgent的Plan对象就是思维链的具象化:
// 文件路径:DataAgent-main/data-agent-management/.../dto/planner/Plan.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Plan {
@JsonProperty("thought_process")
@JsonPropertyDescription("简要描述你的分析思路。必须明确提到你检查了哪些表和字段")
private String thoughtProcess;
@JsonProperty("execution_plan")
@JsonPropertyDescription("执行计划的步骤列表")
private List<ExecutionStep> executionPlan;
// ...其他方法
}thoughtProcess字段就是Agent的"内心独白",记录了它为什么要这样规划。
6.2.2 ReAct(推理+行动):边想边做
ReAct(Reasoning + Acting)模式不一次性制定完整计划,而是每一步都先"思考"再"行动",根据行动结果调整下一步。
思考1:用户要查销售额,我应该先找销售表
│
▼
行动1:查询数据库表名
│
▼
观察1:找到表`orders`,有amount字段
│
▼
思考2:好的,我可以查这个表的amount总和
│
▼
行动2:执行SQL查询
│
▼
观察2:结果为150万
│
▼
思考3:任务完成,可以回复用户了
│
▼
最终答案:上月销售额为150万元ReAct更灵活,适合开放性探索任务;DataAgent的"规划-执行"模式更可控,适合企业级数据分析。两者没有绝对优劣,看场景选择。
6.2.3 Plan-and-Execute(先规划再执行)
这是DataAgent采用的模式:先一次性制定完整计划,再按步骤执行。
优点:
- • 计划全局可见,便于人工审核
- • 执行路径清晰,便于调试和优化
- • 适合有明确目标的任务(如数据分析)
缺点:
- • 灵活性不如ReAct
- • 如果前期规划有误,后期需要重新规划
DataAgent通过动态修复机制弥补了这个缺点——执行失败时可以重新规划。
6.3 DataAgent的规划系统
6.3.1 规划的数据结构:Plan + ExecutionStep
DataAgent把规划结果抽象为两个核心类:
Plan(计划):
// 文件路径:DataAgent-main/data-agent-management/.../dto/planner/Plan.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Plan {
@JsonProperty("thought_process")
@JsonPropertyDescription("简要描述你的分析思路。必须明确提到你检查了哪些表和字段")
private String thoughtProcess;
@JsonProperty("execution_plan")
@JsonPropertyDescription("执行计划的步骤列表")
private List<ExecutionStep> executionPlan;
// 为NL2SQL模式准备的简单计划(直接查询,不做复杂分析)
public static String nl2SqlPlan() {
return NL2SQL_PLAN_JSON;
}
}ExecutionStep(执行步骤):
// 文件路径:DataAgent-main/data-agent-management/.../dto/planner/ExecutionStep.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ExecutionStep {
@JsonProperty("step")
@JsonPropertyDescription("步骤顺序号")
private int step;
@JsonProperty("tool_to_use")
@JsonPropertyDescription("工具名称")
private String toolToUse;
@JsonProperty("tool_parameters")
@JsonPropertyDescription("工具参数")
private ToolParameters toolParameters;
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL) // 序列化时忽略null值
public static class ToolParameters {
@JsonProperty("instruction")
@JsonPropertyDescription("SQL_GENERATE_NODE时:详细的SQL需求;PYTHON_GENERATE_NODE时:详细的编程需求")
private String instruction;
@JsonProperty("summary_and_recommendations")
@JsonPropertyDescription("REPORT_GENERATOR_NODE专用,报告的大纲")
private String summaryAndRecommendations;
@JsonProperty("sql_query")
@JsonPropertyDescription("SQL_GENERATE_NODE运行完后,会把生成的SQL塞进来")
private String sqlQuery;
}
}这个设计非常清晰:
- • thoughtProcess:Agent的"内心独白",解释为什么要这样规划
- • executionPlan:具体的任务清单,每一步都有序号、工具和参数
- • ExecutionStep:原子操作,不可再分的基本任务单元
- • ToolParameters:根据工具类型携带不同参数(SQL指令、Python指令、报告大纲)
6.3.2 规划器:PlannerNode
PlannerNode是DataAgent的"项目经理"。它接收用户的需求,分析现状,制定执行计划。
用户Query + 历史上下文 + 数据库Schema + 检索到的证据
│
▼
┌───────────────┐
│ PlannerNode │
│ (规划器) │
└───────────────┘
│
▼
Plan(JSON格式)
thoughtProcess + executionPlan[]PlannerNode的实现:
// 文件路径:DataAgent-main/data-agent-management/.../workflow/node/PlannerNode.java
@Slf4j
@Component
@AllArgsConstructor
public class PlannerNode implements NodeAction {
private final LlmService llmService;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 是否为NL2SQL模式(直接翻译SQL,不做复杂分析)
Boolean onlyNl2sql = state.value(IS_ONLY_NL2SQL, false);
// NL2SQL模式走简化流程,否则走标准规划流程
Flux<ChatResponse> flux = onlyNl2sql ? handleNl2SqlOnly() : handlePlanGenerate(state);
// ...流式输出处理
}
private Flux<ChatResponse> handlePlanGenerate(OverAllState state) {
// 获取查询增强节点的输出(经过改写和扩展的用户问题)
String canonicalQuery = StateUtil.getCanonicalQuery(state);
log.info("Using processed query for planning: {}", canonicalQuery);
// 检查是否为修复模式(用户之前否决了计划,需要重新生成)
String validationError = StateUtil.getStringValue(state, PLAN_VALIDATION_ERROR, null);
if (validationError != null) {
log.info("Regenerating plan with user feedback: {}", validationError);
} else {
log.info("Generating initial plan");
}
// 构建提示参数
String semanticModel = (String) state.value(GENEGRATED_SEMANTIC_MODEL_PROMPT).orElse("");
SchemaDTO schemaDTO = StateUtil.getObjectValue(state, TABLE_RELATION_OUTPUT, SchemaDTO.class);
String schemaStr = PromptHelper.buildMixMacSqlDbPrompt(schemaDTO, true);
// 构建用户提示(如果是修复模式,会包含错误信息)
String userPrompt = buildUserPrompt(canonicalQuery, validationError, state);
String evidence = StateUtil.getStringValue(state, EVIDENCE);
// 构建模板参数,传入LLM
BeanOutputConverter<Plan> beanOutputConverter = new BeanOutputConverter<>(Plan.class);
Map<String, Object> params = Map.of(
"user_question", userPrompt,
"schema", schemaStr,
"evidence", evidence,
"semantic_model", semanticModel,
"plan_validation_error", formatValidationError(validationError),
"format", beanOutputConverter.getFormat()
);
// 生成计划:渲染Prompt模板 + 调用LLM
String plannerPrompt = PromptConstant.getPlannerPromptTemplate().render(params);
return llmService.callUser(plannerPrompt);
}
private Flux<ChatResponse> handleNl2SqlOnly() {
// NL2SQL模式直接返回预置的简单计划
return Flux.just(ChatResponseUtil.createPureResponse(Plan.nl2SqlPlan()));
}
}这里有几个关键点:
- 1. 上下文整合:规划器不是凭空规划,而是基于Schema(数据库结构)、Evidence(业务知识)、Context(对话历史)综合判断
- 2. 结构化输出:通过
BeanOutputConverter强制LLM输出JSON,而不是自由文本 - 3. 修复模式:如果
PLAN_VALIDATION_ERROR存在,说明之前的计划被否决了,会重新规划 - 4. NL2SQL简化:对于简单的SQL查询,直接返回预置的单步计划,提高效率
6.3.3 规划提示词模板
好的提示词决定了规划质量。DataAgent的Planner提示词非常详细:
# 角色:高级数据分析智能体 (Senior Data Analysis Agent)
你是一位拥有深厚业务洞察力的高级数据分析专家。你的核心职责是解析用户的业务问题,并基于给定的数据库 Schema,制定一个严谨、可执行的分步执行计划(Plan)。
**重要原则:你必须且只能输出一个合法的 JSON 对象。严禁包含 markdown 标记(如 ```json)、注释或任何 JSON 结构之外的文本。**
# 用户反馈处理 (最高优先级)
{plan_validation_error}
**如果上方存在用户反馈信息:**
1. **强制执行**:反馈中包含的要求必须在新的计划中得到满足。
2. **绝对合规**:用户反馈的各个方面都必须纳入你的计划中
3. **无例外情况**:若用户提及 "需要用 Python",你必须包含 Python 生成节点(PYTHON_GENERATE_NODE)相关步骤
4. **优先级覆盖**:用户反馈中的要求优先于任何默认的分析方法
# 核心任务流程
1. **解构需求**:深入分析用户问题,明确核心业务目标、所需指标(如转化率、GMV)、维度(如按地区、按渠道)和时间范围。
2. **Schema 验证 (至关重要)**:**必须首先分析 `AVAILABLE DATA CONTEXT` 中的 Schema**。确认你计划查询的字段(列名)在表中真实存在。整个计划必须完全基于此schema构建,严禁臆造字段。
3. **制定策略**:创建一个逻辑清晰的多步计划。标准策略通常包括:
* **数据提取 (SQL)**:编写针对性的指令来提取原始数据。如果逻辑复杂,请拆分为多个 SQL 步骤。
* **深度分析 (Python)**:对于 SQL 难以实现的复杂计算(如环比/同比计算、统计分析、预测、相关性分析)或图表绘制,必须使用 Python。
* **总结汇报 (Report)**:最后一步必须是生成报告,总结发现并给出建议。
4. **生成计划**:输出 JSON 对象。
# 可用工具 (Available Tools)
## 1. SQL_GENERATE_NODE
* **核心用途**:执行 SQL 查询以提取或聚合数据。这是分析的基础。
* **参数**:
* `instruction` (string): **[下游指令]** 将直接作为 SQL 生成专家的 Prompt。
* **严格约束**:
* **禁止**使用"SQL生成"、"查询数据"、"执行查询"等模糊词语。
* **必须**包含:目标表名(或业务实体名)、具体的聚合维度(如按月、按地区)、过滤条件(如2023年)。
* **正例**:"从 orders 表查询 2023 年北京地区的销售总额,并按月份分组"。
* **反例**:"SQL生成"(**绝对禁止**,这将导致任务失败)。
## 2. PYTHON_GENERATE_NODE
* **核心用途**:当 SQL 难以满足需求时使用。适用于:复杂逻辑计算、数据清洗、高级统计分析、图表绘制。
* **参数**:
* `instruction` (string): 给 Python 解释器的具体编程指令。
## 3. REPORT_GENERATOR_NODE
* **核心用途**:流程的最后一步。用于整合所有步骤的输出,回答用户最初的问题,并提供商业建议。
* **参数**:
* `summary_and_recommendations` (string): 报告的大纲、需要回答的关键问题和建议方向。
# 可用数据上下文 (Available Data Context)
基于用户问题,系统已召回以下数据库 Schema。**你的计划必须且只能基于这些表结构。**
```sql
{schema}【Reference information】
{evidence}
思考路径指南 (用于填充 thought_process 字段)
在生成 JSON 之前,请遵循以下步骤进行思考,并将思考的摘要写入 JSON 的 thought_process 字段中:
1、理解目标:用户的核心疑问是什么?(例如:"分析线索转化质量")。
2、核对 Schema:检查 {schema}。我有 region, channel 字段吗?我有代表转化节点的字段吗?如果没有相关字段,我不能制定查询该字段的计划。
3、拆解步骤:思考如何将大问题拆解为 SQL 查询和 Python 计算。例如:
3.1、需要按渠道看转化率? -> Step 1: SQL_GENERATE_NODE (描述:按渠道分组查询各阶段人数)。
3.2、需要按地区看转化率? -> Step 2: SQL_GENERATE_NODE (描述:按地区分组查询各阶段人数)。
3.3、需要计算复杂的比率或排名? -> Step 3: PYTHON_GENERATE_NODE (指令:读取前两步数据,计算转化率并排名)。
3.4 需要总结? -> Step 4: REPORT_GENERATOR_NODE。
4、撰写指令:确保 SQL_GENERATE_NODE 的 instruction 足够详细,让写 SQL 的同事(下游节点)一看就懂,不需要再问用户。
5、构建 JSON:组装最终结果。
输出格式 (必须是合法的 JSON)
请注意:tool_parameters 对象是动态的,请根据 tool_to_use 仅填充必要的字段,不要输出值为 null 的字段。
{format}
示例(EXAMPLE)
User Input: "分析极曜汽车近一年的购车线索转化质量,尤其是不同地区的线索质量情况"
Your Output:
{
"thought_process": "用户想要分析'极曜汽车'近一年的线索转化质量...",
"execution_plan": [
{
"step": 1,
"tool_to_use": "SQL_GENERATE_NODE",
"tool_parameters": {
"instruction": "按渠道来源分组,查询近一年的线索转化漏斗核心指标。"
}
},
{
"step": 2,
"tool_to_use": "SQL_GENERATE_NODE",
"tool_parameters": {
"instruction": "按地理区域(省份、城市)分组,查询近一年的线索转化漏斗核心指标。"
}
},
{
"step": 3,
"tool_to_use": "PYTHON_GENERATE_NODE",
"tool_parameters": {
"instruction": "基于步骤1(渠道数据)和步骤2(区域数据)的结果,进行深入分析..."
}
},
{
"step": 4,
"tool_to_use": "REPORT_GENERATOR_NODE",
"tool_parameters": {
"summary_and_recommendations": "综合以上分析结果,总结出高转化率渠道和区域的共同特征..."
}
}
]
}
User's Current Request
User Input: "{user_question}"
这个提示词模板非常详细,包含:
- **角色设定**:"高级数据分析专家"
- **重要原则**:只能输出合法JSON
- **用户反馈处理**:最高优先级,必须满足用户反馈
- **核心任务流程**:解构需求→Schema验证→制定策略→生成计划
- **可用工具说明**:每个工具的用途、参数、约束条件
- **思考路径指南**:教LLM如何一步步思考
- **输出格式要求**:JSON结构、字段说明
- **示例**:完整的输入输出示例
这就像给实习生一份极其详细的"作业指导书",说得越清楚,他做得越到位。
## 6.4 计划的执行与调度
### 6.4.1 执行器:PlanExecutorNode
制定计划只是第一步,更重要的是执行。`PlanExecutorNode`就是DataAgent的"执行经理",负责按计划推进任务。
Plan(计划)
│
▼
PlanExecutorNode(执行器)
│
├──► 验证计划合法性
│
├──► 获取当前步骤
│
├──► 路由到对应节点
│ ├──► SQL_GENERATE_NODE → 生成SQL → 执行SQL
│ ├──► PYTHON_GENERATE_NODE → 生成Python → 执行Python
│ └──► REPORT_GENERATOR_NODE → 生成报告
│
└──► 检查是否还有下一步
├──► 有 → 继续执行
└──► 无 → 结束
**PlanExecutorNode的实现**:
```java
// 文件路径:DataAgent-main/data-agent-management/.../workflow/node/PlanExecutorNode.java
@Slf4j
@Component
public class PlanExecutorNode implements NodeAction {
// 支持的工具节点类型
private static final Set<String> SUPPORTED_NODES = Set.of(
SQL_GENERATE_NODE,
PYTHON_GENERATE_NODE,
REPORT_GENERATOR_NODE
);
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 1. 验证计划结构
Plan plan;
try {
plan = PlanProcessUtil.getPlan(state);
} catch (Exception e) {
log.error("Plan validation failed due to a parsing error.", e);
return buildValidationResult(state, false,
"Validation failed: The plan is not a valid JSON structure.");
}
// 验证执行计划是否为空
if (!validateExecutionPlanStructure(plan)) {
return buildValidationResult(state, false,
"Validation failed: The generated plan is empty or has no execution steps.");
}
// 2. 验证每个执行步骤
for (ExecutionStep step : plan.getExecutionPlan()) {
String validationResult = validateExecutionStep(step);
if (validationResult != null) {
return buildValidationResult(state, false, validationResult);
}
}
log.info("Plan validation successful.");
// 3. 如果开启人工复核,则在执行前暂停,跳转到human_feedback节点
Boolean humanReviewEnabled = state.value(HUMAN_REVIEW_ENABLED, false);
if (Boolean.TRUE.equals(humanReviewEnabled)) {
log.info("Human review enabled: routing to human_feedback node");
return Map.of(PLAN_VALIDATION_STATUS, true, PLAN_NEXT_NODE, HUMAN_FEEDBACK_NODE);
}
// 4. 获取当前步骤编号
int currentStep = PlanProcessUtil.getCurrentStepNumber(state);
List<ExecutionStep> executionPlan = plan.getExecutionPlan();
// 5. 检查计划是否已完成
if (currentStep > executionPlan.size()) {
log.info("Plan completed, current step: {}, total steps: {}", currentStep, executionPlan.size());
return Map.of(
PLAN_CURRENT_STEP, 1,
PLAN_NEXT_NODE, isOnlyNl2Sql ? StateGraph.END : REPORT_GENERATOR_NODE,
PLAN_VALIDATION_STATUS, true
);
}
// 6. 获取当前步骤,确定下一个执行节点
ExecutionStep executionStep = executionPlan.get(currentStep - 1);
String toolToUse = executionStep.getToolToUse();
return determineNextNode(toolToUse);
}
/**
* 确定下一个执行节点
*/
private Map<String, Object> determineNextNode(String toolToUse) {
if (SUPPORTED_NODES.contains(toolToUse)) {
log.info("Determined next execution node: {}", toolToUse);
return Map.of(PLAN_NEXT_NODE, toolToUse, PLAN_VALIDATION_STATUS, true);
} else if (HUMAN_FEEDBACK_NODE.equals(toolToUse)) {
return Map.of(PLAN_NEXT_NODE, toolToUse, PLAN_VALIDATION_STATUS, true);
} else {
return Map.of(PLAN_VALIDATION_STATUS, false,
PLAN_VALIDATION_ERROR, "Unsupported node type: " + toolToUse);
}
}
/**
* 验证单个执行步骤
*/
private String validateExecutionStep(ExecutionStep step) {
// 验证工具名称是否合法
if (step.getToolToUse() == null || !SUPPORTED_NODES.contains(step.getToolToUse())) {
return "Validation failed: Plan contains an invalid tool name: '"
+ step.getToolToUse() + "' in step " + step.getStep();
}
// 验证工具参数是否存在
if (step.getToolParameters() == null) {
return "Validation failed: Tool parameters are missing for step " + step.getStep();
}
// 根据节点类型验证特定参数
switch (step.getToolToUse()) {
case SQL_GENERATE_NODE:
if (!StringUtils.hasText(step.getToolParameters().getInstruction())) {
return "Validation failed: SQL generation node is missing description in step " + step.getStep();
}
break;
case PYTHON_GENERATE_NODE:
if (!StringUtils.hasText(step.getToolParameters().getInstruction())) {
return "Validation failed: Python generation node is missing instruction in step " + step.getStep();
}
break;
case REPORT_GENERATOR_NODE:
if (!StringUtils.hasText(step.getToolParameters().getSummaryAndRecommendations())) {
return "Validation failed: Report generation node is missing summary_and_recommendations in step " + step.getStep();
}
break;
}
return null; // 验证通过
}
}执行器的逻辑很清晰:
- 1. 验证计划:确保计划不是空的,JSON格式正确
- 2. 验证步骤:逐个检查每个步骤的工具名称、参数完整性
- 3. 人工审核:如果开启了人工复核,暂停执行等待用户确认
- 4. 获取步骤:按序号取出当前要执行的步骤
- 5. 路由跳转:返回下一个节点的名称,StateGraph负责调度
6.4.2 调度器:PlanExecutorDispatcher
PlanExecutorDispatcher是"交通警察",负责根据执行结果决定下一步去哪。
// 文件路径:DataAgent-main/data-agent-management/.../workflow/dispatcher/PlanExecutorDispatcher.java
@Slf4j
public class PlanExecutorDispatcher implements EdgeAction {
private static final int MAX_REPAIR_ATTEMPTS = 2;
@Override
public String apply(OverAllState state) {
// 1. 检查计划验证是否通过
boolean validationPassed = StateUtil.getObjectValue(state, PLAN_VALIDATION_STATUS, Boolean.class, false);
if (validationPassed) {
log.info("Plan validation passed. Proceeding to next step.");
String nextNode = state.value(PLAN_NEXT_NODE, END);
if ("END".equals(nextNode)) {
log.info("Plan execution completed successfully.");
return END;
}
return nextNode;
} else {
// 2. 计划验证失败,检查修复次数
int repairCount = StateUtil.getObjectValue(state, PLAN_REPAIR_COUNT, Integer.class, 0);
if (repairCount > MAX_REPAIR_ATTEMPTS) {
log.error("Plan repair attempts exceeded the limit of {}. Terminating execution.", MAX_REPAIR_ATTEMPTS);
return END;
}
log.warn("Plan validation failed. Routing back to PlannerNode for repair. Attempt count: {}.", repairCount);
return PLANNER_NODE;
}
}
}这个调度器很聪明:
- 1. 验证通过:继续执行下一步(SQL生成、Python生成或报告生成)
- 2. 验证失败:检查已修复次数
- • 如果未超过最大次数(2次),回到
PlannerNode重新规划 - • 如果超过最大次数,结束执行,避免无限循环
6.5 计划的动态调整:执行失败时重新规划
6.5.1 现实世界的意外
再好的计划也可能遇到意外:
- • SQL执行失败:表不存在、权限不足、语法错误
- • 数据不符合预期:查询结果为空、格式不对
- • LLM生成错误:生成的SQL有逻辑bug
- • 用户反馈:用户说"不对,我要的不是这个"
没有容错机制,Agent就会"一崩到底"。
6.5.2 DataAgent的修复机制
DataAgent通过PlanExecutorDispatcher实现计划修复:
用户:分析一下销售情况
│
▼
Planner:制定4步计划
│
▼
PlanExecutorNode:验证计划 → 通过
│
▼
执行步骤1(查数据)→ 成功
│
▼
执行步骤2(做分析)→ 失败:数据格式不对
│
▼
PlanExecutorDispatcher:验证失败,repairCount=0 < 2
│
▼
回到 PlannerNode,传入错误信息
│
▼
Planner:调整计划,增加数据清洗步骤
│
▼
新计划:查数据 → 清洗数据 → 做分析 → 生成报告
│
▼
继续执行...修复模式的PlannerNode处理:
// PlannerNode.java 中的修复逻辑
private String buildUserPrompt(String input, String validationError, OverAllState state) {
if (validationError == null) {
return input; // 正常模式,直接返回用户输入
}
// 修复模式:构建包含错误信息的提示词
String previousPlan = StateUtil.getStringValue(state, PLANNER_NODE_OUTPUT, "");
return String.format(
"IMPORTANT: User rejected previous plan with feedback: \"%s\"\n\n"
+ "Original question: %s\n\n"
+ "Previous rejected plan:\n%s\n\n"
+ "CRITICAL: Generate new plan incorporating user feedback (\"%s\")",
validationError, input, previousPlan, validationError
);
}这就是"试错学习"——Agent从错误中调整策略,最终完成任务。
6.5.3 修复的实战场景
假设用户问:"查一下去年的销售额"
第一次规划:
步骤1:SQL_GENERATE_NODE → 生成SQL查询去年销售额
执行失败:
错误:表
sales不存在
第一次修复(回到PlannerNode):
Planner看到错误信息,重新规划:
步骤1:SQL_GENERATE_NODE → 查询所有表名,找到销售相关表
步骤2:SQL_GENERATE_NODE → 根据找到的表,查询去年销售额
第二次执行:
步骤1成功:找到表名为
orders
步骤2成功:查询到去年销售额为150万
6.6 从规划到执行:DataAgent的完整计划生命周期
让我们把整个过程串起来,看看DataAgent的完整计划生命周期:
用户提问:"分析近三个月各产品线的销售趋势"
│
▼
[PlannerNode] 制定计划
│
├── 分析Schema:确认有sales表、product表、date字段
├── 拆解任务:查数据 → 画图 → 写报告
└── 生成Plan(JSON格式)
│
▼
[PlanExecutorNode] 验证计划
│
├── 检查JSON格式是否正确
├── 检查每一步的工具名称是否合法
├── 检查每一步的参数是否完整
└── 验证通过 ✓
│
▼
[PlanExecutorDispatcher] 路由到第一步
│
▼
[SQL_GENERATE_NODE] 生成SQL
│
├── "查询近三个月各产品线销售额,按月份和产品线分组"
└── 生成SQL:SELECT ...
│
▼
[SQL_EXECUTE_NODE] 执行SQL
│
├── 连接数据库,执行查询
└── 返回结果数据
│
▼
[PlanExecutorDispatcher] 检查是否还有下一步
│
├── 有下一步 → 继续
│
▼
[PYTHON_GENERATE_NODE] 生成Python代码
│
├── "根据SQL结果,绘制各产品线近三个月销售趋势折线图"
└── 生成Python代码
│
▼
[PYTHON_EXECUTE_NODE] 执行Python
│
├── 在Docker沙箱中运行代码
└── 返回图表数据
│
▼
[PlanExecutorDispatcher] 检查是否还有下一步
│
├── 有下一步 → 继续
│
▼
[REPORT_GENERATOR_NODE] 生成报告
│
├── 整合所有结果(数据 + 图表)
└── 生成结构化分析报告
│
▼
[PlanExecutorDispatcher] 计划执行完毕
│
└── 返回 END
│
▼
用户收到:数据表格 + 趋势图 + 分析结论这个流程体现了DataAgent规划系统的几个核心设计:
- 1. 计划先行:PlannerNode先制定完整计划,而不是边做边想
- 2. 验证严格:PlanExecutorNode每一步都验证合法性
- 3. 调度灵活:PlanExecutorDispatcher根据状态决定下一步去哪
- 4. 容错修复:失败时自动回到PlannerNode重新规划(最多2次)
- 5. 人工介入:支持人工审核模式,重要操作需用户确认
6.7 本章小结
本章我们探讨了Agent的规划能力,这是让Agent从"被动应答"升级为"主动解决"的关键。
核心概念回顾:
- 1. 规划的必要性:复杂任务必须拆解为步骤,否则Agent会混乱、遗漏、重复。
- 2. 三种规划模式:
- • Chain-of-Thought:让Agent"逐步思考",把黑盒推理变成白盒推理
- • ReAct:边想边做,适合开放性探索任务
- • Plan-and-Execute:先规划再执行,适合企业级数据分析(DataAgent采用)
- 3. DataAgent的规划体系:
- •
PlannerNode:项目经理,基于Schema和证据制定计划 - •
Plan+ExecutionStep:结构化的任务清单(thoughtProcess + executionPlan) - •
PlanExecutorNode:执行经理,验证计划并路由到对应节点 - •
PlanExecutorDispatcher:交通警察,根据验证结果决定下一步 - 4. 动态修复:执行失败时自动重新规划(最多2次),避免一崩到底。
- 5. 提示词工程:Planner的提示词模板非常详细,包含角色设定、工具说明、约束条件、示例等,确保LLM生成高质量计划。
DataAgent的实践亮点:
- •
BeanOutputConverter强制LLM输出结构化JSON计划 - •
ExecutionStep的原子化设计,每步明确工具+参数 - • 状态机驱动,通过
OverAllState在节点间传递计划和进度 - • 最大2次的修复机制,平衡容错性和效率
- • 支持人工审核,重要操作需用户确认
思考题
- 1. 场景设计:假设用户问"对比一下我们和竞争对手的优劣势"。你会如何设计规划流程?需要哪些步骤?可能遇到什么意外?DataAgent的PlannerNode会如何处理?
- 2. 模式选择:DataAgent使用"先规划后执行"模式,ReAct使用"边想边做"模式。如果你要开发一个"旅游规划Agent",帮用户制定旅行计划,你会选择哪种模式?为什么?
- 3. 容错设计:DataAgent的修复次数限制为2次。如果增加到10次,会带来什么问题?如果不限制,又会有什么风险?你会如何设计一个"智能"的修复策略?
第七章:多Agent协作——从单打独斗到团队作战
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent一个厨房里的协作艺术
想象你走进一家高档餐厅的后厨:
主厨站在中央,审视着今天的订单:"三号桌要一份招牌牛排、一份海鲜意面。"
切配师傅立刻开始处理蔬菜和肉类,把切好的食材按顺序摆在盘子里。
炒锅师傅接过食材,点火、倒油、翻炒,精准控制着火候和时间。
甜点师 meanwhile 正在准备提拉米苏,完全不需要别人催促。
服务员在门口等候,菜一出锅立刻端走,还不忘提醒:"三号桌对坚果过敏,甜点不要放杏仁片。"
这五个人的团队配合默契、各司其职,最终让客人享用到了完美的晚餐。
如果只有一个人呢?主厨既要切菜又要炒菜还要做甜点,等他把牛排煎好,意面早就凉了。单打独斗的效率,永远比不上团队协作。
在AI的世界里,这个道理同样适用。
7.1 从单Agent到多Agent系统
7.1.1 单Agent的局限
回顾前面几章,我们讲的DataAgent本质上是一个单Agent系统——一个"超级助手"独立完成从意图识别到报告生成的全部工作。
单Agent的优点是简单、集中、好管理。但它也有明显的天花板:
| 局限 | 说明 |
|---|---|
| 任务复杂度过高 | 当一个问题涉及多个专业领域(数据分析+法律审查+市场调研),单个Agent难以同时精通所有领域 |
| 并行效率低 | 单Agent只能一步一步执行,无法像人类团队那样并行推进 |
| 容易"钻牛角尖" | 单个Agent可能陷入某种思维定式,缺乏外部视角的纠偏 |
| 责任边界模糊 | 如果出错了,很难定位是哪个环节的问题 |
7.1.2 什么是多Agent系统
多Agent系统(Multi-Agent System,MAS) 是由多个Agent组成的协作网络。每个Agent有自己的专长、角色和职责,它们通过通信和协调来共同完成一个宏大目标。
用餐厅的例子来类比:
| 餐厅角色 | Agent角色 | 职责 |
|---|---|---|
| 主厨 | 协调者Agent(Orchestrator) | 分配任务、把控整体进度 |
| 切配师傅 | 数据准备Agent | 清洗数据、提取特征 |
| 炒锅师傅 | 分析Agent | 执行核心算法、生成洞察 |
| 甜点师 | 可视化Agent | 制作图表、设计报告样式 |
| 服务员 | 交互Agent | 与用户沟通、收集反馈 |
一句话总结:单Agent是"全能型个人选手",多Agent是"专业化团队作战"。
7.2 多Agent协作的三种经典模式
学术界和工业界总结出了三种最常见的多Agent协作模式:
7.2.1 分工协作模式(Division of Labor)
每个Agent负责流水线上的一个环节,像工厂流水线一样接力完成任务。
用户提问
↓
[意图识别Agent] → 明确需求
↓
[数据检索Agent] → 查找相关数据
↓
[分析Agent] → 执行计算和分析
↓
[报告Agent] → 生成可读报告
↓
返回给用户DataAgent的启示:虽然DataAgent运行在单个进程中,但它的16个节点本质上就是在模拟分工协作——IntentRecognitionNode负责"理解需求",SqlGenerateNode负责"查数据",PythonAnalyzeNode负责"做分析",ReportGeneratorNode负责"写报告"。每个节点只专注于自己的任务,通过OverAllState传递"接力棒"。
7.2.2 协商讨论模式(Discussion & Debate)
多个Agent围绕同一个问题各抒己见,通过讨论达成共识或最优解。
想象一个投资委员会:
- • 基本面分析Agent:"这家公司财务很健康,建议买入。"
- • 技术分析Agent:"但股价已经突破阻力位,短期有回调风险。"
- • 风险控制Agent:"行业政策最近有变化,建议观望。"
- • 协调者Agent:"综合三方意见,建议分批建仓,先买入30%仓位。"
这种模式特别适合需要多维度权衡的决策场景。
7.2.3 层级汇报模式(Hierarchical)
Agent之间形成上下级关系,上级Agent分配任务,下级Agent执行并汇报。
[总指挥官Agent]
|
+----------------+----------------+
| | |
[数据分析小队] [市场调研小队] [文案撰写小队]
| | | | | | |
[SQL] [Python] [图表] [爬虫] [访谈] [标题] [正文]优点:结构清晰,便于管理和追责。哪个环节出错,直接找对应的Agent。
缺点:层级过多会导致信息传递损耗,就像"传话游戏"一样,最下层的Agent可能误解了最初的意图。
7.3 DataAgent中的"协作思想"
虽然DataAgent不是严格意义上的多Agent系统(它运行在一个Java进程里),但它的设计处处体现了多Agent协作的思想。
7.3.1 节点即"微Agent"
DataAgent的每个节点都可以看作一个微型的专用Agent:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/PlannerNode.java
public class PlannerNode implements NodeAction {
// 这个"Agent"只负责一件事:制定计划
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 读取当前状态(接收其他"Agent"传来的信息)
String canonicalQuery = StateUtil.getCanonicalQuery(state);
SchemaDTO schemaDTO = StateUtil.getObjectValue(state, TABLE_RELATION_OUTPUT, SchemaDTO.class);
// 专注完成自己的任务
String plannerPrompt = PromptConstant.getPlannerPromptTemplate().render(params);
return llmService.callUser(plannerPrompt);
}
}// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/SqlGenerateNode.java
public class SqlGenerateNode implements NodeAction {
// 这个"Agent"只负责一件事:写SQL
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 接收PlannerNode传来的执行计划
String instruction = getCurrentExecutionStepInstruction(state);
// 专注完成SQL生成
// ...
}
}PlannerNode和SqlGenerateNode互不干涉:
- • PlannerNode不需要知道SQL怎么写
- • SqlGenerateNode不需要知道计划是怎么制定的
- • 它们只通过
OverAllState交换信息
这不正是分工协作的精髓吗?
7.3.2 Dispatcher:协作的"调度员"
在DataAgent中,**Dispatcher(调度器)**扮演了"协调者"的角色,决定下一步该哪个"Agent"干活:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/dispatcher/PlanExecutorDispatcher.java
@Slf4j
public class PlanExecutorDispatcher implements EdgeAction {
private static final int MAX_REPAIR_ATTEMPTS = 2;
@Override
public String apply(OverAllState state) {
boolean validationPassed = StateUtil.getObjectValue(state, PLAN_VALIDATION_STATUS, Boolean.class, false);
if (validationPassed) {
// 验证通过,安排下一个执行节点去干活
String nextNode = state.value(PLAN_NEXT_NODE, END);
return nextNode;
}
else {
// 验证失败,安排PlannerNode返工(最多2次)
int repairCount = StateUtil.getObjectValue(state, PLAN_REPAIR_COUNT, Integer.class, 0);
if (repairCount > MAX_REPAIR_ATTEMPTS) {
log.error("修复次数超过限制,终止执行");
return END; // 让流程结束
}
return PLANNER_NODE; // 安排PlannerNode重新制定计划
}
}
}这个Dispatcher就像餐厅里的主厨:
- • 切配完成了?→ 安排炒锅师傅上
- • 炒锅出问题了?→ 让切配重新准备(最多重试2次)
- • 所有人都干完了?→ 宣布收工(END)
7.3.3 State:协作的"共享白板"
多Agent协作最大的挑战是信息共享。DataAgent用OverAllState解决了这个问题。
想象一个团队会议室里的共享白板:
- • PlannerNode在白板上写下"第一步:查询sales表"
- • SqlGenerateNode看到白板,执行后把SQL结果写上去
- • PythonAnalyzeNode看到SQL结果,把分析结论写上去
- • 所有Agent都能看到白板上的最新内容
// StateGraph中的每个节点都可以读写State
public Map<String, Object> apply(OverAllState state) {
// 读取其他节点留下的信息
String previousResult = StateUtil.getStringValue(state, SQL_EXECUTE_NODE_OUTPUT);
// 处理完成后,把自己的结果写回State
return Map.of(PYTHON_ANALYSIS_NODE_OUTPUT, analysisResult);
}7.4 什么时候需要真正的多Agent系统
DataAgent用"单进程多节点"的方式模拟了多Agent协作,那什么时候需要真正独立运行的多个Agent呢?
场景1:跨系统协作
当不同的Agent运行在不同的服务器上,甚至由不同的团队维护时:
- • 客服Agent:处理用户投诉(部署在客服部门)
- • 库存Agent:查询仓库库存(部署在供应链部门)
- • 物流Agent:跟踪快递状态(部署在物流公司)
三个Agent属于不同的组织,必须通过API或消息队列通信。
场景2:安全隔离
当某些任务必须在隔离环境中执行时:
- • 主Agent:接收用户请求、制定计划
- • 沙箱Agent:在Docker容器中执行不可信的Python代码
- • 审计Agent:监控所有操作是否符合合规要求
如果都放在一个进程里,恶意代码可能危及整个系统。
场景3:负载均衡
当任务量巨大时,多个相同的Agent并行工作:
- • 1000份报表需要生成
- • 启动10个"报表Agent"同时处理
- • 协调者Agent分配任务、汇总结果
7.5 多Agent协作的设计原则
如果你要设计一个多Agent系统,记住以下原则:
原则1:职责单一
每个Agent只做一件事,并做好它。就像DataAgent的节点一样,PlannerNode不碰SQL,SqlGenerateNode不碰Python。
原则2:接口明确
Agent之间通过明确的接口通信,而不是直接访问对方的内部状态。
// 好的设计:通过State交换信息
Map<String, Object> result = Map.of(SQL_GENERATE_OUTPUT, sql);
// 坏的设计:直接调用另一个Agent的内部方法
sqlGenerateNode.internalQueryParser(); // 不要这样做!原则3:容错设计
一个Agent失败了,不应该拖垮整个系统。DataAgent的Dispatcher会在验证失败时安排重试,而不是直接崩溃。
原则4:人类可介入
在多Agent协作的的关键节点,应该预留"人类接管"的接口。DataAgent的HumanFeedbackNode就是一个很好的例子(我们将在第九章详细讲解)。
本章小结
- 1. 单Agent的局限:面对复杂、跨领域、高并发的任务时,单打独斗效率低下。
- 2. 多Agent系统的三种协作模式:
- • 分工协作:流水线接力,各专其职
- • 协商讨论:多维度辩论,综合决策
- • 层级汇报:树形结构,上级指挥下级
- 3. DataAgent的协作思想:虽然DataAgent是单进程系统,但它的节点设计(PlannerNode、SqlGenerateNode等)和Dispatcher调度机制,体现了多Agent协作的核心思想。
- 4. 共享状态(State)是多Agent协作的关键:
OverAllState就像团队白板,让所有参与者看到最新进展。 - 5. 真正多Agent系统的适用场景:跨组织协作、安全隔离、负载均衡。
思考题
- 1. 场景设计:假设你要设计一个"智能旅行社"多Agent系统,需要完成"根据用户预算和喜好制定旅行计划+预订机票酒店+生成行程攻略"的任务。请设计3个专门化的Agent,说明每个Agent的职责和它们之间的协作流程。
- 2. 模式辨析:DataAgent的16个节点更像是"分工协作模式"还是"层级汇报模式"?为什么?PlannerNode和PlanExecutorNode之间是什么关系?
- 3. 扩展思考:如果把DataAgent改造为真正的多Agent系统(每个节点是一个独立的微服务),会带来什么好处和风险?从通信延迟、容错性、部署复杂度三个角度分析。
第八章:工作流编排——Agent的"神经系统"
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent医院看病的工作流
想象你去医院看病的全过程:
- 1. 挂号处:出示身份证,拿到挂号单
- 2. 门诊室:医生问诊、开检查单
- 3. 收费处:缴费
- 4. 检验科:抽血、拍片
- 5. 回门诊室:医生查看检验结果
- 6. 药房:凭处方取药
- 7. 离院
这个流程有几个特点:
- • 有明确的顺序:不能先取药再看医生
- • 有分支判断:如果检查结果正常,直接取药;如果异常,可能需要住院
- • 有循环回退:检验结果不清楚,医生可能要求重新检查
- • 有并行可能:抽血和拍片可以在不同楼层同时进行
这就是工作流(Workflow)——一组按照特定规则编排的活动,共同完成一个业务目标。
Agent要完成复杂任务,同样需要一个精心设计的"工作流"来协调各个环节。本章将学习DataAgent如何用StateGraph来编排它的16+个节点。
8.1 什么是工作流编排
8.1.1 从剧本到即兴表演
在第六章中,我们把Agent比作"即兴演员"——没有固定剧本,根据现场情况灵活发挥。
但即使是即兴演员,也需要一个舞台框架:
- • 幕什么时候拉开
- • 灯光怎么打
- • 场景怎么切换
- • 什么时候谢幕
工作流编排就是这个"舞台框架"。它不负责写台词(那是LLM的工作),但负责:
- • 哪个节点先上场
- • 哪个节点后上场
- • 什么条件下切换场景
- • 出错时怎么处理
8.1.2 工作流的核心要素
任何工作流系统都离不开四个核心要素:
| 要素 | 类比 | 说明 |
|---|---|---|
| 节点(Node) | 演员 | 执行具体任务的单元 |
| 边(Edge) | 舞台通道 | 连接节点的路径 |
| 状态(State) | 共享道具箱 | 节点之间传递数据的载体 |
| 条件(Condition) | 导演指令 | 决定走哪条边的规则 |
8.2 StateGraph:DataAgent的工作流引擎
DataAgent使用 Spring AI Alibaba Graph 框架中的 StateGraph 来编排工作流。
8.2.1 什么是StateGraph
StateGraph(状态图) 是一种有向图结构:
- • 节点(Node) = 处理步骤(如意图识别、SQL生成)
- • 边(Edge) = 节点之间的连接
- • 状态(State) = 流经整个图的数据对象
START
|
v
[IntentRecognition]
|
+-----+-----+
| |
v v
[EvidenceRecall] END
|
v
[QueryEnhance]
|
...数据(State)像流水一样在图中流动,每经过一个节点,就会被加工一次,直到到达终点(END)。
8.2.2 DataAgent的工作流全景图
DataAgent的工作流包含16+个节点,是一个相当复杂的图。让我们看看它的"蓝图":
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/config/DataAgentConfiguration.java
@Bean
public StateGraph nl2sqlGraph(NodeBeanUtil nodeBeanUtil, CodeExecutorProperties codeExecutorProperties)
throws GraphStateException {
StateGraph stateGraph = new StateGraph(NL2SQL_GRAPH_NAME, keyStrategyFactory)
// 添加所有节点
.addNode(INTENT_RECOGNITION_NODE, nodeBeanUtil.getNodeBeanAsync(IntentRecognitionNode.class))
.addNode(EVIDENCE_RECALL_NODE, nodeBeanUtil.getNodeBeanAsync(EvidenceRecallNode.class))
.addNode(QUERY_ENHANCE_NODE, nodeBeanUtil.getNodeBeanAsync(QueryEnhanceNode.class))
.addNode(SCHEMA_RECALL_NODE, nodeBeanUtil.getNodeBeanAsync(SchemaRecallNode.class))
.addNode(TABLE_RELATION_NODE, nodeBeanUtil.getNodeBeanAsync(TableRelationNode.class))
.addNode(FEASIBILITY_ASSESSMENT_NODE, nodeBeanUtil.getNodeBeanAsync(FeasibilityAssessmentNode.class))
.addNode(SQL_GENERATE_NODE, nodeBeanUtil.getNodeBeanAsync(SqlGenerateNode.class))
.addNode(PLANNER_NODE, nodeBeanUtil.getNodeBeanAsync(PlannerNode.class))
.addNode(PLAN_EXECUTOR_NODE, nodeBeanUtil.getNodeBeanAsync(PlanExecutorNode.class))
.addNode(SQL_EXECUTE_NODE, nodeBeanUtil.getNodeBeanAsync(SqlExecuteNode.class))
.addNode(PYTHON_GENERATE_NODE, nodeBeanUtil.getNodeBeanAsync(PythonGenerateNode.class))
.addNode(PYTHON_EXECUTE_NODE, nodeBeanUtil.getNodeBeanAsync(PythonExecuteNode.class))
.addNode(PYTHON_ANALYZE_NODE, nodeBeanUtil.getNodeBeanAsync(PythonAnalyzeNode.class))
.addNode(REPORT_GENERATOR_NODE, nodeBeanUtil.getNodeBeanAsync(ReportGeneratorNode.class))
.addNode(SEMANTIC_CONSISTENCY_NODE, nodeBeanUtil.getNodeBeanAsync(SemanticConsistencyNode.class))
.addNode(HUMAN_FEEDBACK_NODE, nodeBeanUtil.getNodeBeanAsync(HumanFeedbackNode.class));
// ... 连接边(见下文)
}这段代码就像剧组的演员表——把16个"演员"(节点)全部登记在册,但还没有告诉他们上场顺序。
8.2.3 连接节点:普通边与条件边
节点注册完后,下一步是连接它们。
普通边(addEdge):一对一的固定连接
// 从START开始,先到意图识别节点
stateGraph.addEdge(START, INTENT_RECOGNITION_NODE)
// 证据召回完成后,必然进入查询增强
.addEdge(EVIDENCE_RECALL_NODE, QUERY_ENHANCE_NODE)
// Python代码生成后,必然进入Python执行
.addEdge(PYTHON_GENERATE_NODE, PYTHON_EXECUTE_NODE)
// 报告生成后,流程结束
.addEdge(REPORT_GENERATOR_NODE, END)
// 计划生成后,进入计划执行器
.addEdge(PLANNER_NODE, PLAN_EXECUTOR_NODE)
// Python分析完成后,回到计划执行器继续下一步
.addEdge(PYTHON_ANALYZE_NODE, PLAN_EXECUTOR_NODE);普通边就像工厂流水线的传送带:A做完后必然传给B,没有任何选择余地。
条件边(addConditionalEdges):根据运行时的状态动态决定走向
// 意图识别后,可能去证据召回,也可能直接结束(如果是闲聊)
stateGraph.addConditionalEdges(INTENT_RECOGNITION_NODE,
edge_async(new IntentRecognitionDispatcher()),
Map.of(EVIDENCE_RECALL_NODE, EVIDENCE_RECALL_NODE, END, END))
// Schema召回后,可能去表关系分析,也可能结束
.addConditionalEdges(SCHEMA_RECALL_NODE, edge_async(new SchemaRecallDispatcher()),
Map.of(TABLE_RELATION_NODE, TABLE_RELATION_NODE, END, END))
// 表关系分析后,可能成功进入可行性评估,也可能失败重试,还可能结束
.addConditionalEdges(TABLE_RELATION_NODE, edge_async(new TableRelationDispatcher()),
Map.of(FEASIBILITY_ASSESSMENT_NODE, FEASIBILITY_ASSESSMENT_NODE,
END, END,
TABLE_RELATION_NODE, TABLE_RELATION_NODE)); // retry条件边就像铁路道岔:列车开到岔路口,根据信号(运行时状态)决定走哪条轨道。
8.2.4 条件边的"大脑":Dispatcher
条件边之所以能"做选择",是因为有一个**Dispatcher(调度器)**在幕后决策。
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/dispatcher/IntentRecognitionDispatcher.java
public class IntentRecognitionDispatcher implements EdgeAction {
@Override
public String apply(OverAllState state) throws Exception {
IntentRecognitionOutputDTO intent = StateUtil.getObjectValue(state,
INTENT_RECOGNITION_NODE_OUTPUT, IntentRecognitionOutputDTO.class);
// 如果用户意图是"数据分析",走分析流程
if (intent != null && intent.isDataAnalysis()) {
return EVIDENCE_RECALL_NODE;
}
// 如果是闲聊,直接结束
return END;
}
}这个Dispatcher就像一个智能道岔工:
- • 它查看当前状态(State)中的意图识别结果
- • 如果是数据分析请求 → 扳向"证据召回"轨道
- • 如果是闲聊 → 扳向"结束"轨道
8.3 状态管理:工作流的"血液"
如果说节点是工作流的"器官",边是"血管",那么**State(状态)**就是流动的"血液"——携带养分(数据)输送到各个器官。
8.3.1 OverAllState:全局状态容器
DataAgent的所有节点共享一个OverAllState对象。每个节点可以读取自己需要的数据,也可以写入自己的输出结果。
public Map<String, Object> apply(OverAllState state) throws Exception {
// 读取:从"血液"中获取养分
String userInput = StateUtil.getStringValue(state, INPUT_KEY);
// 处理...
// 写入:把代谢产物排回"血液"
return Map.of(INTENT_RECOGNITION_NODE_OUTPUT, result);
}8.3.2 KeyStrategy:状态合并的策略
多个节点可能对同一个key写入数据,怎么合并?DataAgent通过KeyStrategy来定义合并规则:
KeyStrategyFactory keyStrategyFactory = () -> {
HashMap<String, KeyStrategy> keyStrategyHashMap = new HashMap<>();
// 用户输入:新值直接替换旧值
keyStrategyHashMap.put(INPUT_KEY, KeyStrategy.REPLACE);
// 意图识别结果:新值直接替换旧值
keyStrategyHashMap.put(INTENT_RECOGNITION_NODE_OUTPUT, KeyStrategy.REPLACE);
// 计划当前步骤:新值直接替换旧值
keyStrategyHashMap.put(PLAN_CURRENT_STEP, KeyStrategy.REPLACE);
// SQL执行结果:新值直接替换旧值
keyStrategyHashMap.put(SQL_EXECUTE_NODE_OUTPUT, KeyStrategy.REPLACE);
return keyStrategyHashMap;
};目前DataAgent主要使用REPLACE策略——新值覆盖旧值。这符合数据分析场景的特点:每个节点的输出都是最新结论,不需要保留历史版本。
类比:就像白板上的内容,写新的就把旧的擦掉,始终只保留最新信息。
8.4 工作流的执行:编译与运行
StateGraph定义好了"蓝图",但它还不能直接运行。需要编译成可执行的图。
8.4.1 编译图(Compile)
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java
public GraphServiceImpl(StateGraph stateGraph, ExecutorService executorService,
MultiTurnContextManager multiTurnContextManager, LangfuseService langfuseReporter)
throws GraphStateException {
// 编译图,并在人工反馈节点前设置"中断点"
this.compiledGraph = stateGraph.compile(
CompileConfig.builder()
.interruptBefore(HUMAN_FEEDBACK_NODE)
.build()
);
// ...
}编译会做两件事:
- 1. 校验图的合法性:检查是否有孤立的节点、是否有循环依赖
- 2. 生成执行计划:把图结构转换成可执行的运行时结构
interruptBefore(HUMAN_FEEDBACK_NODE)是一个关键配置——它告诉系统:"运行到人工反馈节点之前,先暂停下来,等待外部信号。"(我们将在第九章详细讲解这个机制。)
8.4.2 流式执行(Stream)
DataAgent采用流式执行模式,用户可以实时看到每个节点的输出:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java
private void handleNewProcess(GraphRequest graphRequest) {
// 构建初始状态
Flux<NodeOutput> nodeOutputFlux = compiledGraph.stream(
Map.of(IS_ONLY_NL2SQL, nl2sqlOnly,
INPUT_KEY, query,
AGENT_ID, agentId,
HUMAN_REVIEW_ENABLED, humanReviewEnabled),
RunnableConfig.builder().threadId(threadId).build()
);
// 订阅流,处理每个节点的输出
nodeOutputFlux.subscribe(
output -> handleNodeOutput(graphRequest, output), // 正常输出
error -> handleStreamError(agentId, threadId, error), // 错误处理
() -> handleStreamComplete(agentId, threadId) // 完成处理
);
}流式执行的好处:
- • 即时反馈:用户不需要等整个流程跑完,每经过一个节点就能看到输出
- • 可中断:用户随时可以喊"停"
- • 资源友好:不需要等所有节点都完成才返回结果
8.5 工作流的容错设计
一个健壮的工作流必须能处理"意外情况"。DataAgent在工作流层面设计了多处容错机制:
8.5.1 循环重试
// 表关系分析失败后,可以回到自身重新执行
.addConditionalEdges(TABLE_RELATION_NODE, edge_async(new TableRelationDispatcher()),
Map.of(FEASIBILITY_ASSESSMENT_NODE, FEASIBILITY_ASSESSMENT_NODE,
END, END,
TABLE_RELATION_NODE, TABLE_RELATION_NODE)) // 失败后重试这就像工厂流水线上设置了"返工区"——质检不合格的产品会被送回前面重新加工。
8.5.2 最大重试次数
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/dispatcher/PlanExecutorDispatcher.java
private static final int MAX_REPAIR_ATTEMPTS = 2;
@Override
public String apply(OverAllState state) {
int repairCount = StateUtil.getObjectValue(state, PLAN_REPAIR_COUNT, Integer.class, 0);
if (repairCount > MAX_REPAIR_ATTEMPTS) {
log.error("修复次数超过限制,终止执行");
return END; // 不再重试,直接结束
}
return PLANNER_NODE; // 安排重试
}无限重试可能导致系统卡死。设置上限就像给电梯装"防坠落安全钳"——最多下坠一段距离就会被强制制动。
8.5.3 错误传播与终止
// 如果任何节点抛出异常,流式订阅的错误回调会处理
nodeOutputFlux.subscribe(
output -> handleNodeOutput(graphRequest, output),
error -> handleStreamError(agentId, threadId, error), // 捕获异常
() -> handleStreamComplete(agentId, threadId)
);
private void handleStreamError(String agentId, String threadId, Throwable error) {
log.error("流处理出错,threadId: {}", threadId, error);
// 清理资源、通知用户、结束流程
stopStreamProcessing(threadId);
}本章小结
- 1. 工作流编排是Agent的"舞台框架",负责协调各个节点何时上场、如何衔接。
- 2. StateGraph的四大要素:
- • 节点(Node):执行任务的单元
- • 边(Edge):连接节点的路径
- • 状态(State):节点间传递数据的载体
- • 条件(Condition):动态决定走向的规则
- 3. DataAgent的工作流:16+个节点通过普通边和条件边连接成一张有向图,由Dispatcher根据运行时状态动态调度。
- 4. 状态管理:
OverAllState是全局共享状态容器,KeyStrategy.REPLACE定义了数据合并规则。 - 5. 编译与执行:StateGraph需要先编译成
CompiledGraph,支持流式执行和断点暂停。 - 6. 容错设计:循环重试、最大重试次数限制、错误传播与优雅终止。
思考题
- 1. 流程设计题:假设你要设计一个"智能招聘Agent"的工作流,包含"简历解析→技能匹配→HR初筛→部门面试→发放Offer"等环节。请画出StateGraph的节点和边,并标注哪些用普通边、哪些用条件边。
- 2. 分析题:DataAgent中
PYTHON_ANALYZE_NODE执行完成后,为什么会回到PLAN_EXECUTOR_NODE而不是直接到REPORT_GENERATOR_NODE?这个设计有什么好处? - 3. 扩展思考:如果DataAgent的工作流中某个节点(比如
SQL_EXECUTE_NODE)执行时间特别长(比如查询了上亿行数据),会阻塞整个工作流吗?为什么?(提示:思考dbOperationExecutor线程池的作用)
第九章:人在回路——当Agent需要"请示领导"
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent开篇:自动驾驶的"手动接管"按钮
想象你正在驾驶一辆具备自动驾驶功能的汽车:
高速公路上,车辆平稳地自动行驶。突然,前方出现了一起严重交通事故,车道被杂物堵塞,导航地图也没有更新这条信息。
这时,车内响起提示音:"前方路况复杂,请驾驶员接管方向盘。"
你立刻握住方向盘,观察情况后决定减速变道,安全通过了事故路段。
这个场景里,人类驾驶员在关键时刻介入了决策过程。这不是因为自动驾驶技术不够好,而是因为:
- • 有些场景超出算法的训练范围
- • 安全攸关的决策需要人类把关
- • 人类拥有算法不具备的常识和判断力
在AI Agent的世界里,这种"关键时刻让人类介入"的机制叫做 Human-in-the-Loop(人在回路)。
9.1 为什么需要人在回路
9.1.1 Agent不是万能的
尽管DataAgent能自动完成意图识别、SQL生成、数据分析等复杂任务,但它在某些场景下仍然需要人类的智慧:
| 场景 | 为什么需要人类 |
|---|---|
| 计划审批 | Agent制定的分析计划可能不符合业务逻辑,需要业务专家确认 |
| 敏感数据 | 涉及财务、人事、客户隐私的查询,需要权限审批 |
| 高成本操作 | 生成大规模报表、调用付费API,需要确认预算 |
| 模糊需求 | 用户的问题本身不够清晰,需要人工澄清 |
| 异常结果 | Agent返回的结果明显不合理,需要人工判断是数据问题还是逻辑问题 |
9.1.2 "全自动化"的风险
假设DataAgent没有任何人工审核环节:
用户问:"删除上个月所有测试订单。"
Agent理解有误,生成了
DELETE FROM orders WHERE month = '2024-01'——删除了所有1月份的订单,包括正式订单!
如果没有人在回路,这个错误会立即执行,造成不可逆的损失。
有了人在回路:
Agent生成SQL后暂停,把计划展示给用户:"我将执行以下操作:删除orders表中2024年1月的所有记录,预计影响15,234条数据。请确认。"
用户一看:"等等,我只想让删测试订单!"——灾难避免了。
9.2 DataAgent的人在回路设计
DataAgent在计划执行前设置了人工审核环节,让用户有机会查看和修改Agent制定的分析计划。
9.2.1 整体流程
用户提问
↓
[意图识别] → [证据召回] → [查询增强] → ... → [计划生成]
↓
[计划执行器] 检查:是否启用了人工复核?
↓
是 ──────────────────────────────→ [人工反馈节点] 暂停,等待用户
│ ↓
│ 用户查看计划,选择:
│ ├─ 同意 → 继续执行
│ ├─ 拒绝+反馈 → 重新生成计划
│ └─ 不处理 → 保持暂停
│ ↓
└──────────────────────────────────── [SQL生成] → ...9.2.2 启用人工复核
用户在发起请求时,可以通过参数开启人工复核:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java
private void handleNewProcess(GraphRequest graphRequest) {
boolean nl2sqlOnly = graphRequest.isNl2sqlOnly();
// 只有当不是纯SQL生成模式时,才允许人工复核
boolean humanReviewEnabled = graphRequest.isHumanFeedback() && !(nl2sqlOnly);
Flux<NodeOutput> nodeOutputFlux = compiledGraph.stream(
Map.of(
// ... 其他参数
HUMAN_REVIEW_ENABLED, humanReviewEnabled // 把用户的选择传入State
),
RunnableConfig.builder().threadId(threadId).build()
);
}9.2.3 计划执行器的"分岔口"
PlanExecutorNode在运行时会检查HUMAN_REVIEW_ENABLED标志:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/PlanExecutorNode.java
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// ... 验证计划合法性 ...
// 关键判断:是否开启人工复核?
Boolean humanReviewEnabled = state.value(HUMAN_REVIEW_ENABLED, false);
if (Boolean.TRUE.equals(humanReviewEnabled)) {
log.info("人工复核已启用:路由到人工反馈节点");
return Map.of(PLAN_VALIDATION_STATUS, true,
PLAN_NEXT_NODE, HUMAN_FEEDBACK_NODE);
}
// 未启用复核,直接继续执行计划
// ...
}这就像高速公路上的检查站:
- • 普通车辆(未开启复核)→ 直接放行
- • 危险品运输车(开启复核)→ 必须停车检查,确认安全后才能继续
9.3 人工反馈节点:HumanFeedbackNode
当流程走到HumanFeedbackNode时,整个工作流会暂停,等待用户的输入。
9.3.1 节点的核心逻辑
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/HumanFeedbackNode.java
@Slf4j
@Component
public class HumanFeedbackNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
Map<String, Object> updated = new HashMap<>();
// 检查最大修复次数,防止无限循环
int repairCount = StateUtil.getObjectValue(state, PLAN_REPAIR_COUNT, Integer.class, 0);
if (repairCount >= 3) {
log.warn("最大修复次数(3次)已达,结束流程");
updated.put("human_next_node", "END");
return updated;
}
// 检查是否有用户反馈数据
Map<String, Object> feedbackData = StateUtil.getObjectValue(
state, HUMAN_FEEDBACK_DATA, Map.class, Map.of());
if (feedbackData.isEmpty()) {
// 没有反馈 → 等待用户
updated.put("human_next_node", "WAIT_FOR_FEEDBACK");
return updated;
}
// 处理反馈结果
Object approvedValue = feedbackData.getOrDefault("feedback", true);
boolean approved = approvedValue instanceof Boolean approvedBoolean
? approvedBoolean
: Boolean.parseBoolean(approvedValue.toString());
if (approved) {
// 用户同意 → 继续执行
log.info("计划已批准 → 执行");
updated.put("human_next_node", PLAN_EXECUTOR_NODE);
updated.put(HUMAN_REVIEW_ENABLED, false); // 关闭复核,避免再次暂停
}
else {
// 用户拒绝 → 重新生成计划
log.info("计划被拒绝 → 重新生成(第{}次)", repairCount + 1);
updated.put("human_next_node", PLANNER_NODE);
updated.put(PLAN_REPAIR_COUNT, repairCount + 1);
updated.put(PLAN_CURRENT_STEP, 1);
updated.put(HUMAN_REVIEW_ENABLED, true);
// 保存用户的反馈内容,供PlannerNode参考
String feedbackContent = feedbackData.getOrDefault("feedback_content", "").toString();
updated.put(PLAN_VALIDATION_ERROR,
StringUtils.hasLength(feedbackContent) ? feedbackContent : "计划被用户拒绝");
// 清空旧的计划输出
updated.put(PLANNER_NODE_OUTPUT, "");
}
return updated;
}
}这段代码处理了三种状态:
| 状态 | 处理逻辑 |
|---|---|
| 首次到达,无反馈 | 返回WAIT_FOR_FEEDBACK,工作流暂停 |
| 用户批准 | 关闭复核标志,路由回计划执行器继续执行 |
| 用户拒绝 | 增加修复计数,清空旧计划,路由回计划生成器重新制定 |
9.3.2 中断与恢复机制
DataAgent使用CompiledGraph.interruptBefore()来实现"暂停":
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java
this.compiledGraph = stateGraph.compile(
CompileConfig.builder()
.interruptBefore(HUMAN_FEEDBACK_NODE) // 在人工反馈节点前设置断点
.build()
);这就像在乐谱上标记了一个休止符:演奏到这里暂停,等待指挥家给出继续的手势。
9.3.3 用户反馈的传递
当用户在前端看到计划并做出选择后,系统通过handleHumanFeedback方法恢复工作流:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java
private void handleHumanFeedback(GraphRequest graphRequest) {
String threadId = graphRequest.getThreadId();
String feedbackContent = graphRequest.getHumanFeedbackContent();
// 构建反馈数据
Map<String, Object> feedbackData = Map.of(
"feedback", !graphRequest.isRejectedPlan(), // 是否批准
"feedback_content", feedbackContent // 用户的具体意见
);
// 更新State
Map<String, Object> stateUpdate = new HashMap<>();
stateUpdate.put(HUMAN_FEEDBACK_DATA, feedbackData);
// 把反馈写入图的当前状态
RunnableConfig baseConfig = RunnableConfig.builder().threadId(threadId).build();
RunnableConfig updatedConfig = compiledGraph.updateState(baseConfig, stateUpdate);
// 恢复执行
RunnableConfig resumeConfig = RunnableConfig.builder(updatedConfig)
.addMetadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY, feedbackData)
.build();
Flux<NodeOutput> nodeOutputFlux = compiledGraph.stream(null, resumeConfig);
// ... 订阅流,继续处理
}这个过程就像:
- 1. 暂停:导演喊"卡",演员停在原地
- 2. 沟通:导演走过去告诉演员哪里需要调整
- 3. 恢复:导演喊"Action",演员从刚才暂停的地方继续
关键点是threadId——它确保恢复的是同一个"会话",而不是新开一个流程。
9.4 HumanFeedbackDispatcher:反馈后的路由
HumanFeedbackNode处理完用户的反馈后,还需要一个Dispatcher来决定下一步去哪:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/dispatcher/HumanFeedbackDispatcher.java
public class HumanFeedbackDispatcher implements EdgeAction {
@Override
public String apply(OverAllState state) throws Exception {
String nextNode = (String) state.value("human_next_node", END);
// 如果是等待反馈状态,返回END让图暂停
if ("WAIT_FOR_FEEDBACK".equals(nextNode)) {
return END;
}
return nextNode;
}
}这个Dispatcher很简单:
- • 如果还在等反馈 → 返回
END(实际上工作流已经暂停,这里的END只是优雅地结束当前步骤) - • 如果用户已批准 → 返回
PLAN_EXECUTOR_NODE,继续执行 - • 如果用户已拒绝 → 返回
PLANNER_NODE,重新生成计划
9.5 人在回路的设计原则
通过DataAgent的实现,我们可以总结出人在回路设计的几个关键原则:
原则1:选择合适的介入点
不是所有环节都需要人类介入。DataAgent选择在计划执行前暂停,因为:
- • 太早(如意图识别后):用户还没看到Agent要做什么,无法有效判断
- • 太晚(如SQL执行后):错误已经造成,无法挽回
- • 刚刚好(计划生成后):用户能完整看到Agent的"作战方案",决定要不要执行
原则2:提供清晰的上下文
让人类做决策时,必须提供足够的上下文信息。DataAgent在暂停时会通过SSE流把计划内容推送给前端,让用户看到:
- • Agent打算执行哪些步骤
- • 每个步骤用什么工具
- • 预计会产生什么结果
原则3:支持"拒绝+反馈"
人类说"不"的时候,通常伴随着"为什么"。DataAgent允许用户:
- • 拒绝计划
- • 填写具体反馈(如"不要查2023年的数据,只看2024年")
- • Agent根据反馈重新生成计划
这比简单的"通过/拒绝"更有价值。
原则4:防止无限循环
if (repairCount >= 3) {
log.warn("最大修复次数(3次)已达,结束流程");
updated.put("human_next_node", "END");
return updated;
}如果用户和Agent"杠上了"——用户一直拒绝,Agent一直重新生成——系统会在3次后强制结束,避免无限循环。
本章小结
- 1. 人在回路(Human-in-the-Loop) 是在关键环节引入人类判断的机制,用于弥补AI的局限性、规避安全风险。
- 2. DataAgent的HITL设计:在计划执行前设置暂停点,让用户审核Agent制定的分析计划。
- 3. 核心组件:
- •
GraphRequest.humanFeedback:用户决定是否启用复核 - •
PlanExecutorNode:检查复核标志,决定是否路由到人工节点 - •
HumanFeedbackNode:处理三种状态(等待反馈/批准/拒绝) - •
interruptBefore():实现工作流的暂停与恢复 - •
threadId:确保恢复的是同一个会话 - 4. 设计原则:选择合适介入点、提供清晰上下文、支持拒绝+反馈、防止无限循环。
思考题
- 1. 场景分析:假设一个医疗诊断Agent,在哪些环节应该设置人在回路?为什么?(提示:考虑误诊风险、患者隐私、治疗方案选择等因素)
- 2. 代码分析:在
HumanFeedbackNode中,如果用户拒绝了计划但feedback_content为空,系统会怎么处理?PlannerNode重新生成计划时,能利用到什么信息? - 3. 设计题:DataAgent目前只在"计划执行前"设置了一个暂停点。如果你是产品经理,还会在哪里增加人在回路?请说明理由和具体实现思路。
第十章:安全与容错——让Agent"稳中求胜"
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent黑箱 vs 白箱——你信任一个你看不见的系统吗?
想象一下,你走进一家餐厅,点了一道菜。厨师在封闭的厨房里忙碌,你看不见他用了什么食材、怎么烹饪、花了多长时间。最后端上来一盘菜——好吃,但你完全不知道为什么好吃。如果下次再来,味道变了,你也不知道问题出在哪里。
这就是黑箱系统:输入进去,输出出来,中间发生了什么,一无所知。
再想象另一家餐厅,厨房是透明的玻璃房。你能看见厨师切菜、炒菜、调味,甚至能闻到香味。如果哪道菜咸了,你一眼就能看出是盐放多了。
这就是白箱系统:整个过程透明可见,每个环节都可追溯、可理解、可调试。
Agent系统,尤其是基于大语言模型(LLM)的Agent,天然带有"黑箱"属性:
- • LLM的输出有随机性,同样的输入可能得到不同的结果
- • Agent的决策链条很长,涉及多个节点、多次工具调用
- • 出了问题很难定位:是Prompt写错了?是工具返回了错误数据?还是LLM" hallucination"(幻觉)了?
可观测性(Observability) 就是让黑箱变白箱的能力。它不是可有可无的"锦上添花",而是生产级Agent系统的必备基础设施。
本章将带你理解可观测性的三大支柱,看看DataAgent是如何实现全方位观测的,以及如何在实际开发中调试Agent系统。
为什么Agent特别需要可观测性
1. LLM的随机性让行为难以预测
传统软件是确定性的:输入A,永远输出B。但LLM是概率性的:输入A,可能输出B、C或D,每次都有细微差别。
这种随机性带来几个挑战:
- • 难以复现:用户报告了一个bug,你用同样的输入测试,可能无法复现
- • 难以调试:不知道LLM为什么会给出某个答案
- • 难以优化:不清楚哪个环节是性能瓶颈
2. Agent的决策链条长且复杂
一个典型的Agent执行流程可能涉及:
- 1. 意图识别(Intent Recognition)
- 2. 知识检索(RAG)
- 3. 计划生成(Planning)
- 4. 多轮工具调用(Tool Use)
- 5. 结果汇总与报告生成
每个环节都可能出错。如果没有观测手段,你只能在最后看到"答案错了",却不知道错在哪一步。
3. 生产环境需要快速定位问题
当Agent系统上线后,可能遇到各种问题:
- • 响应变慢了,是LLM API延迟增加?还是某个工具执行超时?
- • 答案质量下降,是检索召回率降低?还是Prompt被污染了?
- • 某个用户反馈Agent"疯了",你需要快速查看完整的执行轨迹
没有观测,就没有诊断。没有诊断,就没有修复。
可观测性的三大支柱
业界将可观测性总结为三大支柱(Three Pillars of Observability):
第一支柱:日志(Logging)——记录发生了什么
日志是系统运行过程中产生的文本记录,就像飞机的"黑匣子",记录了每个关键时刻发生了什么。
日志的核心作用是:
- • 事后追溯:出问题后查看日志,还原现场
- • 状态记录:记录关键变量的值、决策结果
- • 异常捕获:记录错误堆栈和异常信息
DataAgent中的日志示例:
// GraphServiceImpl.java
log.info("Stream processing completed successfully for threadId: {}", threadId);
log.error("Error in stream processing for threadId: {}: ", threadId, error);
log.debug("Received Stream output: {}", chunk);这三行日志分别对应:
- •
INFO:正常流程的关键节点(流处理完成) - •
ERROR:异常和错误(流处理出错) - •
DEBUG:详细调试信息(收到流式输出片段)
日志级别小贴士:生产环境通常开启INFO级别,开发环境可以开启DEBUG级别。ERROR级别必须记录足够的信息用于排查问题。
第二支柱:指标(Metrics)——量化性能表现
指标是将系统行为量化为数字,用于监控和告警。
DataAgent通过Langfuse记录了详细的LLM指标:
// LangfuseService.java
private static final AttributeKey<Long> GEN_AI_PROMPT_TOKENS =
AttributeKey.longKey("gen_ai.usage.prompt_tokens");
private static final AttributeKey<Long> GEN_AI_COMPLETION_TOKENS =
AttributeKey.longKey("gen_ai.usage.completion_tokens");
private static final AttributeKey<Long> GEN_AI_TOTAL_TOKENS =
AttributeKey.longKey("gen_ai.usage.total_tokens");这些指标会被自动上报到Langfuse平台,你可以:
- • 查看每次请求的Token消耗
- • 分析哪个节点最"费Token"
- • 监控成本趋势,及时发现异常
第三支柱:追踪(Tracing)——跟踪请求流转路径
追踪是记录一个请求在系统中完整的流转路径,就像快递的物流轨迹。
一个用户请求进入Agent系统后,可能经过:
用户请求 → GraphController → GraphService → IntentRecognitionNode →
EvidenceRecallNode → PlannerNode → SqlGenerateNode → SqlExecuteNode →
ReportGeneratorNode → 返回用户分布式追踪能记录每个环节的:
- • 开始时间和结束时间(计算耗时)
- • 输入和输出(查看数据变化)
- • 是否出错(错误类型和堆栈)
DataAgent使用OpenTelemetry实现分布式追踪:
// OpenTelemetryConfig.java
@Bean
public OpenTelemetry openTelemetry() {
// 创建 OTLP HTTP 导出器,将追踪数据发送到 Langfuse
OtlpHttpSpanExporter spanExporter = OtlpHttpSpanExporter.builder()
.setEndpoint(host + "/api/public/otel/v1/traces")
.addHeader("Authorization", "Basic " + encodedAuth)
.setTimeout(10, TimeUnit.SECONDS)
.build();
// 配置资源信息(服务名称等)
Resource resource = Resource.getDefault()
.merge(Resource.create(Attributes.of(
AttributeKey.stringKey("service.name"), SERVICE_NAME)));
// 创建TracerProvider并注册Span处理器
tracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(BatchSpanProcessor.builder(spanExporter)
.setScheduleDelay(1, TimeUnit.SECONDS)
.setMaxExportBatchSize(100)
.build())
.setResource(resource)
.build();
return OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.build();
}这段代码做了三件事:
- 1. 配置导出器:将追踪数据发送到Langfuse的OTLP接收端点
- 2. 配置资源:标识这是"data-agent"服务的数据
- 3. 批处理:每1秒或每100个Span批量发送,减少网络开销
DataAgent的可观测性设计
DataAgent的可观测性体系可以用一张图概括:
┌─────────────────────────────────────────────────────────────┐
│ DataAgent 可观测性体系 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 用户请求 │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 日志系统 │ │ 指标系统 │ │ 追踪系统 │ │
│ │ (Slf4j) │ │ (Langfuse) │ │(OpenTelemetry)│ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Langfuse 可视化平台 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 追踪链路 │ │ Token统计│ │ 延迟分析 │ │ │
│ │ │ (Traces)│ │ (Usage) │ │(Latency)│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘Langfuse集成:追踪每次LLM调用
Langfuse是一个开源的LLM可观测性平台,DataAgent通过OpenTelemetry协议与其集成。
LangfuseService是DataAgent的核心观测组件:
// LangfuseService.java(简化版)
@Component
public class LangfuseService {
private final Tracer tracer;
private final boolean enabled;
// 按 threadId 隔离的 Token 累计器
private static final ConcurrentHashMap<String, long[]> TOKEN_ACCUMULATOR =
new ConcurrentHashMap<>();
/**
* 开始一个追踪 Span
*/
public Span startLLMSpan(String spanName, GraphRequest request) {
Span span = tracer.spanBuilder(spanName)
.setSpanKind(SpanKind.CLIENT)
.setParent(Context.current())
.startSpan();
// 记录请求上下文
span.setAttribute("input.value", request.getQuery());
span.setAttribute("data_agent.agent_id", request.getAgentId());
span.setAttribute("data_agent.thread_id", request.getThreadId());
// 初始化 Token 累计器
TOKEN_ACCUMULATOR.put(request.getThreadId(), new long[] { 0, 0 });
return span;
}
/**
* 累计 Token 用量(线程安全)
*/
public static void accumulateTokens(Object threadId, long promptTokens, long completionTokens) {
long[] tokens = TOKEN_ACCUMULATOR.get(threadId);
if (tokens != null) {
synchronized (tokens) {
tokens[0] += promptTokens; // Prompt Tokens
tokens[1] += completionTokens; // Completion Tokens
}
}
}
/**
* 结束 Span(成功)
*/
public void endSpanSuccess(Span span, String threadId, String output) {
span.setAttribute("output.value", output);
// 写入累计的 Token 数据
applyAccumulatedTokens(span, threadId);
span.setStatus(StatusCode.OK);
span.end();
}
}这段代码展示了几个关键设计:
- 1. Span生命周期管理:
startLLMSpan创建追踪,endSpanSuccess/endSpanError结束追踪 - 2. 线程安全的Token累计:使用
ConcurrentHashMap+synchronized数组,支持多线程并发 - 3. 丰富的上下文信息:记录query、agentId、threadId等,方便后续检索
FluxUtil中的Token累积统计
DataAgent的流式处理中,每次LLM响应都会经过FluxUtil,这里会提取并累计Token:
// FluxUtil.java(关键片段)
private static void extractAndAccumulateTokens(Object threadId, ChatResponse response) {
if (threadId == null || response.getMetadata() == null) {
return;
}
Usage usage = response.getMetadata().getUsage();
if (usage != null && (usage.getPromptTokens() > 0 || usage.getCompletionTokens() > 0)) {
// 将本次调用的 Token 用量累计到 Langfuse
LangfuseService.accumulateTokens(
threadId,
usage.getPromptTokens(),
usage.getCompletionTokens()
);
}
}这个方法在每次收到LLM响应时自动调用,实现了无侵入式的指标采集——业务代码不需要关心Token统计,FluxUtil自动完成。
StreamContext:流式处理的上下文管理
每个用户请求在DataAgent中都有一个独立的StreamContext,封装了所有相关状态:
// StreamContext.java(简化版)
@Data
public class StreamContext {
private Disposable disposable; // 用于取消订阅
private Sinks.Many<ServerSentEvent<GraphNodeResponse>> sink; // 流式输出通道
private Span span; // OpenTelemetry 追踪 Span
private TextType textType; // 当前文本类型
// 收集流式输出内容,用于 Langfuse 上报
private final StringBuilder outputCollector = new StringBuilder();
// 线程安全的清理标记
private final AtomicBoolean cleaned = new AtomicBoolean(false);
public void appendOutput(String chunk) {
outputCollector.append(chunk);
}
public String getCollectedOutput() {
return outputCollector.toString();
}
/**
* 线程安全的资源清理
*/
public void cleanup() {
// 使用 CAS 确保只执行一次
if (!cleaned.compareAndSet(false, true)) {
return;
}
// 清理 Disposable 和 Sink...
}
}StreamContext的设计亮点:
- • 按threadId隔离:每个对话有独立的上下文,互不干扰
- • 自动收集输出:
outputCollector累积所有输出,用于最终上报 - • 线程安全清理:
AtomicBoolean+compareAndSet确保资源只释放一次,避免重复清理导致的异常
流式输出与实时反馈
SSE技术:让用户"看见"Agent思考
传统的HTTP请求是"一问一答":客户端发送请求,服务端处理完成后一次性返回结果。但对于Agent系统,用户可能需要等待几秒甚至几十秒,这段时间页面一片空白,体验很差。
SSE(Server-Sent Events,服务器发送事件) 解决了这个问题。它允许服务端在请求处理过程中,实时推送中间结果给客户端。
DataAgent的SSE端点实现:
// GraphController.java(简化版)
@RestController
public class GraphController {
@GetMapping(value = "/api/stream/search",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<GraphNodeResponse>> streamSearch(
@RequestParam("agentId") String agentId,
@RequestParam("query") String query,
ServerHttpResponse response) {
// 设置 SSE 相关的 HTTP 头
response.getHeaders().add("Cache-Control", "no-cache");
response.getHeaders().add("Connection", "keep-alive");
// 创建 Sink(流式数据通道)
Sinks.Many<ServerSentEvent<GraphNodeResponse>> sink =
Sinks.many().unicast().onBackpressureBuffer();
// 构建请求并启动图处理
GraphRequest request = GraphRequest.builder()
.agentId(agentId)
.query(query)
.build();
graphService.graphStreamProcess(sink, request);
// 返回 Flux,Spring 自动处理 SSE 格式
return sink.asFlux()
.doOnSubscribe(subscription ->
log.info("Client subscribed to stream"))
.doOnCancel(() -> {
log.info("Client disconnected");
graphService.stopStreamProcessing(threadId);
})
.doOnError(error -> {
log.error("Stream error", error);
graphService.stopStreamProcessing(threadId);
})
.doOnComplete(() ->
log.info("Stream completed successfully"));
}
}这段代码的关键点:
- 1.
produces = MediaType.TEXT_EVENT_STREAM_VALUE:声明这是一个SSE端点 - 2.
Sinks.Many:Project Reactor的流式数据通道,支持多生产者单消费者 - 3. 生命周期钩子:
doOnSubscribe(客户端连接)、doOnCancel(客户端断开)、doOnError(出错)、doOnComplete(完成) - 4. 自动清理:客户端断开时,自动停止对应的流处理,释放资源
GraphNodeResponse:流式数据的封装
每个推送给前端的数据包都封装为GraphNodeResponse:
// GraphNodeResponse.java
@Data
@Builder
public class GraphNodeResponse {
private String agentId; // Agent 标识
private String threadId; // 对话线程标识
private String nodeName; // 当前节点名称(如"PlannerNode")
private TextType textType; // 文本类型(如思考过程、SQL代码、最终结果)
private String text; // 实际内容
@Builder.Default
private boolean error = false; // 是否出错
@Builder.Default
private boolean complete = false; // 是否完成
// 便捷工厂方法
public static GraphNodeResponse error(String agentId, String threadId, String text) { ... }
public static GraphNodeResponse complete(String agentId, String threadId) { ... }
}前端收到这样的数据流:
event: message
data: {"nodeName":"IntentRecognitionNode","text":"用户想查询销售额","textType":"THINKING"}
event: message
data: {"nodeName":"PlannerNode","text":"计划:1.生成SQL 2.执行查询","textType":"PLAN"}
event: message
data: {"nodeName":"SqlGenerateNode","text":"SELECT SUM(amount) FROM orders","textType":"SQL"}
event: message
data: {"nodeName":"SqlExecuteNode","text":"查询结果:1,234,567元","textType":"RESULT"}
event: complete
data: {"complete":true}用户能实时看到Agent在每个节点做了什么,而不是干等最后结果。这种透明感极大提升了用户体验和信任度。
调试技巧实战
技巧一:查看每个Node的输入输出状态
DataAgent的日志系统记录了每个节点的执行:
// GraphServiceImpl.java
private void handleNodeOutput(GraphRequest request, NodeOutput output) {
log.debug("Received output: {}", output.getClass().getSimpleName());
if (output instanceof StreamingOutput streamingOutput) {
String node = output.node();
String chunk = output.chunk();
log.debug("Node: {}, Chunk: {}", node, chunk);
// ...
}
}调试方法:开启DEBUG级别日志,查看完整的节点输出流。
技巧二:分析Prompt和LLM响应
在Langfuse平台上,你可以查看每次LLM调用的完整信息:
- • 输入Prompt:实际发送给LLM的内容
- • 输出响应:LLM返回的原始文本
- • Token用量:Prompt Token数、Completion Token数
- • 延迟:从请求到响应的时间
如果Agent输出异常,首先检查:
- 1. Prompt是否正确拼接?变量是否被正确替换?
- 2. LLM响应是否包含异常内容?
- 3. Token用量是否超限?
技巧三:追踪执行路径
通过OpenTelemetry的Trace ID,你可以在Langfuse中查看完整的执行链路:
Trace: graph-stream (threadId: abc-123)
├── Span: IntentRecognitionNode (200ms)
├── Span: EvidenceRecallNode (350ms)
├── Span: PlannerNode (800ms)
├── Span: SqlGenerateNode (1200ms)
├── Span: SqlExecuteNode (150ms)
└── Span: ReportGeneratorNode (600ms)一眼就能看出:SqlGenerateNode耗时最长(1.2秒),是性能瓶颈,可能需要优化Prompt或更换更快的模型。
技巧四:本地快速验证
DataAgent支持nl2sqlOnly模式,只执行核心链路,方便快速验证:
// GraphServiceImpl.java
@Override
public String nl2sql(String naturalQuery, String agentId) {
OverAllState state = compiledGraph
.invoke(Map.of(IS_ONLY_NL2SQL, true, INPUT_KEY, naturalQuery, AGENT_ID, agentId),
RunnableConfig.builder().build())
.orElseThrow();
return state.value(SQL_GENERATE_OUTPUT, "");
}这个模式跳过了计划生成、报告生成等复杂节点,直接测试NL2SQL核心能力,适合本地调试。
本章小结
本章我们学习了Agent系统的可观测性设计:
- 1. 为什么需要可观测性:LLM的随机性、Agent决策链的复杂性、生产环境快速定位问题的需求
- 2. 三大支柱:
- • 日志:记录发生了什么(Slf4j)
- • 指标:量化性能表现(Token、延迟、成功率)
- • 追踪:跟踪请求流转路径(OpenTelemetry + Langfuse)
- 3. DataAgent的实践:
- • LangfuseService实现LLM调用追踪和Token统计
- • FluxUtil自动提取并累计Token用量
- • StreamContext管理每个请求的完整生命周期
- • SSE流式输出让用户实时看到Agent思考过程
- 4. 调试技巧:查看节点输入输出、分析Prompt和响应、追踪执行路径、本地快速验证
记住:可观测性不是事后补丁,而是应该从项目第一天就开始设计的基础设施。
思考题
- 1. 场景题:你收到用户反馈说"Agent今天回答特别慢",你会如何通过DataAgent的可观测性体系定位问题?请按步骤说明你会查看哪些日志、指标和追踪信息。
- 2. 设计题:假设你要为一个客服Agent设计可观测性方案,除了本章提到的三大支柱,你认为还需要监控哪些业务指标?如何设计一个"答案质量评分"的指标?
第十一章:观测与调试——让Agent透明可控
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent电梯里的安全哲学
想象你走进一栋摩天大楼的电梯:
你按下100层,电梯平稳上升。突然,钢缆发出一声异响,电梯轻微晃动。
但下一秒,电梯并没有坠落——它稳稳地停在了最近的楼层,门自动打开,报警铃声响起,通风系统启动。
你安全地走了出来。虽然这次乘梯体验不算愉快,但你毫发无伤。
这背后是一套精密的安全系统在运作:
- • 限速器:检测到速度异常立即制动
- • 安全钳:钢缆断裂时卡住轨道
- • 缓冲器:万一坠落,底部弹簧吸收冲击
- • 备用电源:停电时维持照明和通风
安全系统的设计哲学是:假设最坏的情况一定会发生,然后提前准备好应对措施。
Agent系统也是如此。LLM可能生成错误的SQL,数据库可能连接失败,Python代码可能包含死循环。一个成熟的Agent必须在"出问题"时保护自己、保护用户、保护系统。
10.1 Agent系统的风险全景
在深入DataAgent的安全设计之前,让我们先看看Agent系统面临的主要风险:
| 风险类型 | 具体表现 | 危害程度 |
|---|---|---|
| 数据风险 | LLM生成DELETE或DROP语句,误删数据 | 🔴 极高 |
| 代码风险 | Python代码包含os.system("rm -rf /") | 🔴 极高 |
| 资源风险 | SQL查询全表扫描,拖垮数据库 | 🟠 高 |
| 连接风险 | 数据库密码错误、网络中断 | 🟡 中 |
| 逻辑风险 | LLM理解错误,生成与意图不符的SQL | 🟡 中 |
| 服务风险 | 节点无限重试,导致系统卡死 | 🟠 高 |
DataAgent针对这些风险,设计了一套多层次的防御体系。
10.2 第一层防御:输入验证与计划校验
10.2.1 PlanExecutorNode的计划验证
在计划执行前,DataAgent会严格检查计划是否合法:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/PlanExecutorNode.java
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 1. 验证Plan是否能正确解析
Plan plan;
try {
plan = PlanProcessUtil.getPlan(state);
}
catch (Exception e) {
log.error("计划验证失败:JSON解析错误", e);
return buildValidationResult(state, false,
"验证失败:计划不是有效的JSON结构。错误:" + e.getMessage());
}
// 2. 验证计划结构是否完整
if (!validateExecutionPlanStructure(plan)) {
return buildValidationResult(state, false,
"验证失败:生成的计划为空或没有执行步骤。");
}
// 3. 逐一验证每个执行步骤
for (ExecutionStep step : plan.getExecutionPlan()) {
String validationResult = validateExecutionStep(step);
if (validationResult != null) {
return buildValidationResult(state, false, validationResult);
}
}
log.info("计划验证成功。");
// ... 继续执行
}这个验证过程就像机场安检:
- • 第一步:检查证件是否有效(JSON能否解析)
- • 第二步:检查行李是否为空(计划是否有内容)
- • 第三步:逐件检查行李内容(每个步骤的参数是否合法)
10.2.2 步骤级别的参数校验
private String validateExecutionStep(ExecutionStep step) {
// 验证工具名称是否在白名单中
if (step.getToolToUse() == null || !SUPPORTED_NODES.contains(step.getToolToUse())) {
return "验证失败:计划包含非法工具名:'" + step.getToolToUse() + "'";
}
// 验证工具参数是否存在
if (step.getToolParameters() == null) {
return "验证失败:第" + step.getStep() + "步缺少工具参数";
}
// 根据节点类型,验证特定必填参数
switch (step.getToolToUse()) {
case SQL_GENERATE_NODE:
if (!StringUtils.hasText(step.getToolParameters().getInstruction())) {
return "验证失败:SQL生成节点缺少描述";
}
break;
// ... 其他节点类似
}
return null; // 验证通过
}白名单机制是关键:SUPPORTED_NODES只包含预定义的合法节点类型(SQL_GENERATE_NODE、PYTHON_GENERATE_NODE、REPORT_GENERATOR_NODE)。即使LLM"突发奇想"生成了一个不存在的节点名,也会被拦截。
10.3 第二层防御:语义一致性校验
10.3.1 SQL的"二次确认"
LLM生成的SQL语法上可能正确,但语义上可能与用户的真实意图不符。DataAgent设置了**SemanticConsistencyNode(语义一致性节点)**来做"二次确认"。
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/SemanticConsistencyNode.java
@Slf4j
@Component
public class SemanticConsistencyNode implements NodeAction {
private final Nl2SqlService nl2SqlService;
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 获取用户原始问题、生成的SQL、数据库Schema
String sql = StateUtil.getStringValue(state, SQL_GENERATE_OUTPUT);
String userQuery = StateUtil.getCanonicalQuery(state);
SchemaDTO schemaDTO = StateUtil.getObjectValue(state, TABLE_RELATION_OUTPUT, SchemaDTO.class);
String evidence = StateUtil.getStringValue(state, EVIDENCE);
// 构建语义校验请求
SemanticConsistencyDTO dto = SemanticConsistencyDTO.builder()
.dialect(dialect)
.sql(sql)
.executionDescription(getCurrentExecutionStepInstruction(state))
.schemaInfo(buildMixMacSqlDbPrompt(schemaDTO, true))
.userQuery(userQuery)
.evidence(evidence)
.build();
// 调用LLM进行语义校验
Flux<ChatResponse> validationResultFlux = nl2SqlService.performSemanticConsistency(dto);
// 处理结果
return FluxUtil.createStreamingGeneratorWithMessages(this.getClass(), state,
"开始语义一致性校验", "语义一致性校验完成",
validationResult -> {
boolean isPassed = !validationResult.startsWith("不通过");
return buildValidationResult(isPassed, validationResult);
}, validationResultFlux);
}
}这个节点就像翻译的"回译校验":
- • 用户说中文(业务问题)
- • Agent翻译成SQL(第一轮翻译)
- • SemanticConsistencyNode把SQL"回译"成业务语言
- • 对比"回译结果"和"用户的原始问题"是否一致
如果不一致,说明SQL可能偏离了用户意图。
10.3.2 失败时的重试机制
当语义校验不通过时,DataAgent不会直接报错结束,而是记录原因、返回重试:
private Map<String, Object> buildValidationResult(boolean passed, String validationResult) {
if (passed) {
return Map.of(SEMANTIC_CONSISTENCY_NODE_OUTPUT, true);
}
else {
// 不通过时,记录失败原因,供SQL生成节点重新生成时使用
return Map.of(
SEMANTIC_CONSISTENCY_NODE_OUTPUT, false,
SQL_REGENERATE_REASON, SqlRetryDto.semantic(validationResult)
);
}
}SqlRetryDto是一个专门用于记录重试原因的数据结构:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/dto/datasource/SqlRetryDto.java
public record SqlRetryDto(String reason, boolean semanticFail, boolean sqlExecuteFail) {
public static SqlRetryDto semantic(String reason) {
return new SqlRetryDto(reason, true, false); // 语义校验失败
}
public static SqlRetryDto sqlExecute(String reason) {
return new SqlRetryDto(reason, false, true); // SQL执行失败
}
public static SqlRetryDto empty() {
return new SqlRetryDto("", false, false); // 无失败
}
}这个设计很精巧:
- • 语义失败(
semanticFail=true):LLM理解有误,需要重新生成SQL - • 执行失败(
sqlExecuteFail=true):SQL语法错误 or 数据库问题,需要修复SQL - • 原因分类:让后续节点知道"该怎么修"
10.4 第三层防御:代码沙箱
10.4.1 为什么需要沙箱
DataAgent允许LLM生成并执行Python代码。这功能很强大,但也极其危险:
# LLM"不小心"生成了这样的代码?
import os
os.system("rm -rf /") # 删除服务器上所有文件!
# 或者
while True:
pass # 死循环,耗尽CPU如果没有隔离机制,Agent就变成了黑客的帮凶。
10.4.2 Docker沙箱:把危险关在笼子里
DataAgent使用Docker容器来执行Python代码,实现严格的资源隔离:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/code/impls/DockerCodePoolExecutorService.java
public class DockerCodePoolExecutorService extends AbstractCodePoolExecutorService {
protected String createNewContainer() throws Exception {
String containerName = this.generateContainerName();
// 创建容器的主机配置(限制资源、挂载目录)
HostConfig hostConfig = this.createHostConfig(tempDir);
// 创建容器
CreateContainerResponse container = dockerClient.createContainerCmd(properties.getImageName())
.withName(containerName)
.withWorkingDir("/app")
.withHostConfig(hostConfig)
.withCmd("sh", "-c", cmd)
.exec();
return container.getId();
}
}10.4.3 资源限制
private HostConfig createHostConfig(Path tempDir) {
return newHostConfig()
// 限制内存使用(防止内存耗尽)
.withMemory(this.properties.getLimitMemory() * 1024L * 1024L)
// 限制CPU核心数(防止CPU占满)
.withCpuCount(this.properties.getCpuCore())
// 丢弃所有Linux Capability(最小权限原则)
.withCapDrop(Capability.ALL)
// 使用临时文件系统,不持久化
.withTmpFs(Map.of("/tmp", ""))
// 网络隔离(可选)
.withNetworkMode(this.properties.getNetworkMode());
}这套限制就像监狱的安保系统:
- • 内存限制:囚犯(代码)只能使用指定的床铺大小
- • CPU限制:囚犯只能占用指定的活动区域
- • 权限剥离:囚犯没有任何特权(
Capability.ALL全部丢弃) - • 网络隔离:囚犯不能与外界通信(根据配置)
10.4.4 执行超时
private String buildExecutionCommand(Path tempDir) {
return String.format(
"... timeout -s SIGKILL %s python3 -u script.py < input_data.txt",
properties.getCodeTimeout() // 超时时间(秒)
);
}即使代码陷入了死循环,timeout命令也会在指定时间后强制终止进程。
10.4.5 日志大小限制
private void appendWithLimit(StringBuilder builder, String payload, int limit) {
if (builder.length() < limit) {
builder.append(payload);
}
else if (builder.length() == limit) {
builder.append("\n...[Output truncated due to size limit]...");
builder.append(" "); // 防止重复进入
}
}即使代码疯狂输出日志(比如while True: print("ha")),日志大小也会被限制在5MB以内,防止耗尽磁盘空间。
10.5 第四层防御:错误码与优雅降级
10.5.1 数据库错误标准化
DataAgent定义了一套完整的错误码体系,把数据库层面的原始错误翻译成用户友好的提示:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/enums/ErrorCodeEnum.java
public enum ErrorCodeEnum {
SUCCESS("0", "操作成功"),
INVALID_PARAM("10", "无效参数"),
// 连接类错误
DATASOURCE_CONNECTION_FAILURE_08001("08001",
"无法建立数据库连接,请检查配置的IP/域名是否正确"),
CONNECTION_LOST_08002("08002", "连接丢失:服务器关闭"),
// 权限类错误
INSUFFICIENT_PRIVILEGE_42501("42501", "权限不足"),
PASSWORD_ERROR_28P01("28P01", "密码错误"),
// 数据类错误
DATABASE_NOT_EXIST_3D000("3D000", "数据库不存在"),
SCHEMA_NOT_EXIST_3D070("3D070", "模式不存在"),
// 兜底错误
OTHERS("100", "未知错误,请联系技术人员排查");
// 根据SQLState错误码查找对应的中文提示
public static ErrorCodeEnum fromCode(String code) {
for (ErrorCodeEnum rc : values()) {
if (code.equals(rc.getCode())) {
return rc;
}
}
return OTHERS; // 找不到就返回兜底错误
}
}这套机制的好处:
- • 用户友好:用户看到"密码错误"而不是一大串Java异常堆栈
- • 问题定位:"08001"这样的错误码方便开发人员快速定位问题类型
- • 统一处理:所有数据库错误都走同一套处理逻辑
10.5.2 线程池与优雅关闭
DataAgent使用专用线程池处理数据库操作,并在应用关闭时优雅地释放资源:
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/config/DataAgentConfiguration.java
@Bean(name = "dbOperationExecutor")
public ExecutorService dbOperationExecutor() {
int corePoolSize = Math.max(4,
Math.min(Runtime.getRuntime().availableProcessors() * 2, 16));
return new ThreadPoolExecutor(
corePoolSize, corePoolSize,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500), // 队列上限500个任务
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时,调用者线程自己执行
);
}
@Override
public void destroy() {
if (dbOperationExecutor != null && !dbOperationExecutor.isShutdown()) {
dbOperationExecutor.shutdown(); // 1. 停止接收新任务
if (!dbOperationExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
dbOperationExecutor.shutdownNow(); // 2. 超时后强制关闭
}
}
}这套机制确保:
- • 不会无限堆积任务:队列上限500,超过后由调用者线程自己执行(自然降速)
- • 应用关闭不丢任务:先等60秒让已有任务完成,实在不行才强制关闭
- • 资源不泄漏:线程池被正确关闭,不会导致内存泄漏
10.6 第五层防御:重试与熔断
10.6.1 计划修复的"三次法则"
当计划验证失败时,DataAgent允许最多2次修复(加上初始尝试,共3次):
// 文件路径:data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/dispatcher/PlanExecutorDispatcher.java
private static final int MAX_REPAIR_ATTEMPTS = 2;
@Override
public String apply(OverAllState state) {
boolean validationPassed = StateUtil.getObjectValue(state, PLAN_VALIDATION_STATUS, Boolean.class, false);
if (!validationPassed) {
int repairCount = StateUtil.getObjectValue(state, PLAN_REPAIR_COUNT, Integer.class, 0);
if (repairCount > MAX_REPAIR_ATTEMPTS) {
log.error("计划修复次数超过{}次限制,终止执行", MAX_REPAIR_ATTEMPTS);
return END; // 不再重试,结束流程
}
return PLANNER_NODE; // 安排重试
}
}这遵循了工程界的**"三次法则"**:
- • 第一次失败:可能是偶然,再试一次
- • 第二次失败:可能有问题,再给它最后一次机会
- • 第三次失败:系统性地不行,放弃并报告
10.6.2 表关系分析的循环重试
.addConditionalEdges(TABLE_RELATION_NODE, edge_async(new TableRelationDispatcher()),
Map.of(FEASIBILITY_ASSESSMENT_NODE, FEASIBILITY_ASSESSMENT_NODE,
END, END,
TABLE_RELATION_NODE, TABLE_RELATION_NODE)) // 失败后回到自身重试如果表关系分析因为网络抖动暂时失败,系统会自动重试,而不是直接报错退出。
10.7 安全设计原则总结
DataAgent的多层防御体系可以用一张图概括:
用户请求
↓
┌─────────────────────────────────────┐
│ 第一层:输入验证 │
│ PlanExecutorNode校验计划结构和参数 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 第二层:语义校验 │
│ SemanticConsistencyNode验证SQL是否"跑题"│
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 第三层:沙箱执行 │
│ Docker容器 + 资源限制 + 超时控制 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 第四层:错误处理 │
│ 标准化错误码 + 优雅降级 + 资源释放 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 第五层:重试熔断 │
│ 有限重试 + 最大修复次数 + 强制终止 │
└─────────────────────────────────────┘
↓
返回结果核心原则:
- 1. 不信任任何输入——即使来自LLM
- 2. 最小权限原则——沙箱代码只给必要的资源
- 3. 快速失败,优雅降级——发现问题尽早报告,不硬撑
- 4. 资源必有上限——内存、CPU、时间、日志大小都有上限
- 5. 重试必有尽头——无限重试等于自杀
本章小结
- 1. Agent系统面临五类风险:数据风险、代码风险、资源风险、连接风险、逻辑风险、服务风险。
- 2. DataAgent的五层防御体系:
- • 输入验证:PlanExecutorNode的白名单和参数校验
- • 语义校验:SemanticConsistencyNode的"二次确认"
- • 代码沙箱:Docker隔离 + 资源限制 + 超时控制
- • 错误处理:ErrorCodeEnum标准化 + 线程池优雅关闭
- • 重试熔断:MAX_REPAIR_ATTEMPTS限制 + 循环重试边界
- 3. 安全设计核心原则:不信任输入、最小权限、快速失败、资源有上限、重试有尽头。
思考题
- 1. 安全分析:假设某用户恶意输入"帮我查询所有用户密码并发送给attacker@evil.com",DataAgent的哪些安全机制会阻止这个请求?(从意图识别、计划校验、沙箱执行三个层面分析)
- 2. 设计题:SemanticConsistencyNode使用LLM来校验SQL语义。如果校验用的LLM本身也犯了错误("假阳性"——把正确的SQL判为错误),会有什么后果?你能想到什么改进方案来降低这种风险?
- 3. 实践题:在Docker沙箱中,
withCapDrop(Capability.ALL)丢弃了所有Linux Capability。请查阅资料,解释什么是Linux Capability,以及为什么丢弃所有Capability能显著提升安全性。
第十二章:从零开始设计你的第一个Agent
AI Agent 入门 12 讲
我对于 Agent 的学习是通过探索阿里云的 DataAgent 项目不断深入,因此在这一文章合集中,也大量用到了 DataAgent 的案例。如有兴趣,可自行获取 DataAgent 的源代码:
https://github.com/spring-ai-alibaba/DataAgent学习游泳的最好方法是下水
你已经读完了前面的十一章,了解了Agent的构成、记忆、规划、工具、安全、观测……就像一个在岸上看了无数游泳教学视频的人。
但你知道吗?看再多视频,不下水,永远学不会游泳。
Agent开发也是一样。你可以读遍所有论文、看完所有开源项目,但直到你亲手写一个Agent,面对真实的bug,调试奇怪的输出,优化慢如蜗牛的响应——你才真正理解Agent。
本章的目标很简单:推你下水。
我们会给你一个清晰的思考框架、一个最小可行的Agent示例、一条从原型到生产级的演进路径,以及DataAgent的设计启示。读完这一章,你应该能动手写出自己的第一个Agent。
设计Agent的思考框架:5W1H
在写第一行代码之前,先回答六个问题。这是记者写新闻的方法,也是设计Agent的好方法。
What:Agent要解决什么问题?
不要从技术出发,要从问题出发。
- • 你想让Agent帮你查天气?写代码?分析数据?还是陪你聊天?
- • 这个问题是否适合用Agent解决?(需要推理、多步骤、调用外部工具)
- • 传统方案(规则引擎、固定流程)为什么不能很好解决?
DataAgent的答案:帮助企业用户通过自然语言查询数据库,自动生成SQL、执行分析、生成报告。传统BI工具需要学习成本,Agent让"说话就能分析"成为可能。
Who:目标用户是谁?
- • 技术背景:用户是程序员还是业务人员?
- • 使用场景:是日常办公辅助,还是关键业务决策?
- • 容忍度:用户对错误答案的容忍度如何?(医疗Agent vs 天气Agent)
DataAgent的答案:企业中的数据分析师和业务人员。他们懂业务但可能不懂SQL,需要Agent bridge the gap(弥合鸿沟)。
When:什么时候需要人类介入?
这是人机协作设计的关键:
- • 哪些决策Agent可以自主做?
- • 哪些必须人工确认?(如删除数据、转账、发布内容)
- • 什么情况下Agent应该"求助"人类?
DataAgent的答案:在计划生成后、执行前,设置HumanFeedbackNode。用户可以看到Agent的执行计划,确认或修改后再继续。这避免了Agent"一意孤行"执行危险操作。
Where:运行在什么环境?
- • 部署方式:本地运行、私有服务器、公有云?
- • 资源限制:有多少GPU/CPU?内存多大?
- • 合规要求:数据能否出域?是否需要审计日志?
DataAgent的答案:企业私有部署,支持多种LLM(本地/云端)。数据不出域,所有操作有Langfuse追踪记录。
Why:为什么用Agent而不是传统方案?
诚实地回答这个问题。Agent不是银弹,有些场景传统方案更好
| 场景 | 传统方案 | Agent方案 |
|---|---|---|
| 固定流程的审批 | 工作流引擎 | 过度设计 |
| 简单的数据查询 | SQL界面 | 可以,但非必须 |
| 复杂的多步骤分析 | 需要人工拆解 | Agent擅长 |
| 开放式问题回答 | 搜索+规则 | Agent擅长 |
DataAgent的答案:自然语言到SQL的转换涉及意图理解、Schema匹配、SQL生成、结果解释——这是一个典型的多步骤推理任务,Agent比固定规则更灵活。
How:技术方案是什么?
最后才考虑技术。基于前面的答案,选择合适的技术栈:
- • LLM选择:GPT-4、Claude、DeepSeek、本地模型?
- • 框架选择:LangChain、Spring AI、原生API?
- • 记忆方案:简单变量、数据库、向量数据库?
- • 工具生态:需要哪些外部工具?如何集成?
DataAgent的答案:Spring AI Alibaba Graph(与Java生态集成)、支持动态切换LLM(热插拔)、H2/MySQL存储、向量检索+关键词检索混合。
最小可行Agent(MVP)
1个LLM + 1个工具 + 1个记忆 = 最简单的Agent
让我们从一个极简Agent开始。这个Agent能查天气,并根据天气给出穿衣建议。
# weather_agent.py - 极简天气Agent
import os
import json
from datetime import datetime
# 模拟LLM(实际项目中调用OpenAI/DeepSeek API)
class MockLLM:
def chat(self, prompt):
# 这里应该是实际的API调用
# 为了演示,我们模拟一个简单推理
if "rain" in prompt.lower() or "雨" in prompt:
return "今天有雨,建议带伞,穿防水外套。"
elif "cold" in prompt.lower() or "冷" in prompt:
return "今天气温低,建议穿羽绒服。"
else:
return "今天天气不错,穿轻便衣物即可。"
# 工具:天气查询(模拟)
class WeatherTool:
def get_weather(self, city):
# 实际项目中调用天气API(如OpenWeatherMap、和风天气)
mock_data = {
"北京": {"temp": 5, "condition": "cold", "wind": "north 3级"},
"上海": {"temp": 18, "condition": "rain", "wind": "east 2级"},
"深圳": {"temp": 28, "condition": "sunny", "wind": "south 2级"}
}
return mock_data.get(city, {"temp": 20, "condition": "unknown", "wind": "calm"})
# 记忆:简单对话历史
class SimpleMemory:
def __init__(self, max_turns=5):
self.history = []
self.max_turns = max_turns
def add(self, role, content):
self.history.append({"role": role, "content": content, "time": datetime.now().isoformat()})
# 只保留最近N轮
if len(self.history) > self.max_turns * 2:
self.history = self.history[-self.max_turns * 2:]
def get_context(self):
return "\n".join([f"{h['role']}: {h['content']}" for h in self.history])
# Agent核心
class WeatherAgent:
def __init__(self):
self.llm = MockLLM()
self.weather_tool = WeatherTool()
self.memory = SimpleMemory()
def run(self, user_input):
# Step 1: 记录用户输入
self.memory.add("user", user_input)
# Step 2: 提取城市(简化版,实际可用NER或让LLM提取)
city = self._extract_city(user_input)
# Step 3: 调用工具获取天气
weather = self.weather_tool.get_weather(city)
# Step 4: 构建Prompt(包含记忆和工具结果)
prompt = f"""
你是一位贴心的天气助手。根据以下信息给出穿衣建议:
城市:{city}
天气:{weather['condition']}
温度:{weather['temp']}°C
风力:{weather['wind']}
对话历史:
{self.memory.get_context()}
请给出简洁的穿衣建议:
"""
# Step 5: 调用LLM生成回答
response = self.llm.chat(prompt)
# Step 6: 记录回答并返回
self.memory.add("assistant", response)
return response
def _extract_city(self, text):
# 简化版城市提取
cities = ["北京", "上海", "深圳", "广州", "杭州"]
for city in cities:
if city in text:
return city
return "北京" # 默认
# 运行Agent
if __name__ == "__main__":
agent = WeatherAgent()
print("=== 天气Agent演示 ===")
print("用户:北京今天天气怎么样?")
print("Agent:", agent.run("北京今天天气怎么样?"))
print()
print("用户:上海呢?")
print("Agent:", agent.run("上海呢?")) # 测试记忆:"呢"指代上海
print()
print("用户:深圳明天穿什么?")
print("Agent:", agent.run("深圳明天穿什么?"))这个极简Agent包含了Agent的四个核心要素:
| 组件 | 实现 | 说明 |
|---|---|---|
| LLM(大脑) | MockLLM | 实际项目中替换为真实API |
| 工具(手脚) | WeatherTool | 查询天气数据 |
| 记忆 | SimpleMemory | 保留最近N轮对话 |
| 规划 | run()方法 | 固定的执行步骤(提取→查询→生成) |
运行结果
=== 天气Agent演示 ===
用户:北京今天天气怎么样?
Agent:今天气温低,建议穿羽绒服。
用户:上海呢?
Agent:今天有雨,建议带伞,穿防水外套。
用户:深圳明天穿什么?
Agent:今天天气不错,穿轻便衣物即可。这个Agent虽然简单,但它展示了Agent的基本工作模式:感知(用户输入)→ 决策(提取城市)→ 行动(查询天气)→ 生成(LLM回答)。
从MVP到生产级的演进路径
一个玩具Agent和一个生产级Agent之间,隔着十万八千里。但不要被吓倒——所有复杂的系统,都是从简单的原型一步步长出来的。
阶段一:增加更多工具
从1个工具扩展到多个工具:
class ToolRegistry:
def __init__(self):
self.tools = {}
def register(self, name, tool):
self.tools[name] = tool
def execute(self, name, **params):
if name not in self.tools:
raise ValueError(f"Tool {name} not found")
return self.tools[name].run(**params)
# 注册多个工具
registry = ToolRegistry()
registry.register("weather", WeatherTool())
registry.register("stock", StockQueryTool()) # 新增:股票查询
registry.register("calendar", CalendarTool()) # 新增:日历管理
registry.register("email", EmailTool()) # 新增:发送邮件DataAgent有16+个节点,每个节点本质上是一个"工具"或"工具组合":SQL生成、SQL执行、Python代码执行、报告生成……
阶段二:引入规划能力
MVP的Agent执行固定流程,但复杂任务需要动态规划。DataAgent的PlannerNode就是为此而生:
用户:"分析一下上季度各地区的销售额,找出增长最快的前三个地区,
并预测下季度的趋势。"
PlannerNode生成计划:
1. SchemaRecallNode - 获取sales表结构
2. SqlGenerateNode - 生成查询上季度销售额的SQL
3. SqlExecuteNode - 执行SQL获取数据
4. PythonAnalyzeNode - 用Python计算增长率并排序
5. PythonAnalyzeNode - 用Python做趋势预测
6. ReportGeneratorNode - 生成分析报告规划能力让Agent从"按固定剧本演戏"升级为"根据任务自主编排"。
阶段三:添加记忆系统
从简单的对话历史,升级到长期记忆:
- • 短期记忆:当前对话的上下文(已具备)
- • 长期记忆:用户偏好、历史查询、常见问题的答案
- • 向量记忆:用Embedding将知识存入向量数据库,支持语义检索
DataAgent的MultiTurnContextManager管理多轮上下文,AgentVectorStoreService提供RAG检索能力。
阶段四:工作流编排
当节点多了以后,需要工作流引擎来管理执行顺序、条件分支、并行执行、错误处理。
DataAgent使用StateGraph模式:
// 伪代码:StateGraph的工作流定义
StateGraph graph = new StateGraph();
graph.addNode("intent", new IntentRecognitionNode());
graph.addNode("plan", new PlannerNode());
graph.addNode("sql_generate", new SqlGenerateNode());
graph.addNode("sql_execute", new SqlExecuteNode());
graph.addNode("report", new ReportGeneratorNode());
graph.addNode("human_feedback", new HumanFeedbackNode());
// 定义边(执行顺序)
graph.addEdge("intent", "plan");
graph.addEdge("plan", "human_feedback");
graph.addConditionalEdge("human_feedback",
state -> state.getHumanFeedback() ? "execute" : "revise");
graph.addEdge("execute", "report");阶段五:人机协作
生产级Agent必须考虑人在回路(Human-in-the-loop):
- • 计划确认:Agent生成计划后,人类确认或修改
- • 危险操作拦截:删除数据、修改配置前必须人工确认
- • 异常求助:Agent遇到不确定的情况,主动询问人类
DataAgent在PlannerNode后设置HumanFeedbackNode,使用interruptBefore机制暂停执行,等待人类反馈。
阶段六:安全与观测
最后但最重要的是安全和可观测性(我们在第十章和第十一章详细讲过):
- • 输入校验:防止Prompt注入、恶意输入
- • 输出过滤:敏感信息脱敏、有害内容拦截
- • 权限控制:不同用户能访问不同数据和工具
- • 全链路追踪:每个请求的可追溯、可审计
DataAgent的设计启示
DataAgent是一个拥有16+节点的复杂StateGraph,但它不是一天建成的。
启示一:从核心功能开始
DataAgent的起点是NL2SQL(自然语言转SQL)。这是最有价值、最核心的功能:
// DataAgentApplication.java - 简洁的入口
@SpringBootApplication
@EnableScheduling
public class DataAgentApplication {
public static void main(String[] args) {
SpringApplication.run(DataAgentApplication.class, args);
}
}整个应用的入口只有这几行。但背后是一个逐步扩展的过程:
- 1. V0.1:NL2SQL(意图识别 → SQL生成 → SQL执行 → 结果返回)
- 2. V0.2:增加Schema召回(解决表结构理解问题)
- 3. V0.3:增加RAG检索(解决业务术语映射问题)
- 4. V0.4:增加Planner(支持多步骤复杂查询)
- 5. V0.5:增加Python分析(支持深度数据挖掘)
- 6. V0.6:增加报告生成(输出美观的图表和文档)
- 7. V1.0:增加人机协作、安全控制、可观测性
启示二:每个Node都是独立可测试的
DataAgent的每个节点都实现了NodeAction接口,可以独立开发和单元测试:
// 伪代码:Node的标准接口
public interface NodeAction {
Map<String, Object> execute(OverAllState state);
}
// SqlGenerateNode可以独立测试
@Test
public void testSqlGenerateNode() {
SqlGenerateNode node = new SqlGenerateNode(llmService);
OverAllState state = new OverAllState();
state.put("query", "查询上个月的销售额");
state.put("schema", "table: sales(columns: amount, date, region)");
Map<String, Object> result = node.execute(state);
assertNotNull(result.get("sql"));
assertTrue(result.get("sql").toString().contains("SELECT"));
}这种设计的好处:
- • 并行开发:不同开发者负责不同节点,互不干扰
- • 独立测试:每个节点可以单独验证正确性
- • 灵活替换:想换一个新的SQL生成策略?直接替换
SqlGenerateNode
启示三:动态LLM切换的设计巧思
DataAgent的AiModelRegistry实现了运行时热切换LLM,这是一个非常实用的设计:
// AiModelRegistry.java(简化版)
@Component
public class AiModelRegistry {
private final DynamicModelFactory modelFactory;
private final ModelConfigDataService modelConfigDataService;
// volatile 保证多线程可见性
private volatile ChatClient currentChatClient;
private volatile EmbeddingModel currentEmbeddingModel;
/**
* 获取 ChatClient(懒加载 + 双重检查锁)
*/
public ChatClient getChatClient() {
if (currentChatClient == null) {
synchronized (this) {
if (currentChatClient == null) {
ModelConfigDTO config = modelConfigDataService
.getActiveConfigByType(ModelType.CHAT);
ChatModel chatModel = modelFactory.createChatModel(config);
currentChatClient = ChatClient.builder(chatModel).build();
}
}
}
return currentChatClient;
}
/**
* 刷新缓存(用于热切换)
*/
public void refreshChat() {
this.currentChatClient = null;
log.info("Chat cache cleared. Next call will create new instance.");
}
}这个设计的亮点:
- 1. 懒加载:第一次使用时才初始化,加快启动速度
- 2. 双重检查锁(DCL):线程安全的高效单例模式
- 3.
volatile关键字:确保多线程环境下缓存的可见性 - 4. 热切换:管理员在后台切换模型配置后,调用
refreshChat()清空缓存,下次请求自动使用新模型,无需重启服务
这种设计在生产环境中非常实用:
- • 发现某个模型API不稳定?立即切换到备用模型
- • 新模型上线了?无缝切换,用户无感知
- • A/B测试不同模型?动态切换,对比效果
扩展方向:Agent的未来
当你掌握了基础Agent开发后,可以向这些方向探索:
多模态Agent
不仅能处理文本,还能理解图像、语音、视频:
- • 图像理解:用户上传一张图表,Agent分析数据趋势
- • 语音交互:用户用语音提问,Agent语音回答
- • 文档解析:自动读取PDF、Word、Excel,提取关键信息
Agent生态:MCP协议
MCP(Model Context Protocol) 是Anthropic提出的开放协议,让Agent可以无缝连接各种外部工具和数据源。
DataAgent已经支持MCP Server:
// McpServerService.java(概念示意)
@Service
public class McpServerService {
// 提供 nl2SqlToolCallback 和 listAgentsToolCallback
// 外部Agent(如Claude Desktop)可以通过MCP协议调用DataAgent的能力
}这意味着DataAgent不仅能独立运行,还能作为"工具"被其他Agent调用,形成Agent生态。
自主Agent(AutoGPT类)
更进一步的Agent可以自主设定目标、分解任务、循环执行、自我纠错:
用户:"帮我调研一下新能源汽车市场"
Agent自主执行:
1. 搜索最新销量数据
2. 分析主要品牌市场份额
3. 查找政策新闻
4. 生成对比表格
5. 发现数据缺失,自动补充搜索
6. 生成完整报告
7. 询问用户是否需要深入某个方面这类Agent更智能,但也更难控制,需要更强的安全机制和人在回路设计。
学习资源推荐
官方文档
- • Spring AI Alibaba:https://github.com/alibaba/spring-ai-alibaba(DataAgent使用的框架)
- • LangChain:https://python.langchain.com(Python生态最流行的Agent框架)
- • OpenAI API:https://platform.openai.com/docs(理解LLM API的最佳起点)
开源项目
- • DataAgent:企业级数据分析Agent(本书案例)
- • AutoGPT:自主Agent的探索性项目
- • MetaGPT:多Agent协作框架
论文与博客
- • ReAct(Reasoning + Acting):LLM推理与行动结合的经典论文
- • LangGraph:基于图的Agent工作流框架
- • Toolformer:LLM学习使用工具的论文
全书总结
让我们用一张图总结全书的核心内容:
┌─────────────────────────────────────────────────────────────┐
│ Agent 的完整拼图 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ LLM │ ← 大脑:推理、理解、生成 │
│ │ (大脑) │ │
│ └────┬────┘ │
│ │ │
│ ┌────┴────┬─────────┬─────────┬─────────┐ │
│ ▼ ▼ ▼ ▼ ▼ │
│ ┌──────┐ ┌──────┐ ┌────────┐ ┌──────┐ ┌────────┐ │
│ │Tools │ │Memory│ │Planning│ │Safety│ │Human │ │
│ │(手脚)│ │(记忆)│ │(规划) │ │(安全)│ │(协作) │ │
│ └──────┘ └──────┘ └────────┘ └──────┘ └────────┘ │
│ │ │ │ │ │ │
│ └─────────┴─────────┴─────────┴─────────┘ │
│ │ │
│ ▼ │
│ ┌────────────┐ │
│ │ Workflow │ ← 编排:StateGraph / 工作流引擎 │
│ │ (编排) │ │
│ └─────┬──────┘ │
│ │ │
│ ▼ │
│ ┌────────────┐ │
│ │Observability│ ← 观测:日志 / 指标 / 追踪 │
│ │ (观测) │ │
│ └────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘Agent = LLM(大脑) + Tools(手脚) + Memory(记忆) + Planning(规划) + Workflow(编排) + Human(协作) + Safety(安全) + Observability(观测)
这八个要素构成了生产级Agent系统的完整拼图。缺少任何一个,都可能在某个时刻出问题:
- • 没有Tools,Agent是" paralysis by analysis"(只会分析,不会行动)
- • 没有Memory,Agent是"金鱼"(每次对话都从零开始)
- • 没有Planning,Agent是"无头苍蝇"(复杂任务无从下手)
- • 没有Workflow,Agent是"一盘散沙"(节点各自为战)
- • 没有Human,Agent是"失控的列车"(可能做出危险决策)
- • 没有Safety,Agent是"定时炸弹"(容易被攻击、泄露数据)
- • 没有Observability,Agent是"黑箱"(出问题无法定位)
本章小结
- 1. 设计框架(5W1H):What、Who、When、Where、Why、How——从问题出发,而非技术出发
- 2. MVP Agent:1个LLM + 1个工具 + 1个记忆 = 最小可行Agent,先跑起来再优化
- 3. 演进路径:增加工具 → 引入规划 → 添加记忆 → 工作流编排 → 人机协作 → 安全与观测
- 4. DataAgent启示:从核心功能(NL2SQL)开始,每个Node独立可测试,支持动态LLM热切换
- 5. 未来方向:多模态Agent、MCP生态、自主Agent
最后的话:Agent开发没有终点,只有持续的迭代和优化。今天写的第一个Agent可能很简陋,但它是你成为Agent工程师的第一步。动手吧,现在就开始写你的第一个Agent!
思考题
- 1. 设计题:用本章的5W1H框架,设计一个"智能客服Agent"。请回答六个问题,并画出它的MVP架构图(可以用文字描述)。
- 2. 代码题:在天气Agent的示例基础上,增加一个"日历工具",让Agent能根据天气和用户的日程给出建议(例如:"今天有雨,而且你下午有户外会议,建议改到室内或带伞")。
- 3. 开放题:DataAgent从V0.1到V1.0经历了七个阶段的演进。如果你要设计一个"个人知识管理Agent"(帮助用户整理笔记、提取知识点、回答基于笔记的问题),你会如何规划它的演进路线?请列出前三个版本的核心功能。