跳过正文
Python 异步编程实战:asyncio 在 AI 应用中的使用

Python 异步编程实战:asyncio 在 AI 应用中的使用

·1613 字·8 分钟·
目录
AI 工程化实战 - 这篇文章属于一个选集。
§ : 本文

我在做第一个 LLM 应用时犯过一个典型错误:用同步方式调用 OpenAI API,串行处理用户请求。测试的时候没问题,上了 10 个并发用户就开始排队,响应时间从 2 秒飙到 20 秒。

这篇文章从「为什么 AI 应用必须用异步」出发,系统地讲 asyncio 的核心概念和在 AI 应用场景中的实战用法。

同步 vs 异步:本质区别
#

先用一个具体例子说清楚区别。假设你要从 5 个不同的 LLM 获取答案(多模型路由场景):

同步做法(线性等待):

import time
import openai

def call_llm_sync(model, prompt):
    """同步调用,会阻塞线程"""
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

def get_multi_model_answers(prompt):
    models = ["gpt-4o", "gpt-4o-mini", "gpt-3.5-turbo"]
    results = []

    start = time.time()
    for model in models:
        result = call_llm_sync(model, prompt)  # 串行等待,每个约 2 秒
        results.append(result)

    print(f"总耗时: {time.time() - start:.1f}s")  # ~6 秒
    return results

异步做法(并发等待):

import asyncio
import time
import openai

async def call_llm_async(model, prompt):
    """异步调用,释放线程给其他任务"""
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def get_multi_model_answers(prompt):
    models = ["gpt-4o", "gpt-4o-mini", "gpt-3.5-turbo"]

    start = time.time()
    results = await asyncio.gather(
        *[call_llm_async(model, prompt) for model in models]  # 并发等待
    )

    print(f"总耗时: {time.time() - start:.1f}s")  # ~2 秒(取最慢的那个)
    return results

从 6 秒降到 2 秒——这就是异步的价值。

本质区别:同步代码在等待 I/O(网络请求、磁盘读写)时,线程会被阻塞,什么都不能做。异步代码在遇到 await 时,会把控制权交还给 event loop,让 event loop 去处理其他任务,I/O 完成后再回来继续执行。

asyncio 核心概念
#

Event Loop:单线程的任务调度器
#

Event loop 是 asyncio 的核心——一个不断循环的调度器,监听各种事件(I/O 完成、定时器到期),并在事件发生时调用对应的回调函数或恢复对应的协程。

import asyncio

# 获取当前 event loop(Python 3.10+)
loop = asyncio.get_event_loop()

# 运行一个协程
asyncio.run(main())  # 这会创建一个新的 event loop,运行完后关闭

关键认知:asyncio 是单线程的。所有协程在同一个线程中运行,通过主动让出控制权(await)来实现并发。这意味着:

  • 没有 GIL 问题(本来就单线程)
  • 协程之间的切换是协作式的,不是抢占式的
  • CPU 密集型任务不适合 asyncio(会阻塞整个 loop)

Coroutine:可暂停的函数
#

async def 定义的函数是协程函数,调用它会返回一个协程对象,不会立即执行。

async def my_coroutine():
    print("开始执行")
    await asyncio.sleep(1)  # 让出控制权 1 秒
    print("继续执行")

# 错误:直接调用不会执行
# my_coroutine()  # 只是创建了一个协程对象,没有运行

# 正确:用 await 或 asyncio.run()
asyncio.run(my_coroutine())

Task:已调度的协程
#

Task 是把协程包装成可以并发运行的任务。asyncio.gatherasyncio.create_task 都会创建 Task。

async def main():
    # 方式一:create_task 立即调度(不等待)
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    # 此时 task1 和 task2 已经在运行了

    # 做其他事情...
    await asyncio.sleep(0)  # 给 task1/task2 一个执行机会

    # 最后等待结果
    result1 = await task1
    result2 = await task2

    # 方式二:gather 并发等待
    results = await asyncio.gather(
        fetch_data("url1"),
        fetch_data("url2"),
        return_exceptions=True  # 某个任务失败不影响其他任务
    )

async/await 语法精要和常见错误
#

基础用法
#

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:  # async with:异步上下文管理器
        return await response.text()

