跳过正文
Milvus 向量数据库实战:从部署到生产应用

Milvus 向量数据库实战:从部署到生产应用

·895 字·5 分钟·
目录
AI 工程化实战 - 这篇文章属于一个选集。
§ : 本文

向量数据库已经是构建 RAG 系统的标配组件。选型决策直接影响后期维护成本,本文从实际工程角度讲清楚怎么选、怎么部署、怎么用好 Milvus。

向量数据库选型对比
#

市面上主流的几个方案各有侧重:

方案适用场景优势劣势
Milvus大规模生产性能强、功能完整、社区活跃部署复杂、资源占用高
Qdrant中等规模Rust实现性能好、API简洁生态相对小
WeaviateGraphQL场景内置向量化、schema友好内存消耗大
pgvector已有PostgreSQL运维简单、SQL熟悉亿级数据性能下降明显
Chroma本地开发极简部署不适合生产

选型建议

  • 数据量 < 500万:pgvector 或 Qdrant,省去独立服务运维
  • 数据量 500万~5000万:Milvus Standalone 或 Qdrant
  • 数据量 > 5000万 / 需要高并发读写:Milvus Cluster

pgvector 最容易被低估——如果你已经有 PostgreSQL,百万级数据加上 HNSW 索引,延迟完全可以控制在 10ms 以内,不用引入新的中间件。但它的并发写入性能比不上专用向量数据库,索引构建也会影响在线查询。


Milvus Standalone 部署
#

方式一:Docker Compose(推荐开发和小规模生产)
#

# 下载官方 compose 文件
wget https://github.com/milvus-io/milvus/releases/download/v2.4.6/milvus-standalone-docker-compose.yml \
  -O docker-compose.yml

# 启动
docker-compose up -d

# 确认三个组件都 healthy
docker-compose ps

Milvus Standalone 内部包含三个进程:etcd(元数据)、MinIO(对象存储)、milvus 本身。compose 文件会一并拉起来。

生产注意点

  • 把 etcd 数据和 MinIO 数据挂载到持久化目录
  • 默认端口 19530(gRPC)和 9091(HTTP/metrics)
  • 内存至少 8GB,实际集合越大需要越多
# 关键 volume 配置片段
volumes:
  - /data/milvus/etcd:/etcd
  - /data/milvus/minio:/minio_data
  - /data/milvus/milvus:/var/lib/milvus

方式二:Helm 部署到 Kubernetes
#

helm repo add milvus https://zilliztech.github.io/milvus-helm/
helm repo update

helm install milvus milvus/milvus \
  --namespace milvus \
  --create-namespace \
  --set cluster.enabled=false \
  --set etcd.replicaCount=1 \
  --set minio.mode=standalone \
  --set pulsar.enabled=false \
  -f values-standalone.yaml
# values-standalone.yaml
standalone:
  resources:
    requests:
      memory: "4Gi"
      cpu: "1"
    limits:
      memory: "8Gi"
      cpu: "4"

minio:
  persistence:
    storageClass: "gp3"
    size: 100Gi

etcd:
  persistence:
    storageClass: "gp3"
    size: 10Gi

Collection 设计
#

Collection 相当于关系型数据库的表,设计好 Schema 是后续一切的基础。

Schema 定义
#

from pymilvus import (
    connections, Collection, CollectionSchema,
    FieldSchema, DataType, utility
)

# 连接
connections.connect(
    alias="default",
    host="localhost",
    port="19530"
)

# 定义字段
fields = [
    # 主键,自增或手动指定
    FieldSchema(
        name="id",
        dtype=DataType.INT64,
        is_primary=True,
        auto_id=True
    ),
    # 业务 ID,用于关联原始数据
    FieldSchema(
        name="doc_id",
        dtype=DataType.VARCHAR,
        max_length=128
    ),
    # 文档分块文本(用于返回展示)
    FieldSchema(
        name="text",
        dtype=DataType.VARCHAR,
        max_length=4096
    ),
    # 向量字段,维度取决于 embedding 模型
    # text-embedding-3-small: 1536
    # bge-m3: 1024
    # bge-large-zh: 1024
    FieldSchema(
        name="embedding",
        dtype=DataType.FLOAT_VECTOR,
        dim=1536
    ),
    # 标量字段,用于过滤
    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=64),
    FieldSchema(name="chunk_index", dtype=DataType.INT32),
    FieldSchema(name="created_at", dtype=DataType.INT64),  # unix timestamp
]

schema = CollectionSchema(
    fields=fields,
    description="知识库文档分块",
    enable_dynamic_field=True  # 允许插入额外字段,灵活但有开销
)

