跳过正文
AI Agent 设计模式:从单步到复杂工作流

AI Agent 设计模式:从单步到复杂工作流

·1310 字·7 分钟·
目录
AI 工程化实战 - 这篇文章属于一个选集。
§ : 本文

很多工程师对"Agent"的理解是"更聪明的ChatGPT",这个理解偏差会导致对Agent能力的误判——期待过高,或者根本没用到它的核心价值。

Agent的本质是一个循环执行的系统:LLM作为大脑,规划下一步行动;工具函数作为手脚,执行具体操作;观察结果反馈给LLM,继续规划,直到任务完成。下面从运维场景出发把这套机制和常见陷阱讲清楚。


Agent vs 普通LLM调用
#

先弄清楚区别,才知道什么时候该用Agent。

普通LLM调用

用户输入 → LLM → 输出(一次性)

Agent

用户输入 → LLM规划 → 执行工具 → 观察结果 → LLM再规划 → 执行工具 → ... → 输出

适合用Agent的场景特征:

  • 任务需要多步骤才能完成
  • 每一步的输入依赖上一步的结果
  • 需要与外部系统交互(查数据、执行命令、调API)
  • 任务路径事先不确定(要根据中间结果决定下一步)

不适合用Agent的场景:

  • 单次问答(RAG问答、文本生成)
  • 任务流程固定(直接用工作流即可)
  • 对延迟要求极高(Agent的多轮循环会增加延迟)

ReAct推理循环
#

ReAct(Reasoning + Acting)是目前最主流的Agent推理框架,理解它是理解所有Agent框架的基础。

核心循环
#

每一步,LLM接收到这样的输入:

系统提示词(定义Agent角色和可用工具)

历史行动记录:
Thought: [之前的思考]
Action: [之前执行的工具]
Observation: [工具返回的结果]

当前任务:[用户的原始输入]

LLM输出:

Thought: [分析当前情况,决定下一步]
Action: [要调用的工具名称]
Action Input: [工具的输入参数]

系统解析这个输出,执行工具,把结果作为Observation加入历史,然后再次调用LLM,直到LLM输出Final Answer

完整示例
#

任务:检查服务器的CPU使用率,如果超过80%,找出占用最高的进程并发告警。

Thought: 我需要先检查CPU使用率。我会用get_cpu_usage工具。
Action: get_cpu_usage
Action Input: {"host": "prod-web-01"}

Observation: {"cpu_usage": 92.3, "load_avg_1m": 8.5, "load_avg_5m": 7.2}

Thought: CPU使用率是92.3%,超过了80%的阈值。我需要找出占用CPU最高的进程。
Action: get_top_processes
Action Input: {"host": "prod-web-01", "limit": 5}

Observation: [
  {"pid": 12345, "name": "java", "cpu": 45.2, "mem": 2048},
  {"pid": 12346, "name": "python", "cpu": 30.1, "mem": 512},
  ...
]

Thought: 找到了,主要是java进程(PID 12345)占用了45.2%的CPU。
现在需要发送告警,包含:主机名、CPU使用率、主要进程信息。
Action: send_alert
Action Input: {
  "severity": "warning",
  "title": "CPU使用率过高",
  "message": "主机 prod-web-01 CPU使用率 92.3%,主要进程:java(PID:12345, CPU:45.2%)",
  "channel": "ops-alerts"
}

Observation: {"status": "sent", "alert_id": "ALT-20240412-001"}

Thought: 告警已发送成功。任务完成。
Final Answer: prod-web-01 的 CPU 使用率为 92.3%(超过阈值 80%),主要消耗来自 java 进程(PID: 12345,占用 45.2%)。已发送告警到 ops-alerts 频道(告警ID: ALT-20240412-001)。

Tool调用设计
#

工具是Agent的行动能力,设计好工具是Agent工程里最重要的环节。

工具粒度原则
#

原则1:单一职责

每个工具只做一件事。不要设计"超级工具":

# 不好:做了太多事,LLM难以决定何时调用
@tool
def manage_k8s_deployment(action: str, name: str, namespace: str, ...):
    """管理K8s Deployment:可以查看、更新、重启、回滚"""
    ...

# 好:每个操作独立
@tool
def get_deployment_status(name: str, namespace: str) -> dict:
    """获取Deployment的当前状态、副本数、镜像版本"""
    ...

@tool
def scale_deployment(name: str, namespace: str, replicas: int) -> dict:
    """调整Deployment的副本数"""
    ...

@tool
def rollback_deployment(name: str, namespace: str, revision: int = 0) -> dict:
    """回滚Deployment到指定版本,revision=0表示回滚到上一个版本"""
    ...

