跳过正文
LangGraph 工作流编排:构建有状态的 AI 应用

LangGraph 工作流编排:构建有状态的 AI 应用

·1324 字·7 分钟·
目录
AI 工程化实战 - 这篇文章属于一个选集。
§ : 本文

LangChain 的 Chain 解决了"把几个 LLM 调用串联起来"的问题,但遇到需要循环、条件分支、中间等待人工介入、或者需要跨请求保持状态的场景,Chain 就显得力不从心。LangGraph 用状态机模型解决了这些问题。

为什么需要 LangGraph
#

看一个具体的痛点——你想实现一个"先分析问题,如果需要更多信息则继续追问,否则给出答案"的 Agent:

用 LangChain Chain 实现:

# 问题:如果 LLM 决定需要继续追问,你无法在 Chain 内部做循环
chain = prompt | llm | output_parser
# 只能单次执行,无法根据 LLM 的判断决定是否继续

用 LangGraph 实现:

# 可以定义:如果 LLM 输出了 "need_more_info",就跳回收集信息节点
# 直到 LLM 输出 "ready_to_answer" 才前进到答案节点

LangGraph 的核心价值

  1. 循环支持:可以无限迭代直到满足条件
  2. 条件分支:根据状态或 LLM 输出决定走哪条路
  3. Human-in-the-loop:在关键节点暂停等待人工确认
  4. 状态持久化:用 Checkpoint 保存中间状态,支持断点续跑和多轮对话
  5. 并行执行:多个无依赖的节点可以并发跑

核心概念
#

LangGraph 的三要素:

  • State:图的"记忆",是一个类型化的字典,在所有节点间共享和传递
  • Node:普通 Python 函数,接收 State,返回对 State 的更新
  • Edge:节点间的连接,可以是固定边(A → B)或条件边(根据状态决定走哪里)
State = 一个 TypedDict,记录整个工作流的所有数据
Node = def my_node(state: State) -> dict(返回要更新的字段)
Edge = 固定连接 或 条件函数(返回下一个节点的名字)

环境安装
#

pip install langgraph langchain-openai langchain-core

基础示例:问答 + 自我反思
#

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

# 1. 定义 State
class QAState(TypedDict):
    messages: Annotated[list, add_messages]  # add_messages 是追加语义,不是覆盖
    question: str
    answer: str
    needs_revision: bool
    revision_count: int

# 2. 定义节点
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def generate_answer(state: QAState) -> dict:
    """生成初始答案"""
    response = llm.invoke([
        SystemMessage(content="你是一个专业的技术助手,给出准确详细的回答。"),
        HumanMessage(content=state["question"])
    ])
    return {
        "answer": response.content,
        "messages": [response],
        "revision_count": state.get("revision_count", 0)
    }

def review_answer(state: QAState) -> dict:
    """自我审查答案质量"""
    review_prompt = f"""审查以下回答的质量:

问题:{state['question']}
回答:{state['answer']}

如果回答不够完整、有明显错误或需要补充,返回 "needs_revision"。
否则返回 "looks_good"。

只返回这两个选项之一,不要其他内容。"""

    response = llm.invoke([HumanMessage(content=review_prompt)])
    needs_revision = "needs_revision" in response.content.lower()

    return {
        "needs_revision": needs_revision,
        "revision_count": state.get("revision_count", 0)
    }

def revise_answer(state: QAState) -> dict:
    """修改答案"""
    response = llm.invoke([
        SystemMessage(content="你是一个专业的技术助手,请改进你的回答。"),
        HumanMessage(content=f"请改进以下对 '{state['question']}' 的回答,使其更完整准确:\n\n{state['answer']}")
    ])
    return {
        "answer": response.content,
        "revision_count": state["revision_count"] + 1
    }

# 3. 条件边函数
def should_revise(state: QAState) -> Literal["revise", "end"]:
    """决定是否需要修改"""
    if state["needs_revision"] and state.get("revision_count", 0) < 2:
        return "revise"
    return "end"

# 4. 构建图
builder = StateGraph(QAState)

builder.add_node("generate", generate_answer)
builder.add_node("review", review_answer)
builder.add_node("revise", revise_answer)

