跳过正文
OpenAI API 工程化实践:从 Hello World 到生产

OpenAI API 工程化实践:从 Hello World 到生产

·1678 字·8 分钟·
目录
AI 工程化实战 - 这篇文章属于一个选集。
§ : 本文

用 OpenAI API 做过几个内部工具,从一个 quick & dirty 的 Python 脚本跑到生产,中间遇到的工程问题比想象中多:版本突然废弃、速率限制打脸、账单失控、Assistants API 改了几次规范。把这些整理成一份可复用的笔记。


安装与客户端配置
#

pip install openai
from openai import OpenAI
import os

client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
    # 可选:通过 Azure OpenAI
    # api_key=os.environ.get("AZURE_OPENAI_KEY"),
    # azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT"),
    # api_version="2024-02-01",
    
    # 超时配置(建议显式设置)
    timeout=30.0,
    max_retries=3,
)

Chat Completions vs Assistants API
#

这是很多人的第一个困惑:什么时候用哪个?

Chat Completions API(推荐首选)
#

无状态,你管理所有状态:

response = client.chat.completions.create(
    model="gpt-5.4-mini",
    messages=[
        {"role": "system", "content": "你是一个代码助手"},
        {"role": "user", "content": "写一个快速排序"},
    ],
    temperature=0.1,
    max_tokens=2048,
)

print(response.choices[0].message.content)
print(f"usage: {response.usage}")

适合场景

  • 单次问答、文本处理
  • 自己管理对话历史
  • 需要可预测的行为和成本
  • 大多数生产应用

Assistants API
#

有状态,OpenAI 管理 Thread(对话历史):

# 创建 Assistant(一次性,持久化)
assistant = client.beta.assistants.create(
    name="代码审查助手",
    instructions="你是专业的代码审查工程师...",
    model="gpt-5.4",
    tools=[{"type": "code_interpreter"}],  # 内置代码执行
)

# 创建对话 Thread
thread = client.beta.threads.create()

# 发送消息
client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content="审查这段代码:..."
)

# 运行(异步,需要轮询)
run = client.beta.threads.runs.create_and_poll(
    thread_id=thread.id,
    assistant_id=assistant.id,
)

if run.status == "completed":
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    print(messages.data[0].content[0].text.value)

适合场景

  • 需要 Code Interpreter(代码执行沙箱)
  • 需要内置的文件搜索(RAG 功能)
  • 长期多会话场景且不想自己管理状态

Assistants API 的缺点

  • 状态在 OpenAI 服务器端,排查问题困难
  • 成本不透明(Thread 存储也收费)
  • 延迟比 Chat Completions 高
  • 对话历史无法精确控制

结论:除非你需要 Code Interpreter 或内置 File Search,否则一律用 Chat Completions,自己管理状态。新项目如需有状态对话管理,建议评估 Responses API(OpenAI 新一代接口,取代部分 Assistants API 场景,延迟更低、状态管理更灵活)。


Function Calling 详解
#

Function Calling 是让 LLM 与外部系统交互的标准方式。

基础用法
#

import json

# 定义工具
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_stock_price",
            "description": "获取股票的实时价格",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {
                        "type": "string",
                        "description": "股票代码,如 AAPL, GOOGL"
                    },
                    "currency": {
                        "type": "string",
                        "enum": ["USD", "CNY"],
                        "description": "返回价格的货币单位",
                    }
                },
                "required": ["symbol"],
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "calculate_portfolio_value",
            "description": "计算投资组合的总价值",
            "parameters": {
                "type": "object",
                "properties": {
                    "holdings": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "symbol": {"type": "string"},
                                "shares": {"type": "number"},
                            }
                        },
                        "description": "持仓列表"
                    }
                },
                "required": ["holdings"],
            }
        }
    }
]

# 工具实现
def get_stock_price(symbol: str, currency: str = "USD") -> dict:
    # 实际项目里调用行情 API
    prices = {"AAPL": 195.5, "GOOGL": 175.2, "MSFT": 420.0}
    price = prices.get(symbol.upper(), 0)
    if currency == "CNY":
        price *= 7.2
    return {"symbol": symbol, "price": price, "currency": currency}

def calculate_portfolio_value(holdings: list[dict]) -> dict:
    total = sum(
        get_stock_price(h["symbol"])["price"] * h["shares"]
        for h in holdings
    )
    return {"total_value": round(total, 2), "currency": "USD"}