原则2:工具描述要精确

LLM靠工具的描述(docstring)决定什么时候调用哪个工具。描述含糊会导致工具被错误调用:

# 不好:太模糊
@tool
def check_k8s(name: str, namespace: str) -> dict:
    """检查K8s资源"""
    ...

# 好:明确说明用途、参数、返回值
@tool
def get_pod_logs(
    pod_name: str,
    namespace: str,
    container: str = "",
    previous: bool = False,
    tail_lines: int = 100
) -> str:
    """
    获取Pod的容器日志。
    
    参数:
    - pod_name: Pod名称(完整名称,不是Deployment名)
    - namespace: 命名空间
    - container: 容器名,如果Pod只有一个容器可以不填
    - previous: True表示获取上次崩溃前的日志(用于排查CrashLoopBackOff)
    - tail_lines: 获取最后N行,默认100,最大1000
    
    返回:日志文本,如果Pod不存在返回错误信息
    
    注意:不要用这个工具获取运行中服务的实时流式日志,只用于事后分析
    """
    ...

原则3:工具要有防御性

工具函数是Agent直接作用于真实系统的接口,需要比普通代码更健壮:

@tool
def execute_kubectl_command(command: str, dry_run: bool = True) -> dict:
    """
    执行kubectl命令。
    警告:dry_run=False时会真实修改集群,谨慎使用。
    建议:先用dry_run=True验证命令正确性。
    """
    import subprocess
    import shlex
    
    # 安全检查:不允许某些危险操作
    dangerous_patterns = ['delete', 'drain', 'cordon', 'taint']
    if any(p in command.lower() for p in dangerous_patterns) and not dry_run:
        return {
            "success": False,
            "error": f"危险操作需要明确确认。请先用dry_run=True验证,然后由人工确认后执行。"
        }
    
    # 在dry_run模式下添加--dry-run=client
    if dry_run and 'apply' in command or 'create' in command:
        command = command + ' --dry-run=client'
    
    try:
        result = subprocess.run(
            shlex.split(f"kubectl {command}"),
            capture_output=True,
            text=True,
            timeout=30
        )
        return {
            "success": result.returncode == 0,
            "stdout": result.stdout[:5000],  # 限制输出长度
            "stderr": result.stderr[:1000],
            "dry_run": dry_run
        }
    except subprocess.TimeoutExpired:
        return {"success": False, "error": "命令执行超时(30秒)"}
    except Exception as e:
        return {"success": False, "error": str(e)}

错误处理
#

工具返回的错误信息对Agent的后续决策至关重要。错误信息要有足够信息量:

# 不好:Agent不知道该怎么办
return {"error": "failed"}

# 好:Agent能根据错误信息调整策略
return {
    "success": False,
    "error_type": "not_found",
    "error": f"Pod '{pod_name}' 在 namespace '{namespace}' 中不存在",
    "suggestion": "请用 list_pods(namespace='{namespace}') 查看可用的Pod列表"
}

Multi-Agent协作模式
#

单个Agent处理复杂任务时可能力不从心:上下文太长(工具调用历史很占token)、任务需要并行处理、不同子任务需要不同的专业角色。

Multi-Agent把大任务分解给多个专门的Agent处理。

Orchestrator-Worker模式
#

最常见的模式:一个Orchestrator负责任务分解和协调,多个Worker负责执行具体子任务。

from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.tools import tool

class OrchestratorAgent:
    """负责任务分解和协调"""
    
    def __init__(self, llm, worker_agents):
        self.llm = llm
        self.workers = worker_agents
        # Orchestrator的工具是调用各个Worker
        self.tools = [self._create_worker_tool(name, agent) 
                      for name, agent in worker_agents.items()]
    
    def _create_worker_tool(self, name: str, agent: AgentExecutor):
        @tool(name=f"assign_to_{name}")
        def worker_tool(task: str) -> str:
            f"""将子任务分配给{name}Agent处理。
            适用场景:{agent.agent.llm_chain.prompt.template[:200]}
            """
            result = agent.invoke({"input": task})
            return result["output"]
        return worker_tool
    
    def run(self, task: str) -> str:
        agent = create_react_agent(self.llm, self.tools, orchestrator_prompt)
        executor = AgentExecutor(agent=agent, tools=self.tools, verbose=True)
        result = executor.invoke({"input": task})
        return result["output"]

# 使用示例
log_analysis_agent = create_log_analysis_agent(llm, log_tools)
metric_analysis_agent = create_metric_analysis_agent(llm, metric_tools)
alert_agent = create_alert_agent(llm, alert_tools)