builder.set_entry_point("generate")
builder.add_edge("generate", "review")
builder.add_conditional_edges(
    "review",
    should_revise,
    {
        "revise": "revise",
        "end": END
    }
)
builder.add_edge("revise", "review")  # 修改后再审查,形成循环

graph = builder.compile()

# 5. 执行
result = graph.invoke({
    "question": "如何在Kubernetes中实现蓝绿部署?",
    "answer": "",
    "needs_revision": False,
    "revision_count": 0,
    "messages": []
})

print(f"最终答案(经过 {result['revision_count']} 次修改):")
print(result["answer"])

Human-in-the-loop:人工介入节点
#

生产中最重要的功能之一——在执行敏感操作前等待人工确认:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from typing import TypedDict

class OperationState(TypedDict):
    user_request: str
    plan: str          # LLM 制定的执行计划
    approved: bool     # 人工是否批准
    result: str

def plan_operations(state: OperationState) -> dict:
    """LLM 分析请求,制定操作计划"""
    response = llm.invoke([
        SystemMessage(content="""你是运维助手。分析用户请求,制定详细执行计划。
计划必须包含:
1. 影响范围
2. 具体操作步骤
3. 潜在风险
4. 回滚方案"""),
        HumanMessage(content=state["user_request"])
    ])
    return {"plan": response.content}

def execute_operations(state: OperationState) -> dict:
    """执行实际操作(人工批准后才会到达这里)"""
    if not state["approved"]:
        return {"result": "操作已取消"}

    # 实际执行逻辑
    # result = run_kubectl(state["plan"])
    result = f"已按计划执行:{state['plan'][:100]}..."
    return {"result": result}

# 构建需要人工介入的图
builder = StateGraph(OperationState)
builder.add_node("plan", plan_operations)
builder.add_node("execute", execute_operations)
builder.set_entry_point("plan")

# plan → 中断(等待人工) → execute
builder.add_edge("plan", "execute")

# 关键:在 plan 节点后设置中断点
graph = builder.compile(
    checkpointer=MemorySaver(),
    interrupt_after=["plan"]   # 在 plan 节点执行后暂停
)

# ---- 第一阶段:LLM 制定计划 ----
thread_config = {"configurable": {"thread_id": "op-001"}}

state_after_plan = graph.invoke(
    {"user_request": "将 nginx deployment 的副本数从2改为5"},
    config=thread_config
)

print("=== 执行计划 ===")
print(state_after_plan["plan"])
print("\n请确认是否执行?(yes/no)")

# ---- 等待人工输入 ----
user_input = input()

# ---- 第二阶段:根据人工决定继续执行 ----
approved = user_input.lower() == "yes"

# 更新状态中的 approved 字段
graph.update_state(
    thread_config,
    {"approved": approved}
)

# 从中断点继续执行
final_state = graph.invoke(None, config=thread_config)
print(f"\n执行结果:{final_state['result']}")

Checkpoint 持久化
#

生产中必须用持久化存储而不是内存,支持:跨进程恢复、服务重启后继续、多用户对话隔离。

使用 PostgreSQL 持久化
#

pip install langgraph-checkpoint-postgres psycopg2-binary
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg2

# 连接数据库
conn = psycopg2.connect(
    host="localhost",
    database="langgraph",
    user="postgres",
    password="password"
)

# 初始化 checkpoint 表(首次运行)
checkpointer = PostgresSaver(conn)
checkpointer.setup()  # 创建必要的表结构

# 编译图时传入持久化 checkpointer
graph = builder.compile(checkpointer=checkpointer)

# 使用 thread_id 区分不同对话/任务
thread_config = {"configurable": {"thread_id": "user-123-session-456"}}

# 第一次调用
result1 = graph.invoke(initial_state, config=thread_config)

# 程序重启后,用同一个 thread_id 恢复状态
# graph 会从上次中断的地方继续
result2 = graph.invoke(None, config=thread_config)

查看历史快照
#

# 查看某个 thread 的所有历史快照
history = list(graph.get_state_history(thread_config))
for snapshot in history:
    print(f"Step {snapshot.step}: {snapshot.values.keys()}")