FUNCTIONS = {
    "get_stock_price": get_stock_price,
    "calculate_portfolio_value": calculate_portfolio_value,
}

# 完整的 Function Calling 循环
def run_with_tools(user_message: str) -> str:
    messages = [{"role": "user", "content": user_message}]
    
    while True:
        response = client.chat.completions.create(
            model="gpt-5.4",
            messages=messages,
            tools=tools,
            tool_choice="auto",
        )
        
        choice = response.choices[0]
        messages.append(choice.message)  # 把 assistant 消息加入历史
        
        if choice.finish_reason == "stop":
            return choice.message.content
        
        elif choice.finish_reason == "tool_calls":
            # 执行所有工具调用
            for tool_call in choice.message.tool_calls:
                func_name = tool_call.function.name
                func_args = json.loads(tool_call.function.arguments)
                
                print(f"调用: {func_name}({func_args})")
                
                result = FUNCTIONS[func_name](**func_args)
                
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": json.dumps(result, ensure_ascii=False),
                })
        else:
            break
    
    return "无法完成请求"


result = run_with_tools("我持有 100 股 AAPL 和 50 股 GOOGL,总价值是多少?")
print(result)

Parallel Tool Calls
#

gpt-5.4 支持同时调用多个工具,可以并行执行:

import asyncio

async def execute_tool_calls_parallel(tool_calls: list) -> list:
    """并行执行多个工具调用"""
    async def execute_single(tool_call):
        func_name = tool_call.function.name
        func_args = json.loads(tool_call.function.arguments)
        
        # 如果工具是异步的,直接 await
        if asyncio.iscoroutinefunction(FUNCTIONS[func_name]):
            result = await FUNCTIONS[func_name](**func_args)
        else:
            # 同步工具在线程池里运行
            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(
                None, lambda: FUNCTIONS[func_name](**func_args)
            )
        
        return {
            "role": "tool",
            "tool_call_id": tool_call.id,
            "content": json.dumps(result, ensure_ascii=False),
        }
    
    return await asyncio.gather(*[execute_single(tc) for tc in tool_calls])

Structured Output(JSON Schema 绑定)
#

OpenAI 的 Structured Output 功能保证输出严格符合给定的 JSON Schema:

使用 Pydantic 模型
#

from pydantic import BaseModel, Field
from typing import Literal
from openai import OpenAI

client = OpenAI()

class BugReport(BaseModel):
    severity: Literal["critical", "high", "medium", "low"]
    component: str = Field(description="出现 bug 的组件或模块")
    description: str = Field(description="问题描述,50字以内")
    reproduction_steps: list[str] = Field(description="复现步骤")
    suggested_fix: str | None = Field(description="建议的修复方案,如果无法确定则为 null")

class CodeReviewResult(BaseModel):
    overall_score: int = Field(ge=1, le=10, description="代码质量评分 1-10")
    bugs: list[BugReport]
    style_issues: list[str] = Field(description="代码风格问题列表")
    approved: bool

response = client.beta.chat.completions.parse(
    model="gpt-5.4",  # gpt-5.4 及以上支持 Structured Output
    messages=[
        {"role": "system", "content": "你是代码审查专家,按照要求的格式输出审查结果。"},
        {"role": "user", "content": f"审查以下代码:\n\n```python\n{code_to_review}\n```"}
    ],
    response_format=CodeReviewResult,
)

result = response.choices[0].message.parsed
print(f"评分: {result.overall_score}/10")
print(f"发现 {len(result.bugs)} 个 bug")
for bug in result.bugs:
    print(f"  [{bug.severity.upper()}] {bug.description}")

处理 refusal
#

模型可能拒绝生成某些内容:

response = client.beta.chat.completions.parse(
    model="gpt-5.4",
    messages=[...],
    response_format=MySchema,
)

choice = response.choices[0]
if choice.message.refusal:
    # 模型拒绝了请求
    print(f"模型拒绝: {choice.message.refusal}")
else:
    result = choice.message.parsed

Embeddings API
#

Embedding 用于将文本转换为向量,是 RAG 系统的基础。

# 单条 embedding
response = client.embeddings.create(
    model="text-embedding-3-small",
    input="这是需要向量化的文本",
    encoding_format="float",
)