orchestrator = OrchestratorAgent(
    llm=llm,
    worker_agents={
        "log_analyst": log_analysis_agent,
        "metric_analyst": metric_analysis_agent,
        "alert_sender": alert_agent,
    }
)

result = orchestrator.run("prod-api服务出现5xx错误激增,请分析原因并发送告警")

并行执行模式
#

对于相互独立的子任务,并行执行可以显著减少总耗时:

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def parallel_analysis(service_name: str):
    """并行执行多个独立分析任务"""
    
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(max_workers=3)
    
    # 三个独立任务并行执行
    tasks = [
        loop.run_in_executor(executor, analyze_logs, service_name),
        loop.run_in_executor(executor, analyze_metrics, service_name),
        loop.run_in_executor(executor, check_dependencies, service_name),
    ]
    
    log_result, metric_result, dep_result = await asyncio.gather(*tasks)
    
    # 综合分析
    summary_agent = create_summary_agent(llm)
    final_result = summary_agent.run(
        f"服务:{service_name}\n"
        f"日志分析:{log_result}\n"
        f"指标分析:{metric_result}\n"
        f"依赖检查:{dep_result}\n"
        "请综合以上信息给出根因分析和处置建议"
    )
    return final_result

Human-in-the-loop设计
#

完全自动化的Agent有时候不是最好的选择,特别是涉及生产环境操作时。Human-in-the-loop在关键步骤加入人工审批。

需要人工审批的场景
#

  • 向生产环境写入/修改数据
  • 重启生产服务
  • 变更网络/防火墙规则
  • 删除任何资源
  • 超过一定金额的费用操作

实现方式
#

方式1:工具层拦截

class HumanApprovalRequired(Exception):
    def __init__(self, action: str, details: dict):
        self.action = action
        self.details = details
        super().__init__(f"此操作需要人工审批:{action}")

@tool
def restart_production_service(service_name: str, namespace: str) -> dict:
    """重启生产服务(需要人工审批)"""
    
    # 发送审批请求
    approval_id = send_approval_request(
        action="restart_service",
        details={"service": service_name, "namespace": namespace},
        timeout_seconds=300  # 5分钟内审批
    )
    
    # 等待审批结果
    approved = wait_for_approval(approval_id, timeout=300)
    
    if not approved:
        raise HumanApprovalRequired(
            action="restart_service",
            details={"service": service_name, "approved": False}
        )
    
    # 执行实际操作
    result = _do_restart_service(service_name, namespace)
    return {"success": True, "approval_id": approval_id, **result}

方式2:Plan-then-Execute模式

Agent先规划整个执行方案,人工审批后再执行:

def run_with_approval(task: str, llm, tools) -> str:
    # 第一阶段:只规划,不执行
    planner_prompt = """
    分析任务并给出执行计划,只描述步骤,不要实际执行任何操作。
    对每个步骤标注风险等级(低/中/高)。
    """
    plan = planner_agent.run(task)
    
    # 输出计划,等待人工审批
    print("=== 执行计划 ===")
    print(plan)
    
    # 这里可以集成Slack/钉钉审批
    approval = input("确认执行以上计划?(yes/no): ")
    
    if approval.lower() != 'yes':
        return "操作已取消"
    
    # 第二阶段:执行
    executor_prompt = "按照以下计划执行操作:\n" + plan
    return executor_agent.run(executor_prompt)

内存与状态管理
#

Agent的"记忆"影响多轮对话和长任务的效果。

对话历史(Short-term Memory)
#

默认的ConversationBufferMemory会保留所有历史,长任务后context会爆炸。用滑动窗口或摘要:

from langchain.memory import ConversationSummaryBufferMemory

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # 超过这个就压缩
    return_messages=True
)

任务状态(Working Memory)
#

对于多步骤任务,用结构化状态跟踪进度:

from dataclasses import dataclass, field
from typing import List, Dict, Any
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class AgentState:
    task_id: str
    original_task: str
    status: TaskStatus = TaskStatus.PENDING
    completed_steps: List[str] = field(default_factory=list)
    findings: Dict[str, Any] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)
    
    def add_finding(self, key: str, value: Any):
        self.findings[key] = value
    
    def to_context(self) -> str:
        """生成给LLM的状态上下文"""
        return f"""
当前任务:{self.original_task}
已完成步骤:{', '.join(self.completed_steps) if self.completed_steps else '无'}
发现:{self.findings}
错误:{self.errors if self.errors else '无'}
"""

外部记忆(Long-term Memory)
#