# 回滚到某个历史状态
old_snapshot = history[-3]  # 三步前
graph.update_state(thread_config, old_snapshot.values)

实战:运维诊断工作流
#

整合以上所有概念,构建一个完整的运维诊断 Agent:

收集基本信息 → 分析症状 → [需要更多信息?] → 循环收集
                              ↓ 信息足够
                          生成诊断结论
                              ↓
                     [操作风险高?] → 人工确认 → 执行修复
                              ↓ 低风险
                          自动执行修复
                              ↓
                          验证结果 → [是否成功?] → 结束/重试
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import subprocess

llm = ChatOpenAI(model="gpt-4o", temperature=0)

class DiagnosticState(TypedDict):
    # 问题描述
    issue_description: str
    # 收集到的诊断信息
    collected_info: list[str]
    # 是否需要更多信息
    need_more_info: bool
    # 需要收集什么信息
    info_to_collect: list[str]
    # 诊断结论
    diagnosis: str
    # 修复方案
    fix_plan: str
    # 操作风险等级
    risk_level: Literal["low", "medium", "high"]
    # 是否已人工批准
    human_approved: bool
    # 执行结果
    execution_result: str
    # 是否成功
    is_resolved: bool
    # 迭代次数
    iteration_count: int

def collect_basic_info(state: DiagnosticState) -> dict:
    """自动收集基础诊断信息"""
    collected = []

    # 收集 K8s 状态(实际场景中替换为真实命令)
    commands = {
        "pods_status": "kubectl get pods -A --field-selector=status.phase!=Running 2>/dev/null | head -20",
        "recent_events": "kubectl get events -A --sort-by=.lastTimestamp 2>/dev/null | tail -20",
        "node_status": "kubectl get nodes 2>/dev/null",
    }

    for name, cmd in commands.items():
        try:
            result = subprocess.run(
                cmd, shell=True, capture_output=True, text=True, timeout=10
            )
            if result.stdout.strip():
                collected.append(f"[{name}]\n{result.stdout.strip()}")
        except Exception as e:
            collected.append(f"[{name}] 收集失败: {e}")

    return {"collected_info": collected}

def analyze_and_plan(state: DiagnosticState) -> dict:
    """分析症状,制定诊断和修复计划"""
    info_text = "\n\n".join(state["collected_info"])

    response = llm.invoke([
        SystemMessage(content="""你是资深SRE工程师,根据收集到的运维信息进行诊断。

返回JSON格式:
{
    "need_more_info": false,
    "info_to_collect": [],
    "diagnosis": "根本原因分析",
    "fix_plan": "具体修复步骤",
    "risk_level": "low|medium|high",
    "explanation": "诊断说明"
}

risk_level判断:
- low: 只读操作或查询命令
- medium: 配置变更或滚动重启
- high: 删除资源、扩缩容、涉及生产数据库"""),
        HumanMessage(content=f"""问题描述:{state['issue_description']}

已收集的信息:
{info_text}

请分析并给出诊断方案。""")
    ])

    import json, re
    result_text = response.content
    json_match = re.search(r'\{.*\}', result_text, re.DOTALL)
    if json_match:
        result = json.loads(json_match.group())
        return {
            "need_more_info": result.get("need_more_info", False),
            "info_to_collect": result.get("info_to_collect", []),
            "diagnosis": result.get("diagnosis", ""),
            "fix_plan": result.get("fix_plan", ""),
            "risk_level": result.get("risk_level", "high"),
            "iteration_count": state.get("iteration_count", 0) + 1
        }
    return {"diagnosis": result_text, "risk_level": "high", "iteration_count": state.get("iteration_count", 0) + 1}