embedding = response.data[0].embedding  # list[float],维度 1536
print(f"向量维度: {len(embedding)}")

# 批量 embedding(更高效)
texts = ["文本1", "文本2", "文本3", ...]

response = client.embeddings.create(
    model="text-embedding-3-small",
    input=texts,  # 最多 2048 条
)

embeddings = [item.embedding for item in response.data]

降维节省成本
#

text-embedding-3 系列支持降维,减少存储和计算成本:

# 使用 256 维代替默认 1536 维(quality 略降,成本和检索速度大幅改善)
response = client.embeddings.create(
    model="text-embedding-3-small",
    input=texts,
    dimensions=256,  # 降维
)

计算语义相似度
#

import numpy as np

def cosine_similarity(a: list[float], b: list[float]) -> float:
    a, b = np.array(a), np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# 示例:找最相似的文档
def find_most_similar(query: str, documents: list[str]) -> tuple[str, float]:
    all_texts = [query] + documents
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=all_texts,
    )
    
    embeddings = [item.embedding for item in response.data]
    query_emb = embeddings[0]
    doc_embs = embeddings[1:]
    
    similarities = [cosine_similarity(query_emb, doc_emb) for doc_emb in doc_embs]
    best_idx = max(range(len(similarities)), key=lambda i: similarities[i])
    
    return documents[best_idx], similarities[best_idx]

Batch API:批量处理降本 50%
#

对于不需要实时响应的任务(离线标注、批量摘要、数据处理),Batch API 价格是普通 API 的一半,且有 24 小时的处理窗口。

import json
from pathlib import Path

# 准备批量请求文件(JSONL 格式)
requests = []
for idx, text in enumerate(documents_to_summarize):
    requests.append({
        "custom_id": f"doc-{idx}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-5.4-mini",
            "max_tokens": 200,
            "messages": [
                {"role": "system", "content": "用一句话总结以下文本"},
                {"role": "user", "content": text}
            ]
        }
    })

# 写入 JSONL 文件
batch_file_path = Path("/tmp/batch_requests.jsonl")
with batch_file_path.open("w") as f:
    for req in requests:
        f.write(json.dumps(req, ensure_ascii=False) + "\n")

# 上传文件
with batch_file_path.open("rb") as f:
    batch_file = client.files.create(file=f, purpose="batch")

# 创建批量任务
batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={"description": "文档摘要批量处理"},
)

print(f"Batch ID: {batch.id}")
print(f"状态: {batch.status}")
import time

def wait_for_batch(batch_id: str, poll_interval: int = 60) -> dict:
    """等待 Batch 任务完成并返回结果"""
    while True:
        batch = client.batches.retrieve(batch_id)
        print(f"状态: {batch.status}, 完成: {batch.request_counts.completed}/{batch.request_counts.total}")
        
        if batch.status == "completed":
            # 下载结果
            result_file = client.files.content(batch.output_file_id)
            results = {}
            for line in result_file.text.strip().split("\n"):
                result = json.loads(line)
                custom_id = result["custom_id"]
                if result["error"] is None:
                    content = result["response"]["body"]["choices"][0]["message"]["content"]
                    results[custom_id] = content
                else:
                    results[custom_id] = None
                    print(f"请求 {custom_id} 失败: {result['error']}")
            return results
        
        elif batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch 任务失败: {batch.status}")
        
        time.sleep(poll_interval)


results = wait_for_batch(batch.id)

流式输出
#