把重要的发现和解决方案存入知识库,下次遇到类似问题可以检索:

@tool
def save_to_knowledge_base(title: str, problem: str, solution: str, tags: list) -> dict:
    """
    将本次解决的问题和方案保存到知识库,供以后参考。
    在每次成功解决问题后调用。
    """
    entry = {
        "title": title,
        "problem": problem,
        "solution": solution,
        "tags": tags,
        "timestamp": datetime.now().isoformat()
    }
    # 存入向量数据库
    vector_store.add_texts(
        texts=[f"{title}\n{problem}\n{solution}"],
        metadatas=[entry]
    )
    return {"saved": True, "id": entry["timestamp"]}

运维Agent实战
#

告警分析Agent
#

接收告警,自动分析根因并给出处置建议:

from langchain_anthropic import ChatAnthropic
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import tool
import subprocess

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

@tool
def get_pod_status(namespace: str, label_selector: str = "") -> str:
    """获取指定namespace的Pod状态列表"""
    cmd = f"kubectl get pods -n {namespace}"
    if label_selector:
        cmd += f" -l {label_selector}"
    cmd += " -o wide"
    result = subprocess.run(cmd.split(), capture_output=True, text=True, timeout=15)
    return result.stdout or result.stderr

@tool
def get_recent_events(namespace: str, pod_name: str = "") -> str:
    """获取最近的K8s事件,按时间倒序"""
    if pod_name:
        cmd = f"kubectl get events -n {namespace} --field-selector involvedObject.name={pod_name} --sort-by=.lastTimestamp"
    else:
        cmd = f"kubectl get events -n {namespace} --sort-by=.lastTimestamp"
    result = subprocess.run(cmd.split(), capture_output=True, text=True, timeout=15)
    return result.stdout[-3000:] if result.stdout else result.stderr  # 只返回最近的

@tool
def get_pod_logs(namespace: str, pod_name: str, tail: int = 100, previous: bool = False) -> str:
    """获取Pod日志"""
    cmd = f"kubectl logs -n {namespace} {pod_name} --tail={tail}"
    if previous:
        cmd += " --previous"
    result = subprocess.run(cmd.split(), capture_output=True, text=True, timeout=30)
    return result.stdout[-4000:] if result.stdout else result.stderr

@tool
def get_deployment_info(namespace: str, deployment_name: str) -> str:
    """获取Deployment详情,包括replica状态和最近的更新历史"""
    result = subprocess.run(
        f"kubectl describe deployment {deployment_name} -n {namespace}".split(),
        capture_output=True, text=True, timeout=15
    )
    return result.stdout[:4000]

@tool
def notify_slack(channel: str, message: str, severity: str = "warning") -> dict:
    """发送Slack通知"""
    # 实际接入Slack Webhook
    webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
    color_map = {"critical": "#FF0000", "warning": "#FFA500", "info": "#36A64F"}
    payload = {
        "channel": channel,
        "attachments": [{
            "color": color_map.get(severity, "#FFA500"),
            "text": message,
            "footer": "AlertAnalysisAgent"
        }]
    }
    # requests.post(webhook_url, json=payload)
    return {"sent": True, "channel": channel}

# Agent系统提示词
ALERT_AGENT_PROMPT = """你是一个K8s运维专家Agent,负责分析告警并给出处置建议。

可用工具:
{tools}

工具名称列表:{tool_names}

工作流程:
1. 分析告警信息,理解告警的含义和可能的影响
2. 使用工具收集更多信息(Pod状态、事件、日志)
3. 根据收集到的信息分析根因
4. 给出明确的处置建议
5. 如果告警严重,发送Slack通知

重要规则:
- 不要执行任何修改操作,只做分析
- 如果信息不足,继续收集,不要猜测
- 最终给出:根因(一句话)、影响范围、处置建议(具体步骤)

格式:
{{
  "root_cause": "...",
  "impact": "...",
  "recommended_actions": ["步骤1", "步骤2", ...],
  "urgency": "critical/high/medium/low"
}}

使用这个格式:

Question: {{input}}
Thought: {{agent_scratchpad}}
"""

tools = [get_pod_status, get_recent_events, get_pod_logs, 
         get_deployment_info, notify_slack]

from langchain import hub
from langchain.prompts import PromptTemplate

prompt = PromptTemplate.from_template(ALERT_AGENT_PROMPT)
agent = create_react_agent(llm, tools, prompt)
alert_analyzer = AgentExecutor(
    agent=agent, 
    tools=tools, 
    verbose=True,
    max_iterations=10,
    handle_parsing_errors=True
)