def collect_targeted_info(state: DiagnosticState) -> dict:
    """根据 LLM 要求收集特定信息"""
    new_info = state["collected_info"].copy()

    for info_request in state["info_to_collect"]:
        # 让 LLM 生成具体的 kubectl 命令
        cmd_response = llm.invoke([
            HumanMessage(content=f"生成一个 kubectl 命令来获取以下信息(只返回命令本身):{info_request}")
        ])
        cmd = cmd_response.content.strip().replace("```bash", "").replace("```", "").strip()

        try:
            result = subprocess.run(
                cmd, shell=True, capture_output=True, text=True, timeout=15
            )
            new_info.append(f"[{info_request}]\n{result.stdout.strip() or result.stderr.strip()}")
        except Exception as e:
            new_info.append(f"[{info_request}] 执行失败: {e}")

    return {"collected_info": new_info, "need_more_info": False}

def execute_fix(state: DiagnosticState) -> dict:
    """执行修复操作(需要人工批准或低风险自动执行)"""
    if state["risk_level"] == "high" and not state.get("human_approved"):
        return {"execution_result": "等待人工审批", "is_resolved": False}

    # 实际执行修复命令
    # result = run_fix_commands(state["fix_plan"])
    execution_result = f"已执行修复计划。风险等级:{state['risk_level']}"

    return {"execution_result": execution_result, "is_resolved": True}

def verify_fix(state: DiagnosticState) -> dict:
    """验证修复是否有效"""
    # 重新检查状态
    verify_result = subprocess.run(
        "kubectl get pods -A --field-selector=status.phase!=Running 2>/dev/null | wc -l",
        shell=True, capture_output=True, text=True
    )
    count = int(verify_result.stdout.strip() or "0")
    is_resolved = count <= 1  # 0条结果 + 1条header = 1

    return {"is_resolved": is_resolved}

# 条件边函数
def need_more_info_or_analyze(state: DiagnosticState) -> Literal["collect_targeted", "execute"]:
    if state["need_more_info"] and state["iteration_count"] < 3:
        return "collect_targeted"
    return "execute"

def check_risk_level(state: DiagnosticState) -> Literal["auto_execute", "wait_approval"]:
    if state["risk_level"] == "low":
        return "auto_execute"
    return "wait_approval"

def check_resolution(state: DiagnosticState) -> Literal["resolved", "retry"]:
    if state["is_resolved"]:
        return "resolved"
    return "retry"

# 构建图
builder = StateGraph(DiagnosticState)

builder.add_node("collect_basic", collect_basic_info)
builder.add_node("analyze", analyze_and_plan)
builder.add_node("collect_targeted", collect_targeted_info)
builder.add_node("execute", execute_fix)
builder.add_node("verify", verify_fix)

builder.set_entry_point("collect_basic")
builder.add_edge("collect_basic", "analyze")
builder.add_conditional_edges("analyze", need_more_info_or_analyze, {
    "collect_targeted": "collect_targeted",
    "execute": "execute"
})
builder.add_edge("collect_targeted", "analyze")  # 收集后重新分析
builder.add_edge("execute", "verify")
builder.add_conditional_edges("verify", check_resolution, {
    "resolved": END,
    "retry": "analyze"   # 修复无效,重新分析
})

# 在高风险操作前设置中断点
graph = builder.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["execute"]
)

# 使用工作流
def run_diagnostic(issue: str):
    thread_config = {"configurable": {"thread_id": f"diag-{hash(issue)}"}}

    # 阶段1:收集信息和分析
    state = graph.invoke(
        {
            "issue_description": issue,
            "collected_info": [],
            "iteration_count": 0,
            "human_approved": False,
            "is_resolved": False,
        },
        config=thread_config
    )

    print(f"\n=== 诊断结论 ===")
    print(f"根本原因:{state['diagnosis']}")
    print(f"\n=== 修复方案 ===")
    print(f"{state['fix_plan']}")
    print(f"\n风险等级:{state['risk_level']}")

    # 阶段2:人工确认(对于中/高风险操作)
    if state["risk_level"] in ["medium", "high"]:
        confirm = input("\n是否执行修复?(yes/no): ")
        if confirm.lower() != "yes":
            print("操作已取消")
            return

        graph.update_state(thread_config, {"human_approved": True})

    # 阶段3:执行修复
    final_state = graph.invoke(None, config=thread_config)
    print(f"\n执行结果:{final_state['execution_result']}")
    print(f"是否解决:{'是' if final_state['is_resolved'] else '否'}")

