用 Celery 跑过几个服务的异步任务——发邮件、生成报表、调三方 API、定时数据同步。这篇把任务定义、重试策略、队列路由、K8s 部署这几块整理下,外加生产里真正踩到的坑。
Celery 架构#
Producer(Django/Flask/脚本)
│
│ 发布任务消息
▼
Broker(Redis / RabbitMQ)
│
│ 消费消息
▼
Worker(多进程/多线程/协程)
│
│ 写结果
▼
Result Backend(Redis / PostgreSQL / MongoDB)
四个核心组件:
- Producer:调用
.delay()或.apply_async()发布任务,不关心谁来执行 - Broker:消息队列,存储待执行的任务消息。Redis 简单够用,RabbitMQ 更可靠(支持持久化、死信队列)
- Worker:真正执行任务的进程,可以水平扩展
- Result Backend:存储任务执行结果,如果业务不关心结果可以不配(减少写压力)
Celery Beat 是独立的调度器进程,负责按 crontab/interval 把定时任务投递到 Broker,然后由普通 Worker 执行。
项目结构与初始化#
myapp/
├── celery_app.py # Celery 实例
├── tasks/
│ ├── __init__.py
│ ├── email.py # 邮件相关任务
│ ├── report.py # 报表生成任务
│ └── sync.py # 数据同步任务
└── beat_schedule.py # 定时任务配置
celery_app.py:
from celery import Celery
from kombu import Queue, Exchange
app = Celery("myapp")
app.conf.update(
# Broker & Backend
broker_url="redis://redis:6379/0",
result_backend="redis://redis:6379/1",
# 序列化(生产环境用 json,不要用 pickle)
task_serializer="json",
result_serializer="json",
accept_content=["json"],
# 时区
timezone="Asia/Shanghai",
enable_utc=True,
# Worker 行为
worker_prefetch_multiplier=1, # 每个 worker 进程一次只取 1 个任务,防止任务堆积在某个 worker 上
task_acks_late=True, # 任务执行完才 ack,防止 worker 崩溃丢任务
# 任务超时
task_soft_time_limit=300, # 软超时:抛 SoftTimeLimitExceeded
task_time_limit=360, # 硬超时:强制 kill
# 队列定义
task_queues=(
Queue("high", Exchange("high"), routing_key="high"),
Queue("default", Exchange("default"), routing_key="default"),
Queue("batch", Exchange("batch"), routing_key="batch"),
),
task_default_queue="default",
task_default_exchange="default",
task_default_routing_key="default",
)
任务定义#
基础任务#
from myapp.celery_app import app
@app.task
def send_email(to: str, subject: str, body: str):
# 调用邮件服务
pass
调用方式:
# 异步执行(推荐)
send_email.delay("user@example.com", "Welcome", "Hello!")
# 带参数的 apply_async
send_email.apply_async(
args=["user@example.com", "Welcome", "Hello!"],
countdown=10, # 10 秒后执行
expires=3600, # 1 小时内没被消费则丢弃
queue="high", # 指定队列
)
bind=True 获取任务实例#
bind=True 让任务方法的第一个参数变成 self(任务实例),可以访问 self.request(任务元信息)和调用 self.retry():
@app.task(bind=True)
def process_order(self, order_id: int):
try:
order = fetch_order(order_id)
charge(order)
except PaymentTemporaryError as exc:
# 手动触发重试
raise self.retry(exc=exc, countdown=60, max_retries=3)
except Exception as exc:
# 记录失败信息到任务元数据
self.update_state(
state="FAILURE",
meta={"order_id": order_id, "error": str(exc)},
)
raise
重试策略#
autoretry_for(推荐)#
比手动 self.retry() 更简洁:
from requests.exceptions import ConnectionError, Timeout
@app.task(
bind=True,
autoretry_for=(ConnectionError, Timeout),
max_retries=5,
retry_backoff=True, # 指数退避:1s, 2s, 4s, 8s, 16s
retry_backoff_max=600, # 最大等待时间 10 分钟
retry_jitter=True, # 加随机抖动,防止重试风暴
)
def call_external_api(self, payload: dict):
resp = requests.post("https://api.example.com/v1/event", json=payload, timeout=30)
resp.raise_for_status()
return resp.json()
retry_backoff=True 开启后每次重试等待时间翻倍,retry_jitter=True 在此基础上加随机抖动,避免大量任务同时重试打垮下游。
区分可重试与不可重试异常#
class TemporaryError(Exception):
"""网络抖动、限流、临时不可用——可以重试"""
class PermanentError(Exception):
"""数据格式错误、业务规则不满足——不应重试"""
@app.task(
autoretry_for=(TemporaryError,),
max_retries=3,
retry_backoff=True,
)
def sync_data(record_id: int):
data = fetch_data(record_id)
if not data:
raise PermanentError(f"record {record_id} not found") # 不会重试
push_to_remote(data) # 可能抛 TemporaryError,会自动重试
任务路由#
按优先级把任务分发到不同队列,再启动不同数量的 Worker 消费:
# celery_app.py 中的路由配置
app.conf.task_routes = {
"myapp.tasks.email.*": {"queue": "high"}, # 用户感知的操作优先处理
"myapp.tasks.report.*": {"queue": "batch"}, # 报表生成放批量队列
"myapp.tasks.sync.*": {"queue": "default"},
}
Worker 启动时指定消费哪个队列:
# 高优先 worker,2 个并发
celery -A myapp worker -Q high -c 2 --loglevel=info
# 批量 worker,4 个并发
celery -A myapp worker -Q batch -c 4 --loglevel=info
# 默认 worker
celery -A myapp worker -Q default -c 4 --loglevel=info
Celery Beat 定时任务#
# beat_schedule.py
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
# 每天凌晨 2 点生成日报
"daily-report": {
"task": "myapp.tasks.report.generate_daily_report",
"schedule": crontab(hour=2, minute=0),
"args": (),
"options": {"queue": "batch"},
},
# 每 5 分钟同步一次外部数据
"sync-external": {
"task": "myapp.tasks.sync.sync_external_data",
"schedule": 300, # 秒数
"options": {"queue": "default"},
},
# 每周一早 8 点发周报
"weekly-report": {
"task": "myapp.tasks.report.generate_weekly_report",
"schedule": crontab(day_of_week=1, hour=8, minute=0),
"options": {"queue": "batch"},
},
}
在 celery_app.py 中引用:
app.conf.beat_schedule = CELERYBEAT_SCHEDULE
app.conf.beat_scheduler = "django_celery_beat.schedulers:DatabaseScheduler"
# 或使用文件锁调度器(简单场景):
# app.conf.beat_scheduler = "celery.beat:PersistentScheduler"
K8s 部署#
Worker 和 Beat 分开部署,Beat 只能跑单副本。
Worker Deployment(支持 HPA 横向扩展):
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-worker
spec:
replicas: 3
selector:
matchLabels:
app: celery-worker
template:
metadata:
labels:
app: celery-worker
spec:
containers:
- name: worker
image: myapp:latest
command:
- celery
- -A
- myapp
- worker
- -Q
- default,high
- -c
- "4"
- --loglevel=info
- --without-heartbeat # K8s 里心跳可能造成误判,关掉
env:
- name: BROKER_URL
valueFrom:
secretKeyRef:
name: celery-secrets
key: broker-url
resources:
requests:
cpu: 500m
memory: 512Mi
limits:
cpu: 2
memory: 1Gi
lifecycle:
preStop:
exec:
command: ["celery", "-A", "myapp", "control", "shutdown"]
Beat Deployment(replicas 必须为 1):
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-beat
spec:
replicas: 1 # 严禁多副本!
selector:
matchLabels:
app: celery-beat
template:
metadata:
labels:
app: celery-beat
spec:
containers:
- name: beat
image: myapp:latest
command:
- celery
- -A
- myapp
- beat
- --loglevel=info
- -s
- /data/celerybeat-schedule # 调度状态持久化
volumeMounts:
- name: beat-data
mountPath: /data
volumes:
- name: beat-data
persistentVolumeClaim:
claimName: celery-beat-pvc
HPA 自动扩缩容(根据队列积压深度):
队列深度指标需要通过 celery-exporter 暴露给 Prometheus,再用 KEDA 或自定义 HPA 配置:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: celery-worker-scaler
spec:
scaleTargetRef:
name: celery-worker
minReplicaCount: 2
maxReplicaCount: 20
triggers:
- type: prometheus
metadata:
serverAddress: http://prometheus:9090
metricName: celery_queue_length
query: celery_queue_length{queue="default"}
threshold: "10" # 队列积压超过 10 就扩容
监控#
Flower 监控面板#
celery -A myapp flower --port=5555 --broker=redis://redis:6379/0
访问 http://flower:5555 可以看到 Worker 状态、任务历史、失败率。
Prometheus 指标采集#
安装 celery-exporter 后,可以采集如下指标:
celery_tasks_total{state="SUCCESS|FAILURE|RETRY"}:任务执行状态计数celery_queue_length{queue="..."}:队列积压深度celery_worker_up{hostname="..."}:Worker 存活状态
告警规则示例:
- alert: CeleryQueueBacklog
expr: celery_queue_length{queue="high"} > 100
for: 5m
annotations:
summary: "高优先队列积压超过 100 条"
- alert: CeleryWorkerDown
expr: celery_worker_up == 0
for: 2m
annotations:
summary: "Celery Worker 已下线"
踩坑记录#
任务序列化:pickle vs json
Celery 默认序列化格式是 pickle,可以传任意 Python 对象,但安全风险极大(反序列化漏洞),并且跨语言、跨版本不兼容。生产环境务必设置 task_serializer="json" 并在 accept_content 中只允许 json。任务参数只传基础类型(str、int、dict、list),不要传 ORM 对象或 dataclass 实例。
Worker 内存泄漏
长期运行的 Worker 进程可能因任务逻辑中的内存泄漏而不断膨胀。Celery 提供了 --max-tasks-per-child 参数,Worker 子进程执行 N 个任务后自动重启:
celery worker --max-tasks-per-child=100
K8s 环境里也可以设置 Pod 的 resources.limits.memory,让 OOM 时自动重启。
Beat 多副本重复执行
Beat 如果不小心启了两个副本(比如滚动更新时短暂重叠),会导致定时任务重复执行。解决方案:
- 配置
podDisruptionBudget确保同一时刻只有 1 个 Beat Pod - 用
django-celery-beat的 DatabaseScheduler,配合 Redis 分布式锁,只让一个实例真正调度(redbeat库) - 任务本身做幂等,即使重复执行也不产生副作用
task_acks_late 与消息重复
开启 task_acks_late=True 后,Worker 崩溃时任务会被重新投递,可能导致重复执行。任务要做好幂等设计,或者维护已处理任务 ID 的去重集合(Redis Set)。
worker_prefetch_multiplier 的影响
默认值是 4,意味着每个 Worker 进程会预取 4 个任务放到本地内存队列。对于执行时间差异很大的任务(比如有的 1 秒,有的 10 分钟),预取会导致任务分配不均。建议设为 1,让任务完成后才取下一个,配合 task_acks_late=True 使用。






