跳过正文
Celery 异步任务详解:任务队列、重试策略与分布式部署

Celery 异步任务详解:任务队列、重试策略与分布式部署

·803 字·4 分钟·
目录

用 Celery 跑过几个服务的异步任务——发邮件、生成报表、调三方 API、定时数据同步。这篇把任务定义、重试策略、队列路由、K8s 部署这几块整理下,外加生产里真正踩到的坑。

Celery 架构
#

Producer(Django/Flask/脚本)
    │
    │  发布任务消息
    ▼
Broker(Redis / RabbitMQ)
    │
    │  消费消息
    ▼
Worker(多进程/多线程/协程)
    │
    │  写结果
    ▼
Result Backend(Redis / PostgreSQL / MongoDB)

四个核心组件:

  • Producer:调用 .delay().apply_async() 发布任务,不关心谁来执行
  • Broker:消息队列,存储待执行的任务消息。Redis 简单够用,RabbitMQ 更可靠(支持持久化、死信队列)
  • Worker:真正执行任务的进程,可以水平扩展
  • Result Backend:存储任务执行结果,如果业务不关心结果可以不配(减少写压力)

Celery Beat 是独立的调度器进程,负责按 crontab/interval 把定时任务投递到 Broker,然后由普通 Worker 执行。


项目结构与初始化
#

myapp/
├── celery_app.py       # Celery 实例
├── tasks/
│   ├── __init__.py
│   ├── email.py        # 邮件相关任务
│   ├── report.py       # 报表生成任务
│   └── sync.py         # 数据同步任务
└── beat_schedule.py    # 定时任务配置

celery_app.py

from celery import Celery
from kombu import Queue, Exchange

app = Celery("myapp")

app.conf.update(
    # Broker & Backend
    broker_url="redis://redis:6379/0",
    result_backend="redis://redis:6379/1",

    # 序列化(生产环境用 json,不要用 pickle)
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],

    # 时区
    timezone="Asia/Shanghai",
    enable_utc=True,

    # Worker 行为
    worker_prefetch_multiplier=1,    # 每个 worker 进程一次只取 1 个任务,防止任务堆积在某个 worker 上
    task_acks_late=True,             # 任务执行完才 ack,防止 worker 崩溃丢任务

    # 任务超时
    task_soft_time_limit=300,   # 软超时:抛 SoftTimeLimitExceeded
    task_time_limit=360,        # 硬超时:强制 kill

    # 队列定义
    task_queues=(
        Queue("high",    Exchange("high"),    routing_key="high"),
        Queue("default", Exchange("default"), routing_key="default"),
        Queue("batch",   Exchange("batch"),   routing_key="batch"),
    ),
    task_default_queue="default",
    task_default_exchange="default",
    task_default_routing_key="default",
)

任务定义
#

基础任务
#

from myapp.celery_app import app

@app.task
def send_email(to: str, subject: str, body: str):
    # 调用邮件服务
    pass

调用方式:

# 异步执行(推荐)
send_email.delay("user@example.com", "Welcome", "Hello!")

# 带参数的 apply_async
send_email.apply_async(
    args=["user@example.com", "Welcome", "Hello!"],
    countdown=10,          # 10 秒后执行
    expires=3600,          # 1 小时内没被消费则丢弃
    queue="high",          # 指定队列
)

bind=True 获取任务实例
#

bind=True 让任务方法的第一个参数变成 self(任务实例),可以访问 self.request(任务元信息)和调用 self.retry()

@app.task(bind=True)
def process_order(self, order_id: int):
    try:
        order = fetch_order(order_id)
        charge(order)
    except PaymentTemporaryError as exc:
        # 手动触发重试
        raise self.retry(exc=exc, countdown=60, max_retries=3)
    except Exception as exc:
        # 记录失败信息到任务元数据
        self.update_state(
            state="FAILURE",
            meta={"order_id": order_id, "error": str(exc)},
        )
        raise

重试策略
#

autoretry_for(推荐)
#

比手动 self.retry() 更简洁:

from requests.exceptions import ConnectionError, Timeout