# 触发诊断
run_diagnostic("生产环境有多个Pod处于CrashLoopBackOff状态")

并行节点执行
#

对于独立的诊断任务,可以并行执行:

from langgraph.graph import StateGraph
from typing import TypedDict

class ParallelState(TypedDict):
    query: str
    pod_status: str
    node_status: str
    service_status: str
    summary: str

def check_pods(state: ParallelState) -> dict:
    result = subprocess.run("kubectl get pods -A", shell=True, capture_output=True, text=True)
    return {"pod_status": result.stdout}

def check_nodes(state: ParallelState) -> dict:
    result = subprocess.run("kubectl get nodes", shell=True, capture_output=True, text=True)
    return {"node_status": result.stdout}

def check_services(state: ParallelState) -> dict:
    result = subprocess.run("kubectl get svc -A", shell=True, capture_output=True, text=True)
    return {"service_status": result.stdout}

def summarize(state: ParallelState) -> dict:
    # 汇总三个并行检查的结果
    summary = llm.invoke([
        HumanMessage(content=f"""分析以下K8s集群状态:

Pods: {state['pod_status'][:500]}
Nodes: {state['node_status'][:500]}
Services: {state['service_status'][:500]}

给出简短的健康状态摘要。""")
    ])
    return {"summary": summary.content}

builder = StateGraph(ParallelState)
builder.add_node("check_pods", check_pods)
builder.add_node("check_nodes", check_nodes)
builder.add_node("check_services", check_services)
builder.add_node("summarize", summarize)

# 从 START 并行发散到三个检查节点
builder.set_entry_point("check_pods")  # 不能直接从 START 并行,需要用 fan-out

# 三个检查节点都完成后汇聚到 summarize
builder.add_edge("check_pods", "summarize")
builder.add_edge("check_nodes", "summarize")
builder.add_edge("check_services", "summarize")
builder.add_edge("summarize", END)

与 LangChain 的关系
#

LangGraph 是 LangChain 生态的一部分,可以在 Node 里直接用 LangChain 的所有组件:

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# 在 Node 函数里用 LCEL chain
def my_node(state: MyState) -> dict:
    chain = ChatPromptTemplate.from_messages([
        ("system", "你是一个专家"),
        ("human", "{input}")
    ]) | ChatOpenAI() | StrOutputParser()

    result = chain.invoke({"input": state["user_input"]})
    return {"result": result}

什么时候用 LangGraph,什么时候用普通 LangChain

  • 单次线性流程(A→B→C):用 LangChain LCEL 就够
  • 需要循环、分支、状态保持:用 LangGraph
  • 需要人工介入、恢复执行:必须用 LangGraph
Wenzhuo Huang
作者
Wenzhuo Huang
搞运维的工程师,写代码的运维人。专注 Kubernetes、AWS、GitOps 与基础设施可靠性。这个博客既是我的技术笔记本,也是我踩过的坑的受害者档案。
AI 工程化实战 - 这篇文章属于一个选集。
§ : 本文

相关文章

LangChain 从入门到实战:构建 LLM 应用的工程框架

·1045 字·5 分钟
LangChain 是构建 LLM 应用最流行的框架,但也是踩坑最多的框架之一。本文从 LCEL 表达式、ReAct Agent、LangGraph 工作流到生产部署,梳理真正有用的部分,并指出哪些功能实际工程中应该避免。

Langfuse:LLM 应用可观测性平台实战

·836 字·4 分钟
讲清楚为什么LLM应用必须要可观测性,以及如何用Langfuse从链路追踪、Prompt版本管理、评估实验到成本分析做到全覆盖,包含Docker自托管部署和Python SDK完整集成示例。

RAG 系统设计与实战:检索增强生成完全指南

·1157 字·6 分钟
RAG(检索增强生成)是目前企业落地 LLM 最主流的方式。本文覆盖 RAG 系统的完整设计:文档处理管线、分块策略、向量检索与关键词混合检索、Rerank 重排序、上下文压缩,以及用 RAGAS 框架评估 RAG 质量,最后分享生产环境踩坑记录。