# 同步流式
with client.chat.completions.stream(
    model="gpt-5.4-mini",
    messages=[{"role": "user", "content": "写一篇关于云原生的文章"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

# 获取最终统计
final = stream.get_final_completion()
print(f"\nTokens: {final.usage}")
# 异步流式(FastAPI 集成)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

async_client = AsyncOpenAI()
app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(message: str):
    async def generate():
        async with async_client.chat.completions.stream(
            model="gpt-5.4-mini",
            messages=[{"role": "user", "content": message}],
        ) as stream:
            async for text in stream.text_stream:
                yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(generate(), media_type="text/event-stream")

错误处理与重试
#

import time
import logging
from openai import (
    OpenAI,
    RateLimitError,
    APIConnectionError,
    APITimeoutError,
    APIStatusError,
    AuthenticationError,
    BadRequestError,
)

logger = logging.getLogger(__name__)

class OpenAIClient:
    def __init__(self):
        self.client = OpenAI(
            max_retries=0,  # 关闭 SDK 内置重试,自己控制
            timeout=30.0,
        )
    
    def chat(
        self,
        messages: list[dict],
        model: str = "gpt-5.4-mini",
        max_retries: int = 5,
        **kwargs
    ) -> str:
        for attempt in range(max_retries):
            try:
                response = self.client.chat.completions.create(
                    model=model,
                    messages=messages,
                    **kwargs
                )
                return response.choices[0].message.content
            
            except AuthenticationError:
                # API Key 无效,不重试
                logger.error("OpenAI API Key 无效")
                raise
            
            except BadRequestError as e:
                # 请求格式错误,不重试
                logger.error(f"请求格式错误: {e}")
                raise
            
            except RateLimitError as e:
                if attempt == max_retries - 1:
                    raise
                # 429 错误,等待后重试
                retry_after = int(
                    e.response.headers.get("x-ratelimit-reset-requests", "10")
                    if hasattr(e, 'response') and e.response else "10"
                )
                wait = min(retry_after, 2 ** attempt * 5)
                logger.warning(f"速率限制,等待 {wait}s (attempt {attempt + 1})")
                time.sleep(wait)
            
            except (APIConnectionError, APITimeoutError) as e:
                if attempt == max_retries - 1:
                    raise
                wait = 2 ** attempt
                logger.warning(f"连接/超时错误,{wait}s 后重试: {e}")
                time.sleep(wait)
            
            except APIStatusError as e:
                if e.status_code >= 500:
                    if attempt == max_retries - 1:
                        raise
                    wait = 2 ** attempt * 5
                    logger.warning(f"服务器错误 {e.status_code}{wait}s 后重试")
                    time.sleep(wait)
                else:
                    raise

成本优化技巧
#

1. 选择合适的模型
#

# 根据任务复杂度自动选择模型
# 注意:GPT-4o 已于 2026 年 2 月 13 日退役,请使用 gpt-5.4 系列
MODEL_ROUTING = {
    "classification": "gpt-5.4-nano",   # 最轻量,适合简单分类
    "extraction": "gpt-5.4-mini",
    "summarization": "gpt-5.4-mini",
    "code_generation": "gpt-5.4",       # 旗舰,参考官方最新定价
    "complex_reasoning": "o4-mini",     # 推理模型,取代旧版 o1-mini
    "analysis": "gpt-5.4",
}

2. 精确控制 max_tokens
#

import tiktoken

def count_tokens(text: str, model: str = "gpt-5.4") -> int:
    encoder = tiktoken.encoding_for_model(model)
    return len(encoder.encode(text))

def estimate_output_tokens(task_type: str) -> int:
    """根据任务类型估算输出 tokens"""
    estimates = {
        "classification": 10,
        "extraction": 100,
        "summarization": 200,
        "code_generation": 1000,
    }
    return estimates.get(task_type, 500)

3. 缓存重复请求
#

import hashlib
import json
from functools import lru_cache

class CachedOpenAIClient:
    def __init__(self):
        self.client = OpenAI()
        self._cache = {}  # 生产中用 Redis
    
    def chat(self, messages: list[dict], **kwargs) -> str:
        # 生成缓存键
        cache_key = hashlib.md5(
            json.dumps({"messages": messages, **kwargs}, sort_keys=True).encode()
        ).hexdigest()
        
        if cache_key in self._cache:
            return self._cache[cache_key]
        
        response = self.client.chat.completions.create(
            messages=messages, **kwargs
        )
        result = response.choices[0].message.content
        
        # 缓存结果(对于确定性任务,temperature=0 的结果可以缓存)
        if kwargs.get("temperature", 1.0) == 0:
            self._cache[cache_key] = result
        
        return result

4. 监控成本
#

from dataclasses import dataclass, field
from collections import defaultdict

@dataclass
class UsageTracker:
    model_usage: dict = field(default_factory=lambda: defaultdict(lambda: {"input": 0, "output": 0}))
    
    # 价格以官方最新定价为准(https://openai.com/pricing)
    # GPT-4o 已于 2026 年 2 月 13 日退役
    PRICES = {
        "gpt-5.4": {"input": 0, "output": 0},        # 参考官方最新定价
        "gpt-5.4-mini": {"input": 0, "output": 0},   # 参考官方最新定价
        "gpt-5.4-nano": {"input": 0, "output": 0},   # 参考官方最新定价
        "o3": {"input": 0, "output": 0},              # 参考官方最新定价
        "o4-mini": {"input": 0, "output": 0},         # 参考官方最新定价
        "text-embedding-3-small": {"input": 0.02, "output": 0},
    }
    
    def record(self, model: str, input_tokens: int, output_tokens: int):
        self.model_usage[model]["input"] += input_tokens
        self.model_usage[model]["output"] += output_tokens
    
    def total_cost(self) -> float:
        total = 0
        for model, usage in self.model_usage.items():
            prices = self.PRICES.get(model, {"input": 0, "output": 0})
            total += usage["input"] * prices["input"] / 1_000_000
            total += usage["output"] * prices["output"] / 1_000_000
        return total
    
    def report(self):
        print(f"{'模型':<25} {'输入':<12} {'输出':<12} {'费用':<10}")
        print("-" * 60)
        for model, usage in self.model_usage.items():
            prices = self.PRICES.get(model, {"input": 0, "output": 0})
            cost = (
                usage["input"] * prices["input"] / 1_000_000 +
                usage["output"] * prices["output"] / 1_000_000
            )
            print(f"{model:<25} {usage['input']:<12} {usage['output']:<12} ${cost:.4f}")
        print(f"\n总费用: ${self.total_cost():.4f}")

tracker = UsageTracker()

完整生产示例:文档问答系统
#

from openai import OpenAI
from pathlib import Path
import json

class DocumentQA:
    def __init__(self):
        self.client = OpenAI()
        self.documents = []
        self.embeddings = []
    
    def add_document(self, text: str, metadata: dict = None):
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        self.documents.append({"text": text, "metadata": metadata or {}})
        self.embeddings.append(response.data[0].embedding)
    
    def query(self, question: str, top_k: int = 3) -> str:
        import numpy as np
        
        # Embed 问题
        q_response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=question,
        )
        q_emb = np.array(q_response.data[0].embedding)
        
        # 计算相似度,取 top_k
        all_embs = np.array(self.embeddings)
        similarities = all_embs @ q_emb / (
            np.linalg.norm(all_embs, axis=1) * np.linalg.norm(q_emb)
        )
        top_indices = np.argsort(similarities)[::-1][:top_k]
        
        # 组装上下文
        context = "\n\n---\n\n".join(
            self.documents[i]["text"] for i in top_indices
        )
        
        # 调用 Chat Completions
        response = self.client.chat.completions.create(
            model="gpt-5.4-mini",
            temperature=0,
            max_tokens=500,
            messages=[
                {
                    "role": "system",
                    "content": "基于提供的文档回答问题。如果文档中没有相关信息,明确说明。"
                },
                {
                    "role": "user",
                    "content": f"文档内容:\n{context}\n\n问题:{question}"
                }
            ]
        )
        
        return response.choices[0].message.content


# 使用示例
qa = DocumentQA()
qa.add_document("Kubernetes 是一个容器编排系统...", {"source": "k8s-intro.md"})
qa.add_document("RAG 系统通过检索增强生成质量...", {"source": "rag-guide.md"})

answer = qa.query("什么是 RAG?")
print(answer)
Wenzhuo Huang
作者
Wenzhuo Huang
搞运维的工程师,写代码的运维人。专注 Kubernetes、AWS、GitOps 与基础设施可靠性。这个博客既是我的技术笔记本,也是我踩过的坑的受害者档案。
AI 工程化实战 - 这篇文章属于一个选集。
§ : 本文

相关文章

Claude API 开发完全指南:从调用到生产应用

·1316 字·7 分钟
Claude API 的设计哲学和 OpenAI 有些不同,但一旦理解其模式,就会发现它在长文本、代码生成和工具调用上非常可靠。本文覆盖从 SDK 配置到 Prompt Caching、Tool Use、Vision 的完整开发实践,以及生产中的错误处理与成本控制策略。

LangChain 从入门到实战:构建 LLM 应用的工程框架

·1045 字·5 分钟
LangChain 是构建 LLM 应用最流行的框架,但也是踩坑最多的框架之一。本文从 LCEL 表达式、ReAct Agent、LangGraph 工作流到生产部署,梳理真正有用的部分,并指出哪些功能实际工程中应该避免。