@app.task(
    bind=True,
    autoretry_for=(ConnectionError, Timeout),
    max_retries=5,
    retry_backoff=True,         # 指数退避:1s, 2s, 4s, 8s, 16s
    retry_backoff_max=600,      # 最大等待时间 10 分钟
    retry_jitter=True,          # 加随机抖动,防止重试风暴
)
def call_external_api(self, payload: dict):
    resp = requests.post("https://api.example.com/v1/event", json=payload, timeout=30)
    resp.raise_for_status()
    return resp.json()

retry_backoff=True 开启后每次重试等待时间翻倍,retry_jitter=True 在此基础上加随机抖动,避免大量任务同时重试打垮下游。

区分可重试与不可重试异常
#

class TemporaryError(Exception):
    """网络抖动、限流、临时不可用——可以重试"""

class PermanentError(Exception):
    """数据格式错误、业务规则不满足——不应重试"""


@app.task(
    autoretry_for=(TemporaryError,),
    max_retries=3,
    retry_backoff=True,
)
def sync_data(record_id: int):
    data = fetch_data(record_id)
    if not data:
        raise PermanentError(f"record {record_id} not found")  # 不会重试
    push_to_remote(data)   # 可能抛 TemporaryError,会自动重试

任务路由
#

按优先级把任务分发到不同队列,再启动不同数量的 Worker 消费:

# celery_app.py 中的路由配置
app.conf.task_routes = {
    "myapp.tasks.email.*":  {"queue": "high"},     # 用户感知的操作优先处理
    "myapp.tasks.report.*": {"queue": "batch"},    # 报表生成放批量队列
    "myapp.tasks.sync.*":   {"queue": "default"},
}

Worker 启动时指定消费哪个队列:

# 高优先 worker,2 个并发
celery -A myapp worker -Q high -c 2 --loglevel=info

# 批量 worker,4 个并发
celery -A myapp worker -Q batch -c 4 --loglevel=info

# 默认 worker
celery -A myapp worker -Q default -c 4 --loglevel=info

Celery Beat 定时任务
#

# beat_schedule.py
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # 每天凌晨 2 点生成日报
    "daily-report": {
        "task": "myapp.tasks.report.generate_daily_report",
        "schedule": crontab(hour=2, minute=0),
        "args": (),
        "options": {"queue": "batch"},
    },
    # 每 5 分钟同步一次外部数据
    "sync-external": {
        "task": "myapp.tasks.sync.sync_external_data",
        "schedule": 300,   # 秒数
        "options": {"queue": "default"},
    },
    # 每周一早 8 点发周报
    "weekly-report": {
        "task": "myapp.tasks.report.generate_weekly_report",
        "schedule": crontab(day_of_week=1, hour=8, minute=0),
        "options": {"queue": "batch"},
    },
}

celery_app.py 中引用:

app.conf.beat_schedule = CELERYBEAT_SCHEDULE
app.conf.beat_scheduler = "django_celery_beat.schedulers:DatabaseScheduler"
# 或使用文件锁调度器(简单场景):
# app.conf.beat_scheduler = "celery.beat:PersistentScheduler"

K8s 部署
#

Worker 和 Beat 分开部署,Beat 只能跑单副本。

Worker Deployment(支持 HPA 横向扩展):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: celery-worker
  template:
    metadata:
      labels:
        app: celery-worker
    spec:
      containers:
        - name: worker
          image: myapp:latest
          command:
            - celery
            - -A
            - myapp
            - worker
            - -Q
            - default,high
            - -c
            - "4"
            - --loglevel=info
            - --without-heartbeat   # K8s 里心跳可能造成误判,关掉
          env:
            - name: BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
          resources:
            requests:
              cpu: 500m
              memory: 512Mi
            limits:
              cpu: 2
              memory: 1Gi
          lifecycle:
            preStop:
              exec:
                command: ["celery", "-A", "myapp", "control", "shutdown"]