# 使用:接到告警时调用
def handle_alert(alert: dict):
    alert_text = f"""
    告警名称:{alert['alertname']}
    严重级别:{alert['severity']}
    命名空间:{alert.get('namespace', 'unknown')}
    Pod:{alert.get('pod', 'unknown')}
    描述:{alert.get('summary', '')}
    触发时间:{alert.get('startsAt', '')}
    """
    
    result = alert_analyzer.invoke({"input": alert_text})
    return result["output"]

巡检Agent
#

定时巡检集群健康状态,输出结构化报告:

INSPECTION_PROMPT = """
你是一个K8s集群巡检Agent。

你的任务是系统性地检查集群健康状态,按照以下顺序检查:
1. 节点状态(有无NotReady节点)
2. 系统命名空间的Pod状态(kube-system, monitoring)
3. 业务命名空间的异常Pod(CrashLoopBackOff, OOMKilled)
4. 最近1小时的Warning级别事件
5. PVC使用状态

每项检查完成后记录结果,最终生成巡检报告。

报告格式:
## 巡检时间
## 总体健康度(正常/需关注/异常)
## 各项检查结果
## 需要跟进的问题
## 建议操作
"""

# 巡检任务通常通过cron触发
def run_daily_inspection():
    agent_executor = AgentExecutor(
        agent=create_react_agent(llm, inspection_tools, inspection_prompt),
        tools=inspection_tools,
        verbose=False,
        max_iterations=20
    )
    
    result = agent_executor.invoke({
        "input": "执行今日集群巡检,检查所有生产namespace"
    })
    
    # 保存报告
    with open(f"/reports/inspection_{date.today()}.md", "w") as f:
        f.write(result["output"])
    
    # 发送到钉钉
    send_dingtalk_report(result["output"])

常见陷阱
#

陷阱1:工具太多

给Agent提供超过15个工具时,LLM选择工具的准确率会明显下降。解决:

  • 把工具分组,用不同的专门Agent处理不同类别的任务
  • 或者用动态工具选择(先让LLM选择需要哪些工具,再实际提供)

陷阱2:无限循环

Agent陷入循环:A工具返回错误 → 换B工具 → B也报错 → 换回A…

防止方法:

  • 设置max_iterations上限(通常10-15次足够)
  • 在工具里检测并拒绝明显错误的参数
  • 在提示词里明确"如果连续3次失败,停止尝试并报告原因"

陷阱3:上下文膨胀

长任务后工具调用历史会占满上下文,导致后续行动质量下降。

解决:

  • 使用ConversationSummaryBufferMemory定期压缩历史
  • 设计Agent在关键节点主动总结已知信息

陷阱4:对工具结果过度信任

LLM倾向于相信工具返回的结果,即使结果明显有问题。

在工具里加断言:

@tool
def get_cpu_usage(host: str) -> dict:
    usage = _fetch_cpu_usage(host)
    # 断言结果合理
    assert 0 <= usage <= 100, f"CPU usage {usage} out of range [0, 100]"
    return {"host": host, "cpu_usage": usage}
Wenzhuo Huang
作者
Wenzhuo Huang
搞运维的工程师,写代码的运维人。专注 Kubernetes、AWS、GitOps 与基础设施可靠性。这个博客既是我的技术笔记本,也是我踩过的坑的受害者档案。
AI 工程化实战 - 这篇文章属于一个选集。
§ : 本文

相关文章

LLM 微调入门:LoRA 让大模型适配私有场景

·767 字·4 分钟
什么时候该微调、什么时候该用提示工程?本文给出决策框架,然后用Unsloth+QLoRA实战微调Qwen2.5-7B,覆盖数据格式、训练监控、权重合并、部署到vLLM测试,以及10个真实踩坑记录。

2026 大模型全景:主力模型横评与选型指南

·788 字·4 分钟
GPT-5.4、Claude Opus 4.6、Gemini 2.5 Pro、Llama 4 Scout、DeepSeek V3.2——2026年4月的大模型格局已经和一年前完全不同。本文从工程师视角梳理当前主力模型的真实规格与适用边界,给出场景化选型矩阵,并讨论开源追平闭源、推理模型标配化、agent workload 崛起这三个2026年的核心判断。

大模型核心概念:工程师需要理解的 LLM 基础

·786 字·4 分钟
同事第一次用 GPT-4 API 写代码时问我:为什么我发了一段中文,token 消耗比英文多那么多?为什么模型有时候会一本正经地胡说八道?这篇文章把我认为工程师必须理解的 LLM 概念系统整理了一遍,不涉及 Transformer 数学,只讲对你写代码有帮助的部分。