collection = Collection(
    name="knowledge_base",
    schema=schema,
    consistency_level="Session"  # Strong/Session/Bounded/Eventually
)

consistency_level 选择

  • Strong:每次读都能看到最新写入,性能最低
  • Session:当前会话内强一致,通常够用
  • Bounded:允许一定延迟,适合高吞吐写场景
  • Eventually:最终一致,追求极致读性能时用

索引构建
#

索引类型的选择对性能影响极大:

# HNSW:精度高、查询快,内存占用大,适合大多数场景
index_params_hnsw = {
    "metric_type": "COSINE",  # 余弦相似度,适合文本
    "index_type": "HNSW",
    "params": {
        "M": 16,           # 每个节点的最大连接数,越大精度越高但内存越多
        "efConstruction": 200  # 构建时的搜索范围,越大索引质量越好
    }
}

# IVF_FLAT:分区倒排索引,内存友好,适合大数据集
index_params_ivf = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {
        "nlist": 1024  # 聚类中心数,建议 sqrt(数据量)
    }
}

# 通常文本用 COSINE + HNSW 组合
collection.create_index(
    field_name="embedding",
    index_params=index_params_hnsw
)

# 加载到内存(查询前必须)
collection.load()
print(f"Collection loaded, entity count: {collection.num_entities}")

HNSW 参数经验值

  • M=16:平衡精度和内存的默认值,可从这里开始
  • M=32:高精度要求时用,内存翻倍
  • efConstruction=200:离线建索引时可以开大,提升质量

Python SDK CRUD 操作
#

插入数据
#

import numpy as np
from typing import List

def batch_insert(
    collection: Collection,
    texts: List[str],
    embeddings: List[List[float]],
    doc_ids: List[str],
    sources: List[str],
    batch_size: int = 1000
):
    """批量插入,避免单次请求过大"""
    total = len(texts)
    inserted = 0

    for i in range(0, total, batch_size):
        batch_texts = texts[i:i+batch_size]
        batch_embeddings = embeddings[i:i+batch_size]
        batch_doc_ids = doc_ids[i:i+batch_size]
        batch_sources = sources[i:i+batch_size]

        import time
        data = [
            batch_doc_ids,
            batch_texts,
            batch_embeddings,
            batch_sources,
            [0] * len(batch_texts),  # chunk_index
            [int(time.time())] * len(batch_texts),
        ]

        result = collection.insert(data)
        inserted += len(result.primary_keys)
        print(f"Inserted {inserted}/{total}")

    # 插入后手动 flush 确保持久化(生产中可以不立即 flush)
    collection.flush()
    return inserted

向量搜索
#

def vector_search(
    collection: Collection,
    query_embedding: List[float],
    top_k: int = 10,
    filters: str = None,
    output_fields: List[str] = None
) -> List[dict]:
    """
    基础向量搜索
    filters 示例: "source == 'wiki' and created_at > 1700000000"
    """
    search_params = {
        "metric_type": "COSINE",
        "params": {
            "ef": 64  # 查询时的搜索范围,越大召回越准但越慢
        }
    }

    if output_fields is None:
        output_fields = ["doc_id", "text", "source", "chunk_index"]

    results = collection.search(
        data=[query_embedding],
        anns_field="embedding",
        param=search_params,
        limit=top_k,
        expr=filters,  # 标量过滤条件
        output_fields=output_fields
    )

    hits = []
    for hit in results[0]:
        hits.append({
            "id": hit.id,
            "score": hit.score,
            "doc_id": hit.entity.get("doc_id"),
            "text": hit.entity.get("text"),
            "source": hit.entity.get("source"),
        })

    return hits

混合搜索(向量 + 标量过滤)
#

这是实际业务中最常用的模式——不能让用户搜索到不属于他们的数据:

def hybrid_search(
    collection: Collection,
    query_embedding: List[float],
    user_id: str,
    knowledge_base_ids: List[str],
    top_k: int = 5
) -> List[dict]:
    """
    混合搜索:向量相似度 + 权限过滤
    """
    # 构建过滤条件(Milvus 使用类 Python 表达式语法)
    kb_ids_str = '", "'.join(knowledge_base_ids)
    filter_expr = f'doc_id in ["{kb_ids_str}"]'

    # 也可以用更复杂的条件
    # filter_expr = f'source == "internal" and created_at > {cutoff_ts}'

    return vector_search(
        collection=collection,
        query_embedding=query_embedding,
        top_k=top_k,
        filters=filter_expr,
        output_fields=["doc_id", "text", "source", "chunk_index", "created_at"]
    )

标量过滤的性能陷阱:过滤条件命中的数据比例太低(比如 0.1%)时,Milvus 需要扫描大量节点才能凑够 top_k 个结果,性能会急剧下降。解决方案是对高频过滤字段建 scalar index:

# 对 source 字段建标量索引
collection.create_index(
    field_name="source",
    index_params={"index_type": "Trie"}  # VARCHAR 用 Trie,INT 用 STL_SORT/INVERTED
)

删除操作
#

# 按主键删除
collection.delete(expr="id in [1, 2, 3]")

# 按业务字段删除(需要先建标量索引才高效)
collection.delete(expr='doc_id == "doc-abc-123"')

# 注意:Milvus 的删除是软删除 + 后台合并,不会立即释放磁盘空间
# 可以手动触发压缩
collection.compact()

生产调优
#

内存配置
#

Milvus 把整个索引加载到内存,内存不够直接 OOM。估算公式:

内存需求 ≈ 向量数量 × 维度 × 4字节 × (1 + HNSW_M/8) × 1.2(缓冲)

举例:1000万条 1536 维向量,HNSW M=16:

1000万 × 1536 × 4 × (1 + 16/8) × 1.2 ≈ 220GB

所以大规模场景要么用 IVF 系列(支持磁盘索引),要么上 Milvus Cluster 做分片。

DiskANN 索引(磁盘友好)
#

# 对于超大数据集,用 DISKANN 把部分索引放磁盘
index_params_diskann = {
    "metric_type": "COSINE",
    "index_type": "DISKANN",
    "params": {
        "search_cache_budget_gb": 4,  # 热数据缓存大小
        "num_threads": 4,
    }
}

查询性能监控
#

import time

def monitored_search(collection, query_embedding, top_k=10):
    start = time.time()
    results = vector_search(collection, query_embedding, top_k)
    elapsed = (time.time() - start) * 1000

    # 记录到你的监控系统
    print(f"Search latency: {elapsed:.1f}ms, results: {len(results)}")
    return results

Milvus 也暴露了 Prometheus metrics,在 9091 端口,可以直接接入 Grafana:

# prometheus scrape config
- job_name: 'milvus'
  static_configs:
    - targets: ['milvus-svc:9091']
  metrics_path: '/metrics'

常见问题
#

问题1:查询召回率低

ef 参数太小。搜索时把 ef 调大(比如 128 或 256),以延迟换召回率:

search_params = {"metric_type": "COSINE", "params": {"ef": 256}}

问题2:写入后立刻查不到

Milvus 默认有写入缓冲,需要 flush 或等自动刷盘。开发环境调用 collection.flush(),生产环境接受最终一致即可。

问题3:Collection load 很慢

大索引加载耗时,可以在服务启动时预加载,而不是每次请求时检查。也可以用 load_balance 配置让 Milvus 分批加载。

问题4:删除后磁盘没释放

# 触发手动压缩
collection.compact()
# 查看压缩状态
from pymilvus import utility
plans = utility.get_compaction_plans(collection.name)

完整 RAG 集成示例
#

from openai import OpenAI
from pymilvus import connections, Collection

client = OpenAI()

def get_embedding(text: str) -> List[float]:
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

def rag_query(question: str, collection: Collection) -> str:
    # 1. 向量化问题
    query_embedding = get_embedding(question)

    # 2. 检索相关文档
    hits = vector_search(
        collection=collection,
        query_embedding=query_embedding,
        top_k=5
    )

    # 3. 构建上下文
    context = "\n\n".join([
        f"[来源: {h['source']}]\n{h['text']}"
        for h in hits
    ])

    # 4. 调用 LLM 生成答案
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "你是一个知识库问答助手,根据提供的上下文回答问题。"
            },
            {
                "role": "user",
                "content": f"上下文:\n{context}\n\n问题:{question}"
            }
        ]
    )

    return response.choices[0].message.content

Milvus 生产落地的核心是:索引类型要根据数据规模选对标量过滤要建索引内存要提前规划好。其他细节在实际运行中踩坑修正就好。

Wenzhuo Huang
作者
Wenzhuo Huang
搞运维的工程师,写代码的运维人。专注 Kubernetes、AWS、GitOps 与基础设施可靠性。这个博客既是我的技术笔记本,也是我踩过的坑的受害者档案。
AI 工程化实战 - 这篇文章属于一个选集。
§ : 本文

相关文章

Python 异步编程实战:asyncio 在 AI 应用中的使用

·1613 字·8 分钟
AI 应用天然是 I/O 密集型的:等 LLM 响应、等向量数据库检索、等多个工具调用返回。同步写法在这里是性能杀手。这篇文章从 event loop 原理讲到实际的 AI 应用模式,重点是 asyncio.gather 并发调用、SSE 流式输出处理和常见陷阱排查。