Beat Deployment(replicas 必须为 1):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-beat
spec:
  replicas: 1   # 严禁多副本!
  selector:
    matchLabels:
      app: celery-beat
  template:
    metadata:
      labels:
        app: celery-beat
    spec:
      containers:
        - name: beat
          image: myapp:latest
          command:
            - celery
            - -A
            - myapp
            - beat
            - --loglevel=info
            - -s
            - /data/celerybeat-schedule   # 调度状态持久化
          volumeMounts:
            - name: beat-data
              mountPath: /data
      volumes:
        - name: beat-data
          persistentVolumeClaim:
            claimName: celery-beat-pvc

HPA 自动扩缩容(根据队列积压深度):

队列深度指标需要通过 celery-exporter 暴露给 Prometheus,再用 KEDA 或自定义 HPA 配置:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: celery-worker-scaler
spec:
  scaleTargetRef:
    name: celery-worker
  minReplicaCount: 2
  maxReplicaCount: 20
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: celery_queue_length
        query: celery_queue_length{queue="default"}
        threshold: "10"   # 队列积压超过 10 就扩容

监控
#

Flower 监控面板
#

celery -A myapp flower --port=5555 --broker=redis://redis:6379/0

访问 http://flower:5555 可以看到 Worker 状态、任务历史、失败率。

Prometheus 指标采集
#

安装 celery-exporter 后,可以采集如下指标:

  • celery_tasks_total{state="SUCCESS|FAILURE|RETRY"}:任务执行状态计数
  • celery_queue_length{queue="..."}:队列积压深度
  • celery_worker_up{hostname="..."}:Worker 存活状态

告警规则示例:

- alert: CeleryQueueBacklog
  expr: celery_queue_length{queue="high"} > 100
  for: 5m
  annotations:
    summary: "高优先队列积压超过 100 条"

- alert: CeleryWorkerDown
  expr: celery_worker_up == 0
  for: 2m
  annotations:
    summary: "Celery Worker 已下线"

踩坑记录
#

任务序列化:pickle vs json

Celery 默认序列化格式是 pickle,可以传任意 Python 对象,但安全风险极大(反序列化漏洞),并且跨语言、跨版本不兼容。生产环境务必设置 task_serializer="json" 并在 accept_content 中只允许 json。任务参数只传基础类型(str、int、dict、list),不要传 ORM 对象或 dataclass 实例。

Worker 内存泄漏

长期运行的 Worker 进程可能因任务逻辑中的内存泄漏而不断膨胀。Celery 提供了 --max-tasks-per-child 参数,Worker 子进程执行 N 个任务后自动重启:

celery worker --max-tasks-per-child=100

K8s 环境里也可以设置 Pod 的 resources.limits.memory,让 OOM 时自动重启。

Beat 多副本重复执行

Beat 如果不小心启了两个副本(比如滚动更新时短暂重叠),会导致定时任务重复执行。解决方案:

  1. 配置 podDisruptionBudget 确保同一时刻只有 1 个 Beat Pod
  2. django-celery-beat 的 DatabaseScheduler,配合 Redis 分布式锁,只让一个实例真正调度(redbeat 库)
  3. 任务本身做幂等,即使重复执行也不产生副作用

task_acks_late 与消息重复

开启 task_acks_late=True 后,Worker 崩溃时任务会被重新投递,可能导致重复执行。任务要做好幂等设计,或者维护已处理任务 ID 的去重集合(Redis Set)。

worker_prefetch_multiplier 的影响

默认值是 4,意味着每个 Worker 进程会预取 4 个任务放到本地内存队列。对于执行时间差异很大的任务(比如有的 1 秒,有的 10 分钟),预取会导致任务分配不均。建议设为 1,让任务完成后才取下一个,配合 task_acks_late=True 使用。

Wenzhuo Huang
作者
Wenzhuo Huang
搞运维的工程师,写代码的运维人。专注 Kubernetes、AWS、GitOps 与基础设施可靠性。这个博客既是我的技术笔记本,也是我踩过的坑的受害者档案。

相关文章

Python 自动化运维:从脚本到完整工具的工程化实践

·1559 字·8 分钟
系统梳理 Python 运维自动化的工程化方法:boto3 操作 AWS 资源、Kubernetes Python SDK 使用、Click/Typer CLI 框架选型、数据库批量运维脚本、钉钉 Webhook 集成,以及类型注解与错误处理的实践经验。