async def fetch_multiple():
    urls = [
        "https://api.example.com/data/1",
        "https://api.example.com/data/2",
        "https://api.example.com/data/3",
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

常见错误 1:忘记 await
#

# 错误:没有 await,response 是一个协程对象,不是结果
async def wrong():
    response = client.chat.completions.create(...)  # 忘记 await
    print(response.choices)  # AttributeError: coroutine has no attribute 'choices'

# 正确
async def correct():
    response = await client.chat.completions.create(...)
    print(response.choices[0].message.content)

常见错误 2:在异步函数中调用同步阻塞函数
#

import time
import asyncio

# 错误:time.sleep 会阻塞整个 event loop!
async def wrong_sleep():
    await some_async_work()
    time.sleep(2)  # 这 2 秒内,整个程序都被冻结
    await more_async_work()

# 正确:用 asyncio.sleep
async def correct_sleep():
    await some_async_work()
    await asyncio.sleep(2)  # 让出控制权,其他任务可以运行
    await more_async_work()

# 对于无法避免的阻塞调用(如 CPU 密集计算、旧的同步库),用线程池
async def run_blocking_in_thread():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        None,  # 使用默认 ThreadPoolExecutor
        blocking_function,  # 阻塞函数
        arg1, arg2
    )
    return result

并发调用多个 LLM API:asyncio.gather 实战
#

这是 AI 应用中最常见的异步模式——同时向多个模型发请求,或者并发执行多个独立的 LLM 任务。

多模型并发与结果聚合
#

import asyncio
import anthropic
from openai import AsyncOpenAI
from typing import Optional

openai_client = AsyncOpenAI()
anthropic_client = anthropic.AsyncAnthropic()

async def call_openai(prompt: str) -> Optional[str]:
    try:
        response = await openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            timeout=30
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"OpenAI 调用失败: {e}")
        return None

async def call_claude(prompt: str) -> Optional[str]:
    try:
        response = await anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.content[0].text
    except Exception as e:
        print(f"Claude 调用失败: {e}")
        return None

async def ensemble_query(prompt: str) -> dict:
    """并发调用多个模型,返回所有结果"""
    results = await asyncio.gather(
        call_openai(prompt),
        call_claude(prompt),
        return_exceptions=True  # 不因一个失败而中断其他
    )

    return {
        "gpt4o": results[0] if not isinstance(results[0], Exception) else None,
        "claude": results[1] if not isinstance(results[1], Exception) else None,
    }

# 使用
async def main():
    result = await ensemble_query("用三句话解释量子纠缠")
    for model, answer in result.items():
        print(f"\n=== {model} ===")
        print(answer)

asyncio.run(main())

带并发限制的批量处理
#

并发调用 API 时要注意速率限制(Rate Limit),用 asyncio.Semaphore 控制并发数:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def process_single(semaphore: asyncio.Semaphore, text: str) -> str:
    async with semaphore:  # 信号量控制并发数
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"总结以下文本:{text}"}],
        )
        return response.choices[0].message.content

async def batch_summarize(texts: list[str], max_concurrency: int = 5) -> list[str]:
    """批量总结,最多 5 个并发请求"""
    semaphore = asyncio.Semaphore(max_concurrency)
    tasks = [process_single(semaphore, text) for text in texts]
    return await asyncio.gather(*tasks, return_exceptions=True)

# 处理 100 条文本,每次最多 5 个并发
async def main():
    texts = [f"这是第 {i} 段需要总结的文本..." for i in range(100)]
    results = await batch_summarize(texts, max_concurrency=5)
    print(f"处理完成,共 {len(results)} 条")

流式输出(SSE/Streaming)的异步处理
#

LLM 流式输出是 AI 应用的标配——用户不需要等全部生成完才看到内容,可以逐 token 看到输出。

import asyncio
import anthropic

client = anthropic.AsyncAnthropic()

async def stream_response(prompt: str):
    """流式接收 LLM 输出,逐块打印"""
    async with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
    print()  # 换行
    return await stream.get_final_message()

asyncio.run(stream_response("写一首关于异步编程的诗"))

FastAPI 中的 SSE 流式返回
#

在 Web 应用中,流式输出通常通过 Server-Sent Events(SSE)推送给前端:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json

app = FastAPI()
client = anthropic.AsyncAnthropic()

async def generate_stream(prompt: str):
    """异步生成器:产生 SSE 格式的数据"""
    async with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            # SSE 格式:data: {...}\n\n
            yield f"data: {json.dumps({'text': text, 'done': False})}\n\n"

        # 发送结束信号
        final = await stream.get_final_message()
        usage = {
            "input_tokens": final.usage.input_tokens,
            "output_tokens": final.usage.output_tokens
        }
        yield f"data: {json.dumps({'done': True, 'usage': usage})}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: dict):
    prompt = request.get("prompt", "")
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # 禁用 Nginx 缓冲
        }
    )

前端接收 SSE:

const response = await fetch('/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt: userInput })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const text = decoder.decode(value);
    const lines = text.split('\n').filter(l => l.startsWith('data: '));

    for (const line of lines) {
        const data = JSON.parse(line.slice(6));
        if (!data.done) {
            appendToOutput(data.text);
        }
    }
}

