向量数据库已经是构建 RAG 系统的标配组件。选型决策直接影响后期维护成本,本文从实际工程角度讲清楚怎么选、怎么部署、怎么用好 Milvus。
向量数据库选型对比#
市面上主流的几个方案各有侧重:
| 方案 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|
| Milvus | 大规模生产 | 性能强、功能完整、社区活跃 | 部署复杂、资源占用高 |
| Qdrant | 中等规模 | Rust实现性能好、API简洁 | 生态相对小 |
| Weaviate | GraphQL场景 | 内置向量化、schema友好 | 内存消耗大 |
| pgvector | 已有PostgreSQL | 运维简单、SQL熟悉 | 亿级数据性能下降明显 |
| Chroma | 本地开发 | 极简部署 | 不适合生产 |
选型建议:
- 数据量 < 500万:pgvector 或 Qdrant,省去独立服务运维
- 数据量 500万~5000万:Milvus Standalone 或 Qdrant
- 数据量 > 5000万 / 需要高并发读写:Milvus Cluster
pgvector 最容易被低估——如果你已经有 PostgreSQL,百万级数据加上 HNSW 索引,延迟完全可以控制在 10ms 以内,不用引入新的中间件。但它的并发写入性能比不上专用向量数据库,索引构建也会影响在线查询。
Milvus Standalone 部署#
方式一:Docker Compose(推荐开发和小规模生产)#
# 下载官方 compose 文件
wget https://github.com/milvus-io/milvus/releases/download/v2.4.6/milvus-standalone-docker-compose.yml \
-O docker-compose.yml
# 启动
docker-compose up -d
# 确认三个组件都 healthy
docker-compose ps
Milvus Standalone 内部包含三个进程:etcd(元数据)、MinIO(对象存储)、milvus 本身。compose 文件会一并拉起来。
生产注意点:
- 把 etcd 数据和 MinIO 数据挂载到持久化目录
- 默认端口 19530(gRPC)和 9091(HTTP/metrics)
- 内存至少 8GB,实际集合越大需要越多
# 关键 volume 配置片段
volumes:
- /data/milvus/etcd:/etcd
- /data/milvus/minio:/minio_data
- /data/milvus/milvus:/var/lib/milvus
方式二:Helm 部署到 Kubernetes#
helm repo add milvus https://zilliztech.github.io/milvus-helm/
helm repo update
helm install milvus milvus/milvus \
--namespace milvus \
--create-namespace \
--set cluster.enabled=false \
--set etcd.replicaCount=1 \
--set minio.mode=standalone \
--set pulsar.enabled=false \
-f values-standalone.yaml
# values-standalone.yaml
standalone:
resources:
requests:
memory: "4Gi"
cpu: "1"
limits:
memory: "8Gi"
cpu: "4"
minio:
persistence:
storageClass: "gp3"
size: 100Gi
etcd:
persistence:
storageClass: "gp3"
size: 10Gi
Collection 设计#
Collection 相当于关系型数据库的表,设计好 Schema 是后续一切的基础。
Schema 定义#
from pymilvus import (
connections, Collection, CollectionSchema,
FieldSchema, DataType, utility
)
# 连接
connections.connect(
alias="default",
host="localhost",
port="19530"
)
# 定义字段
fields = [
# 主键,自增或手动指定
FieldSchema(
name="id",
dtype=DataType.INT64,
is_primary=True,
auto_id=True
),
# 业务 ID,用于关联原始数据
FieldSchema(
name="doc_id",
dtype=DataType.VARCHAR,
max_length=128
),
# 文档分块文本(用于返回展示)
FieldSchema(
name="text",
dtype=DataType.VARCHAR,
max_length=4096
),
# 向量字段,维度取决于 embedding 模型
# text-embedding-3-small: 1536
# bge-m3: 1024
# bge-large-zh: 1024
FieldSchema(
name="embedding",
dtype=DataType.FLOAT_VECTOR,
dim=1536
),
# 标量字段,用于过滤
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=64),
FieldSchema(name="chunk_index", dtype=DataType.INT32),
FieldSchema(name="created_at", dtype=DataType.INT64), # unix timestamp
]
schema = CollectionSchema(
fields=fields,
description="知识库文档分块",
enable_dynamic_field=True # 允许插入额外字段,灵活但有开销
)
collection = Collection(
name="knowledge_base",
schema=schema,
consistency_level="Session" # Strong/Session/Bounded/Eventually
)
consistency_level 选择:
Strong:每次读都能看到最新写入,性能最低Session:当前会话内强一致,通常够用Bounded:允许一定延迟,适合高吞吐写场景Eventually:最终一致,追求极致读性能时用
索引构建#
索引类型的选择对性能影响极大:
# HNSW:精度高、查询快,内存占用大,适合大多数场景
index_params_hnsw = {
"metric_type": "COSINE", # 余弦相似度,适合文本
"index_type": "HNSW",
"params": {
"M": 16, # 每个节点的最大连接数,越大精度越高但内存越多
"efConstruction": 200 # 构建时的搜索范围,越大索引质量越好
}
}
# IVF_FLAT:分区倒排索引,内存友好,适合大数据集
index_params_ivf = {
"metric_type": "L2",
"index_type": "IVF_FLAT",
"params": {
"nlist": 1024 # 聚类中心数,建议 sqrt(数据量)
}
}
# 通常文本用 COSINE + HNSW 组合
collection.create_index(
field_name="embedding",
index_params=index_params_hnsw
)
# 加载到内存(查询前必须)
collection.load()
print(f"Collection loaded, entity count: {collection.num_entities}")
HNSW 参数经验值:
M=16:平衡精度和内存的默认值,可从这里开始M=32:高精度要求时用,内存翻倍efConstruction=200:离线建索引时可以开大,提升质量
Python SDK CRUD 操作#
插入数据#
import numpy as np
from typing import List
def batch_insert(
collection: Collection,
texts: List[str],
embeddings: List[List[float]],
doc_ids: List[str],
sources: List[str],
batch_size: int = 1000
):
"""批量插入,避免单次请求过大"""
total = len(texts)
inserted = 0
for i in range(0, total, batch_size):
batch_texts = texts[i:i+batch_size]
batch_embeddings = embeddings[i:i+batch_size]
batch_doc_ids = doc_ids[i:i+batch_size]
batch_sources = sources[i:i+batch_size]
import time
data = [
batch_doc_ids,
batch_texts,
batch_embeddings,
batch_sources,
[0] * len(batch_texts), # chunk_index
[int(time.time())] * len(batch_texts),
]
result = collection.insert(data)
inserted += len(result.primary_keys)
print(f"Inserted {inserted}/{total}")
# 插入后手动 flush 确保持久化(生产中可以不立即 flush)
collection.flush()
return inserted
向量搜索#
def vector_search(
collection: Collection,
query_embedding: List[float],
top_k: int = 10,
filters: str = None,
output_fields: List[str] = None
) -> List[dict]:
"""
基础向量搜索
filters 示例: "source == 'wiki' and created_at > 1700000000"
"""
search_params = {
"metric_type": "COSINE",
"params": {
"ef": 64 # 查询时的搜索范围,越大召回越准但越慢
}
}
if output_fields is None:
output_fields = ["doc_id", "text", "source", "chunk_index"]
results = collection.search(
data=[query_embedding],
anns_field="embedding",
param=search_params,
limit=top_k,
expr=filters, # 标量过滤条件
output_fields=output_fields
)
hits = []
for hit in results[0]:
hits.append({
"id": hit.id,
"score": hit.score,
"doc_id": hit.entity.get("doc_id"),
"text": hit.entity.get("text"),
"source": hit.entity.get("source"),
})
return hits
混合搜索(向量 + 标量过滤)#
这是实际业务中最常用的模式——不能让用户搜索到不属于他们的数据:
def hybrid_search(
collection: Collection,
query_embedding: List[float],
user_id: str,
knowledge_base_ids: List[str],
top_k: int = 5
) -> List[dict]:
"""
混合搜索:向量相似度 + 权限过滤
"""
# 构建过滤条件(Milvus 使用类 Python 表达式语法)
kb_ids_str = '", "'.join(knowledge_base_ids)
filter_expr = f'doc_id in ["{kb_ids_str}"]'
# 也可以用更复杂的条件
# filter_expr = f'source == "internal" and created_at > {cutoff_ts}'
return vector_search(
collection=collection,
query_embedding=query_embedding,
top_k=top_k,
filters=filter_expr,
output_fields=["doc_id", "text", "source", "chunk_index", "created_at"]
)
标量过滤的性能陷阱:过滤条件命中的数据比例太低(比如 0.1%)时,Milvus 需要扫描大量节点才能凑够 top_k 个结果,性能会急剧下降。解决方案是对高频过滤字段建 scalar index:
# 对 source 字段建标量索引
collection.create_index(
field_name="source",
index_params={"index_type": "Trie"} # VARCHAR 用 Trie,INT 用 STL_SORT/INVERTED
)
删除操作#
# 按主键删除
collection.delete(expr="id in [1, 2, 3]")
# 按业务字段删除(需要先建标量索引才高效)
collection.delete(expr='doc_id == "doc-abc-123"')
# 注意:Milvus 的删除是软删除 + 后台合并,不会立即释放磁盘空间
# 可以手动触发压缩
collection.compact()
生产调优#
内存配置#
Milvus 把整个索引加载到内存,内存不够直接 OOM。估算公式:
内存需求 ≈ 向量数量 × 维度 × 4字节 × (1 + HNSW_M/8) × 1.2(缓冲)
举例:1000万条 1536 维向量,HNSW M=16:
1000万 × 1536 × 4 × (1 + 16/8) × 1.2 ≈ 220GB
所以大规模场景要么用 IVF 系列(支持磁盘索引),要么上 Milvus Cluster 做分片。
DiskANN 索引(磁盘友好)#
# 对于超大数据集,用 DISKANN 把部分索引放磁盘
index_params_diskann = {
"metric_type": "COSINE",
"index_type": "DISKANN",
"params": {
"search_cache_budget_gb": 4, # 热数据缓存大小
"num_threads": 4,
}
}
查询性能监控#
import time
def monitored_search(collection, query_embedding, top_k=10):
start = time.time()
results = vector_search(collection, query_embedding, top_k)
elapsed = (time.time() - start) * 1000
# 记录到你的监控系统
print(f"Search latency: {elapsed:.1f}ms, results: {len(results)}")
return results
Milvus 也暴露了 Prometheus metrics,在 9091 端口,可以直接接入 Grafana:
# prometheus scrape config
- job_name: 'milvus'
static_configs:
- targets: ['milvus-svc:9091']
metrics_path: '/metrics'
常见问题#
问题1:查询召回率低
ef 参数太小。搜索时把 ef 调大(比如 128 或 256),以延迟换召回率:
search_params = {"metric_type": "COSINE", "params": {"ef": 256}}
问题2:写入后立刻查不到
Milvus 默认有写入缓冲,需要 flush 或等自动刷盘。开发环境调用 collection.flush(),生产环境接受最终一致即可。
问题3:Collection load 很慢
大索引加载耗时,可以在服务启动时预加载,而不是每次请求时检查。也可以用 load_balance 配置让 Milvus 分批加载。
问题4:删除后磁盘没释放
# 触发手动压缩
collection.compact()
# 查看压缩状态
from pymilvus import utility
plans = utility.get_compaction_plans(collection.name)
完整 RAG 集成示例#
from openai import OpenAI
from pymilvus import connections, Collection
client = OpenAI()
def get_embedding(text: str) -> List[float]:
response = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def rag_query(question: str, collection: Collection) -> str:
# 1. 向量化问题
query_embedding = get_embedding(question)
# 2. 检索相关文档
hits = vector_search(
collection=collection,
query_embedding=query_embedding,
top_k=5
)
# 3. 构建上下文
context = "\n\n".join([
f"[来源: {h['source']}]\n{h['text']}"
for h in hits
])
# 4. 调用 LLM 生成答案
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "你是一个知识库问答助手,根据提供的上下文回答问题。"
},
{
"role": "user",
"content": f"上下文:\n{context}\n\n问题:{question}"
}
]
)
return response.choices[0].message.content
Milvus 生产落地的核心是:索引类型要根据数据规模选对、标量过滤要建索引、内存要提前规划好。其他细节在实际运行中踩坑修正就好。