异步上下文管理器(async with)
#

async with 用于需要异步初始化和清理的资源,比如数据库连接池、HTTP 会话:

import asyncio
import asyncpg

class DatabaseManager:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None

    async def __aenter__(self):
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=2,
            max_size=10
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.pool.close()

    async def fetch_user(self, user_id: int) -> dict:
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT * FROM users WHERE id = $1", user_id
            )
            return dict(row) if row else None

# 使用
async def main():
    async with DatabaseManager("postgresql://user:pass@localhost/db") as db:
        user = await db.fetch_user(123)
        print(user)

异步数据库操作
#

AI 应用常用的异步数据库库:

数据库同步库异步库
PostgreSQLpsycopg2asyncpg / psycopg3
MySQLPyMySQLaiomysql
MongoDBpymongomotor
Redisredis-pyaioredis / redis.asyncio
import asyncio
import asyncpg
from motor.motor_asyncio import AsyncIOMotorClient

# PostgreSQL:存储对话历史
async def save_conversation(pool, session_id: str, messages: list):
    async with pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO conversations (session_id, messages, created_at)
            VALUES ($1, $2::jsonb, NOW())
            ON CONFLICT (session_id)
            DO UPDATE SET messages = $2::jsonb, updated_at = NOW()
            """,
            session_id,
            messages  # asyncpg 自动序列化为 JSON
        )

# MongoDB:存储非结构化的 LLM 交互日志
async def log_llm_call(mongo_client, prompt: str, response: str, metadata: dict):
    db = mongo_client.llm_logs
    collection = db.interactions
    await collection.insert_one({
        "prompt": prompt,
        "response": response,
        "metadata": metadata,
        "timestamp": asyncio.get_event_loop().time()
    })

# Redis:缓存 Embedding 结果
import redis.asyncio as aioredis

async def get_or_compute_embedding(redis_client, text: str) -> list[float]:
    cache_key = f"emb:{hash(text)}"

    # 先查缓存
    cached = await redis_client.get(cache_key)
    if cached:
        import json
        return json.loads(cached)

    # 缓存未命中,计算 Embedding
    embedding = await compute_embedding_async(text)

    # 写入缓存,TTL 1 天
    await redis_client.setex(cache_key, 86400, json.dumps(embedding))
    return embedding

FastAPI 异步路由实战
#

FastAPI 本身是基于 asyncio 的,路由函数声明为 async def 就能利用异步优势:

from fastapi import FastAPI, HTTPException, Depends
from contextlib import asynccontextmanager
import asyncpg
import anthropic

# 应用启动时初始化连接池
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时
    app.state.db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/app_db",
        min_size=5,
        max_size=20
    )
    app.state.llm_client = anthropic.AsyncAnthropic()
    yield
    # 关闭时
    await app.state.db_pool.close()

app = FastAPI(lifespan=lifespan)

async def get_db_pool(request):
    return request.app.state.db_pool

async def get_llm_client(request):
    return request.app.state.llm_client

@app.post("/ask")
async def ask_question(
    request: dict,
    pool: asyncpg.Pool = Depends(get_db_pool),
    llm: anthropic.AsyncAnthropic = Depends(get_llm_client)
):
    question = request.get("question", "")
    user_id = request.get("user_id")

    if not question:
        raise HTTPException(status_code=400, detail="question 不能为空")

    # 并发执行:获取用户历史 + 调用 LLM
    async with pool.acquire() as conn:
        history_task = asyncio.create_task(
            conn.fetch(
                "SELECT role, content FROM chat_history WHERE user_id = $1 ORDER BY created_at DESC LIMIT 10",
                user_id
            )
        )

    # 获取历史记录
    history = await history_task
    messages = [{"role": r["role"], "content": r["content"]} for r in reversed(history)]
    messages.append({"role": "user", "content": question})

    # 调用 LLM
    response = await llm.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=messages
    )
    answer = response.content[0].text

    # 保存对话记录
    async with pool.acquire() as conn:
        await asyncio.gather(
            conn.execute(
                "INSERT INTO chat_history (user_id, role, content) VALUES ($1, 'user', $2)",
                user_id, question
            ),
            conn.execute(
                "INSERT INTO chat_history (user_id, role, content) VALUES ($1, 'assistant', $2)",
                user_id, answer
            )
        )

    return {"answer": answer}

常见陷阱排查
#

陷阱 1:在同步代码中运行异步函数
#

import asyncio

async def async_func():
    await asyncio.sleep(1)
    return "done"

# 错误:在普通函数中直接 await
def sync_caller():
    result = await async_func()  # SyntaxError!

# 常见误区:在 Jupyter Notebook 中(有自己的 event loop)
# asyncio.run(async_func())  # RuntimeError: cannot run nested event loop

# Notebook 中正确做法:直接 await(Jupyter 支持顶层 await)
# result = await async_func()

# 在普通同步代码中调用异步函数:
result = asyncio.run(async_func())  # 创建新 loop 运行

陷阱 2:未捕获的异步异常
#

import asyncio

async def failing_task():
    await asyncio.sleep(0.1)
    raise ValueError("任务失败")

# 危险:异常被静默丢弃
async def dangerous():
    task = asyncio.create_task(failing_task())
    # 如果不 await task,异常会变成 unhandled exception warning
    await asyncio.sleep(1)  # task 已经失败,但没人知道

# 安全做法:设置异常回调或及时 await
async def safe():
    task = asyncio.create_task(failing_task())
    task.add_done_callback(
        lambda t: print(f"任务失败: {t.exception()}") if t.exception() else None
    )
    await asyncio.sleep(1)

陷阱 3:asyncio.gather 中一个失败导致其他取消
#

import asyncio

async def task_a():
    await asyncio.sleep(1)
    return "A 完成"

async def task_b():
    await asyncio.sleep(0.5)
    raise RuntimeError("B 失败")

# 默认行为:B 失败会让 gather 抛出异常,A 可能被取消
async def wrong():
    try:
        results = await asyncio.gather(task_a(), task_b())
    except RuntimeError as e:
        print(f"异常: {e}")  # A 的结果丢失了

# 正确:return_exceptions=True 让失败作为返回值
async def correct():
    results = await asyncio.gather(task_a(), task_b(), return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
        else:
            print(f"任务 {i} 成功: {result}")

asyncio.run(correct())

调试技巧
#

import asyncio
import logging

# 开启 asyncio 调试模式,检测慢速协程和未等待的协程
asyncio.run(main(), debug=True)

# 或在环境变量中设置
# PYTHONASYNCIODEBUG=1 python your_app.py

# 用 asyncio.current_task() 追踪当前任务
async def trace_task():
    task = asyncio.current_task()
    print(f"当前任务: {task.get_name()}")

# 检测 event loop 是否被阻塞(超过 100ms 视为问题)
import time

async def monitor_loop_latency():
    while True:
        start = time.monotonic()
        await asyncio.sleep(0.1)
        elapsed = time.monotonic() - start
        if elapsed > 0.2:  # 超过预期的 2 倍
            print(f"警告:event loop 延迟 {elapsed*1000:.0f}ms,可能有阻塞调用")

asyncio 的学习曲线不假,但一旦接受了 event loop 的协作式调度模型,后面的坑基本都能按图索骥。AI 应用大头就是 I/O 等待,从同步改异步通常一毛钱资源不加、并发就能翻 5-10 倍。

Wenzhuo Huang
作者
Wenzhuo Huang
搞运维的工程师,写代码的运维人。专注 Kubernetes、AWS、GitOps 与基础设施可靠性。这个博客既是我的技术笔记本,也是我踩过的坑的受害者档案。
AI 工程化实战 - 这篇文章属于一个选集。
§ : 本文

相关文章

Python 自动化运维:从脚本到完整工具的工程化实践

·1559 字·8 分钟
系统梳理 Python 运维自动化的工程化方法:boto3 操作 AWS 资源、Kubernetes Python SDK 使用、Click/Typer CLI 框架选型、数据库批量运维脚本、钉钉 Webhook 集成,以及类型注解与错误处理的实践经验。

MongoDB 分片集群实战:从 shard key 设计到 chunk 均衡的全链路

·1406 字·7 分钟
很多团队把 MongoDB 分片当成"设个 shard key 就完事",结果上线半年后发现 80% 数据在一个 shard 上、balancer 每天搬几十 GB 却怎么都追不上、某个 collection 出现 jumbo chunk 无法分裂。这篇文章把我在几套 MongoDB 分片集群上的经验整理出来,希望能让你在分片之前少走一些弯路。

Redis Cluster 扩缩容与数据迁移实战:从 SETSLOT 到 Atomic Slot Migration

·1346 字·7 分钟
很多团队把 Redis Cluster 当成"开箱即用"的分布式 Redis,直到要做扩缩容或数据迁移时才发现:SETSLOT 协议里有十几种状态,迁移过程中客户端重定向要么不生效要么风暴,migrate 卡住没法断,big key 直接把迁移拖垮。这篇文章把我在几套千亿级 Cluster 上做过的扩缩容、迁移、救火全过一遍。