跳过正文
Playbook:AWS MSK Serverless 迁回 Provisioned——什么时候、为什么、怎么迁

Playbook:AWS MSK Serverless 迁回 Provisioned——什么时候、为什么、怎么迁

·3870 字·19 分钟·
目录
实战手册 - 这篇文章属于一个选集。
§ : 本文

元信息

  • 适用规模:4 个及以上非生产环境共用 MSK Serverless 的中型团队
  • 适用云:AWS(MSK Serverless / MSK Provisioned)
  • 运维负担:实施期 1-2 个工作日/集群,长期与 Serverless 接近
  • 月成本变化:单集群 $540 → ~$75(kafka.t3.small × 2),实测降幅约 86%
  • 最后验证:2026-04-30,AWS MSK 2.8.x + sarama 1.42 + aws-msk-iam-auth 2.2.0

适用场景
#

满足下面任意两条,建议按本 Playbook 评估迁回 Provisioned:

  • 单个 MSK Serverless 集群月费 ≥ $400,但实际 topic 流量长期低于 1 MB/s。
  • 同一组业务在 dev/qa/pre/staging/prod 各开了一个 Serverless 集群,整体支出超过预算。
  • 已经踩到 Serverless 的硬上限:每集群最多 120 个 partition、单 partition 最大 5 MB/s ingress。
  • 客户端是 Go sarama 或某些 Java 老版本,跟 Serverless 的多 broker bootstrap 字符串配合不顺。

不适用:流量持续 > 10 MB/s 且峰谷比超过 5 倍的真实业务主干、团队没有任何 Kafka 运维经验、单集群仅服务一个低频 topic 且月费 < $100。

核心问题
#

「无脑选 Serverless」的代价
#

MSK Serverless 的官方定价由四部分组成:

  • 集群基础费:每个集群 $0.75/小时,相当于固定 $540/月。
  • 吞吐:写入 $0.10/GB、读出 $0.05/GB。
  • 存储:$0.10/GB-月。
  • 消费者最低费:每个活跃 IAM principal $0.0015/小时(约 $1.08/月),按 partition × consumer 计算。

对一个跑 dev/test 工作负载的集群,吞吐和存储几乎可以忽略,账单 90% 以上来自第一项的固定 $540。把这笔钱跟一个 kafka.t3.small × 2 的 Provisioned 集群对比:

项目MSK ServerlessMSK Provisioned (kafka.t3.small × 2)
Broker不可见2 × $0.0456/小时 ≈ $66/月
EBS 100 GB gp3含在吞吐$8/月
跨 AZ 流量实测 < $2/月(低流量场景)
总计~$540/月~$75/月

非生产环境流量稳定在 1 GB/月级别时,Serverless 的「按用量付费」完全不成立——你交的几乎全是占位费。如果再算上每个非生产环境单独开一个集群的常见做法,dev/qa/pre/staging 四份固定费就累计 $2,160 一个月,比一个中型业务系统的 RDS 还贵,但 Kafka 实际只承担一些低频信令传递。账单视角下还有一个常被忽略的副效应:占位费按小时计入 *-Hours 这一项,而真实业务流量分散进 *-Throughput*-Storage 子项,财务侧只看「Amazon MSK 这一行涨了多少」时根本看不出钱花在哪里,月度成本复盘时往往是开发同学自己拉 usage type 拆分才能复盘清楚。这种结构化的钱在团队规模扩张时会复利累积,而且很难单独通过「优化 topic」的手段降下来——正解只能是换计费模型本身。

Serverless 的隐藏天花板
#

这些数字在 AWS 文档里翻得到,但项目立项时没人会在意,往往要等到撞墙才会回头查:

  • 单集群 partition 数上限 120,混用 dev/qa/pre/staging 几个环境很容易撞到。
  • 单 partition ingress 上限 5 MB/s,业务高峰会被悄悄限速。
  • 不支持 Kafka 配置参数自定义(min.insync.replicasretention.ms 例外,其他都锁死)。
  • 不支持 Schema Registry 互通——必须额外用 Glue Schema Registry,迁移时数据不会跟着走。
  • Bootstrap 字符串只暴露 IAM 协议端口,不能改 SASL/SCRAM 或 mTLS 调试。
  • 不暴露真实 broker 列表,前置一个由 AWS 维护的 NLB,客户端断连重连的 backoff 行为不可预期,遇到节点漂移时偶发数秒级中断。
  • 不开放 JMX / metrics endpoint,只能依赖 CloudWatch 上有限的几个指标,遇到分区分布不均、消费者 lag 异常时排障手段非常有限。

真正想要的东西
#

把上面的痛点反过来写,需求很清楚:

  1. 月成本随真实吞吐缩放,而不是为占位费买单。
  2. partition、retention、segment 等参数可调。
  3. 客户端兼容性可控(broker 数量稳定、bootstrap 字符串格式可预期)。
  4. 支持双写、灰度切换,万一回滚不要丢消息。

Provisioned 集群从 kafka.t3.small 起步就能满足,再大的业务可以平滑升 kafka.m5.large 或更大实例,后续按需扩 broker。把这四条要求写到方案评审纪要里,再去筛候选方案,可以避免被「Serverless 才是云原生未来」这种口号干扰决策;选型本质上是一次容量与计费模型的契合度评估,而不是新旧之争。

方案对比
#

方案 A:维持 MSK Serverless
#

适用:单环境、流量稳定且 < 1 MB/s、不在乎每月几百美元差距、团队没人有 Kafka 运维经验。

淘汰理由:本案例下 4 个非生产环境总月费 $2,160,其中约 85% 是固定占位费;同时已经撞到 partition 数预算。

方案 B:迁到 MSK Provisioned(推荐)
#

适用:

  • 流量低且稳定的非生产环境(用 kafka.t3.small × 2)。
  • 流量中等的生产环境(用 kafka.m5.large × 3,跨 3 AZ)。
  • 想自定义 partition / retention / replica.factor 等参数。

成本和容量在低流量段碾压 Serverless,运维负担只比 Serverless 多一点(broker patch 是托管的,磁盘扩容点几下控制台)。

方案 C:完全自建 Kafka(EC2 / EKS)
#

适用:成本极致敏感、团队已有专职 SRE 维护 Kafka、需要独家定制版本。

淘汰理由:算上跨 AZ 流量、EBS、运维人力、补丁窗口、ZooKeeper(或 KRaft)升级成本,自建 Kafka 在 < 100 MB/s 吞吐量级里没经济优势。同时需要自己实现 IAM 鉴权或回退到 SCRAM/mTLS,认证方案下沉到客户端配置和 K8s Secret,整体复杂度比 Provisioned 高一档。

方案 D:换 Confluent Cloud / Aiven
#

适用:想要 Schema Registry / Connect / ksqlDB 一站式、跨云部署、可接受比 MSK 高 30-50% 的单价。

淘汰理由(在本案例):技术栈已经全在 AWS、不需要 KSQL 之类的高阶组件、计费颗粒度更粗(按 cluster size + throughput),低流量小集群依然不便宜。

推荐架构
#

迁移采用 双集群并行 + 顺序切换 模式,保留旧集群 24 小时随时回滚。下面两张图分别给出双集群迁移的阶段切换示意,和单个客户端 SDK 调用栈在 IAM SASL 认证链路上的位置。

flowchart LR
    subgraph S1["阶段 1: 消费者订阅双集群"]
      P1[Producer]
      C1[Consumer]
      OK1[(Old MSK)]
      NK1[(New MSK)]
      P1 -->|write| OK1
      OK1 -->|read| C1
      NK1 -.->|read empty| C1
    end
    subgraph S2["阶段 2: 生产者双写"]
      P2[Producer]
      C2[Consumer]
      OK2[(Old MSK)]
      NK2[(New MSK)]
      P2 -->|dual write| OK2
      P2 -->|dual write| NK2
      OK2 -->|read| C2
      NK2 -->|read| C2
    end
    subgraph S3["阶段 3: 消费者切到新集群"]
      P3[Producer]
      C3[Consumer]
      OK3[(Old MSK)]
      NK3[(New MSK)]
      P3 -->|dual write| OK3
      P3 -->|dual write| NK3
      NK3 -->|read| C3
    end
    subgraph S4["阶段 4: 生产者只写新集群"]
      P4[Producer]
      C4[Consumer]
      OK4[(Old MSK<br/>idle)]
      NK4[(New MSK)]
      P4 -->|write| NK4
      NK4 -->|read| C4
    end
    S1 --> S2 --> S3 --> S4
flowchart TB
    App["应用代码<br/>producer.send / consumer.poll"]
    SDK["Kafka 客户端 SDK<br/>sarama / aiokafka / confluent-kafka"]
    SASL["SASL_SSL Handler<br/>OAUTHBEARER 模式"]
    IAM["aws-msk-iam-sasl-signer<br/>SigV4 签名"]
    STS["AWS STS<br/>AssumeRoleWithWebIdentity (IRSA)"]
    Broker["MSK Broker :9098"]
    App --> SDK
    SDK --> SASL
    SASL --> IAM
    IAM --> STS
    IAM -->|Authorization Token| Broker
    SDK -->|Produce/Fetch RPC| Broker

关键决策点:

  • 是否做生产者双写取决于业务能否接受迁移期间丢若干条消息。本案例 Kafka 不是业务主干(业务主干是 RabbitMQ),可以跳过双写直接「停旧、起新」。本文示例脚本两种模式都给出。
  • 消费者用静态 partition 绑定时,每个 Pod 通过 ordinal 序号绑 partition,所以 StatefulSet 副本数必须 ≥ topic partition 数。这个约束决定了迁移时 partition 数只能持平或扩,不能缩。
  • 配置走 Nacos 集中管理。多个 dataId 一次性 publish,避免 Pod 之间出现「半数连旧、半数连新」的分裂状态。
  • IAM 鉴权改用 IRSA,admin / producer / consumer 各一个 role,不要一把通配符梭哈。这件事看起来跟成本无关,但权限分离换来的最大收益是迁移本身——只要每个 role 的 policy 都明确列了集群 ARN,迁移就能在 IAM 层先做一次端到端 dry-run(建临时 Pod、试连、试写、试读),把潜在的握手错误前置到正式切换之前。

实施步骤
#

1. 拉账单 + 估算新集群成本
#

在动手之前先用 Cost Explorer 拉过去 30 天的实际花费,再对照 Provisioned 估算,避免拍脑袋。

#!/bin/bash
# msk-cost-pull.sh - 用 Cost Explorer 拉过去 30 天 MSK 账单按 usage type 分桶
# 用法:./msk-cost-pull.sh [region]
# 前置:aws cli v2,IAM 拥有 ce:GetCostAndUsage 权限

set -euo pipefail

REGION="${1:-ap-southeast-1}"
END=$(date -u +%Y-%m-%d)
START=$(date -u -d '30 days ago' +%Y-%m-%d)

command -v jq >/dev/null || { echo "需要 jq"; exit 1; }

aws ce get-cost-and-usage \
  --time-period "Start=${START},End=${END}" \
  --granularity DAILY \
  --metrics "UnblendedCost" \
  --filter '{"And":[{"Dimensions":{"Key":"SERVICE","Values":["Amazon Managed Streaming for Apache Kafka"]}},{"Dimensions":{"Key":"REGION","Values":["'"${REGION}"'"]}}]}' \
  --group-by Type=DIMENSION,Key=USAGE_TYPE \
  --output json \
  | jq -r '
      .ResultsByTime
      | map(.Groups[])
      | group_by(.Keys[0])
      | map({usage_type: .[0].Keys[0], cost_usd: (map(.Metrics.UnblendedCost.Amount | tonumber) | add | .*100 | round / 100)})
      | sort_by(-.cost_usd)
      | (["usage_type","cost_usd"] | @tsv),
        (.[] | [.usage_type, .cost_usd] | @tsv)
    ' \
  | column -t -s $'\t'

期望输出形如:

usage_type                            cost_usd
APS1-Kafka-Serverless-Hours           540.00
APS1-Kafka-Serverless-WriteThroughput 0.18
APS1-Kafka-Serverless-Storage         0.04

90% 以上集中在 *-Hours 这一行就是典型的「占位费」信号。

接下来用一个本地估算器对比新方案:

#!/usr/bin/env python3
# msk_estimator.py - Serverless vs Provisioned 月成本估算
# 用法:python3 msk_estimator.py --brokers 2 --instance kafka.t3.small --ebs-gb 100

import argparse

# 价格参考(2026-04,ap-southeast-1,不含税)
HOUR = 730
BROKER_HOURLY = {
    "kafka.t3.small":   0.0456,
    "kafka.m5.large":   0.21,
    "kafka.m5.xlarge":  0.42,
    "kafka.m5.2xlarge": 0.84,
}
EBS_GP3_PER_GB_MONTH = 0.0928
SERVERLESS_CLUSTER_HOURLY = 0.75
SERVERLESS_INGRESS_PER_GB = 0.10
SERVERLESS_EGRESS_PER_GB  = 0.05
SERVERLESS_STORAGE_GB_MO  = 0.10
SERVERLESS_PARTITION_HOUR = 0.0015

def parse_args():
    p = argparse.ArgumentParser()
    p.add_argument("--brokers", type=int, default=2)
    p.add_argument("--instance", default="kafka.t3.small")
    p.add_argument("--ebs-gb", type=int, default=100)
    p.add_argument("--ingress-gb-mo", type=float, default=1.0)
    p.add_argument("--egress-gb-mo",  type=float, default=1.0)
    p.add_argument("--storage-gb",    type=float, default=2.0)
    p.add_argument("--partitions",    type=int,   default=6)
    return p.parse_args()

def provisioned(args):
    if args.instance not in BROKER_HOURLY:
        raise SystemExit(f"unknown instance {args.instance}")
    broker = BROKER_HOURLY[args.instance] * HOUR * args.brokers
    ebs    = args.ebs_gb * EBS_GP3_PER_GB_MONTH * args.brokers
    return {"broker": broker, "ebs": ebs, "total": broker + ebs}

def serverless(args):
    cluster   = SERVERLESS_CLUSTER_HOURLY * HOUR
    ingress   = args.ingress_gb_mo * SERVERLESS_INGRESS_PER_GB
    egress    = args.egress_gb_mo  * SERVERLESS_EGRESS_PER_GB
    storage   = args.storage_gb    * SERVERLESS_STORAGE_GB_MO
    partition = args.partitions    * SERVERLESS_PARTITION_HOUR * HOUR
    return {
        "cluster": cluster, "ingress": ingress, "egress": egress,
        "storage": storage, "partition": partition,
        "total": cluster + ingress + egress + storage + partition,
    }

def main():
    args = parse_args()
    p = provisioned(args)
    s = serverless(args)
    print(f"Provisioned ({args.brokers}x {args.instance}, {args.ebs_gb}GB EBS):")
    for k, v in p.items():
        print(f"  {k:10s} ${v:7.2f}")
    print(f"\nServerless (ingress {args.ingress_gb_mo}GB, partitions {args.partitions}):")
    for k, v in s.items():
        print(f"  {k:10s} ${v:7.2f}")
    delta = s["total"] - p["total"]
    pct   = delta / s["total"] * 100 if s["total"] else 0
    print(f"\n月节省 ${delta:.2f} ({pct:.1f}%)")

if __name__ == "__main__":
    main()

低流量场景跑一次输出大致是 Provisioned total ≈ 79Serverless total ≈ 559、节省 86%,跟我们实测的差别在 1-2 美元以内。

2. 选择 broker 实例规格
#

参考标准:

流量峰值推荐机型broker 数量备注
< 5 MB/s(dev/qa/pre)kafka.t3.small2单 AZ 也行,跨 AZ 更稳
5-30 MB/s(中型 prod)kafka.m5.large3必须跨 3 AZ
30-100 MB/skafka.m5.xlarge3-6关注 EBS IOPS
> 100 MB/skafka.m5.2xlarge+6+单独评估

Provisioned 集群创建后不能改 broker 数量(只能扩,不能缩;机型可以原地变更),所以起步规划要留余量。本案例 4 个非生产环境统一选 kafka.t3.small × 2,topic partition 一律取所有环境消费者副本数的最大值(6),跨环境复用一份 IaC。

容量反向估算公式(实战里够用就行,不用太精细):

broker 数 ≥ ceil(峰值吞吐 MB/s / 单 broker 可承载 MB/s) × replication_factor / partition_per_broker
单 broker 可承载 ≈ 实例规格 × 0.6(留 burst 余量)
partition 数 ≥ max(消费者副本数, broker 数 × 2, 峰值吞吐 / 5 MB/s)

3. 创建 Provisioned 集群
#

把所有参数写成 JSON 提交,避免控制台点击漏字段。

{
  "ClusterName": "svc-foo-qa-kafka-v2",
  "KafkaVersion": "2.8.1",
  "NumberOfBrokerNodes": 2,
  "BrokerNodeGroupInfo": {
    "InstanceType": "kafka.t3.small",
    "ClientSubnets": [
      "subnet-aaaaaaaaaaaaaaaaa",
      "subnet-bbbbbbbbbbbbbbbbb"
    ],
    "SecurityGroups": [
      "sg-xxxxxxxxxxxxxxxxx"
    ],
    "StorageInfo": {
      "EbsStorageInfo": {
        "VolumeSize": 100,
        "ProvisionedThroughput": {
          "Enabled": false
        }
      }
    },
    "ConnectivityInfo": {
      "PublicAccess": {
        "Type": "DISABLED"
      }
    }
  },
  "ClientAuthentication": {
    "Sasl": {
      "Iam": { "Enabled": true }
    },
    "Unauthenticated": { "Enabled": false }
  },
  "EncryptionInfo": {
    "EncryptionAtRest": {
      "DataVolumeKMSKeyId": "alias/aws/kafka"
    },
    "EncryptionInTransit": {
      "ClientBroker": "TLS",
      "InCluster": true
    }
  },
  "EnhancedMonitoring": "PER_TOPIC_PER_BROKER",
  "OpenMonitoring": {
    "Prometheus": {
      "JmxExporter":  { "EnabledInBroker": true },
      "NodeExporter": { "EnabledInBroker": true }
    }
  },
  "LoggingInfo": {
    "BrokerLogs": {
      "CloudWatchLogs": {
        "Enabled": true,
        "LogGroup": "/aws/msk/svc-foo-qa-kafka-v2"
      }
    }
  },
  "Tags": {
    "Environment": "qa",
    "Service": "svc-foo",
    "ManagedBy": "playbook"
  }
}

提交并轮询创建状态:

#!/bin/bash
# msk-create.sh - 创建 Provisioned 集群并等待 ACTIVE
# 用法:./msk-create.sh cluster-config.json [region]
# 前置:kafka:CreateCluster / DescribeCluster 权限;ClientSubnets 至少 2 个不同 AZ

set -euo pipefail

CFG="${1:?cluster-config.json}"
REGION="${2:-ap-southeast-1}"

command -v jq >/dev/null || { echo "需要 jq"; exit 1; }

ARN=$(aws kafka create-cluster --cli-input-json "file://${CFG}" --region "${REGION}" \
        --query 'ClusterArn' --output text)
echo "已创建 ClusterArn=${ARN}"

while true; do
  STATE=$(aws kafka describe-cluster --cluster-arn "${ARN}" --region "${REGION}" \
            --query 'ClusterInfo.State' --output text)
  echo "$(date -u +%H:%M:%S) state=${STATE}"
  case "${STATE}" in
    ACTIVE)  break ;;
    FAILED)  echo "创建失败"; exit 2 ;;
    CREATING|UPDATING) sleep 30 ;;
    *)       echo "未知状态 ${STATE}"; exit 3 ;;
  esac
done

aws kafka get-bootstrap-brokers --cluster-arn "${ARN}" --region "${REGION}" \
  --query 'BootstrapBrokerStringSaslIam' --output text

期望输出:

已创建 ClusterArn=arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:cluster/svc-foo-qa-kafka-v2/...
14:02:11 state=CREATING
...(约 18-25 分钟)...
14:21:43 state=ACTIVE
b-1.svcfooqakafkav2.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-2...

回滚:创建过程中可以 aws kafka delete-cluster --cluster-arn ${ARN},几分钟内消失。

4. 拆分 IRSA role(admin / producer / consumer)
#

每类身份一个 SA,对应一个 IAM role,policy 中明确列出新集群 ARN,不要用 *

Service Account
#

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kafka-admin
  namespace: svc-foo
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::<ACCOUNT_ID>:role/svc-foo-msk-admin
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kafka-producer
  namespace: svc-foo
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::<ACCOUNT_ID>:role/svc-foo-msk-producer
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kafka-consumer
  namespace: svc-foo
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::<ACCOUNT_ID>:role/svc-foo-msk-consumer

Trust Policy(每个 role 一份)
#

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::<ACCOUNT_ID>:oidc-provider/oidc.eks.ap-southeast-1.amazonaws.com/id/<OIDC_ID>"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "oidc.eks.ap-southeast-1.amazonaws.com/id/<OIDC_ID>:sub": "system:serviceaccount:svc-foo:kafka-admin",
          "oidc.eks.ap-southeast-1.amazonaws.com/id/<OIDC_ID>:aud": "sts.amazonaws.com"
        }
      }
    }
  ]
}

Permission Policy
#

admin(建 topic、改配置、看 metadata):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeCluster",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:CreateTopic",
        "kafka-cluster:DeleteTopic",
        "kafka-cluster:AlterTopic",
        "kafka-cluster:DescribeTopicDynamicConfiguration",
        "kafka-cluster:AlterTopicDynamicConfiguration",
        "kafka-cluster:DescribeGroup",
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DeleteGroup"
      ],
      "Resource": [
        "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:cluster/svc-foo-qa-kafka-v2/*",
        "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:topic/svc-foo-qa-kafka-v2/*/*",
        "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:group/svc-foo-qa-kafka-v2/*/*"
      ]
    }
  ]
}

producer(连集群 + 写指定前缀的 topic):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:Connect"],
      "Resource": "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:cluster/svc-foo-qa-kafka-v2/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:WriteData",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:WriteDataIdempotently"
      ],
      "Resource": "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:topic/svc-foo-qa-kafka-v2/*/message-*"
    }
  ]
}

consumer(连集群 + 读指定前缀的 topic + 提交 offset):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:Connect"],
      "Resource": "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:cluster/svc-foo-qa-kafka-v2/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:ReadData",
        "kafka-cluster:DescribeTopic"
      ],
      "Resource": "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:topic/svc-foo-qa-kafka-v2/*/message-*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:DescribeGroup",
        "kafka-cluster:AlterGroup"
      ],
      "Resource": "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:group/svc-foo-qa-kafka-v2/*/*"
    }
  ]
}

验证

# 列出 ns 内所有 SA → role 映射,确认 3 个 SA 都注解到位
kubectl -n svc-foo get sa -o json | python3 -c "
import json, sys
for i in json.load(sys.stdin)['items']:
    name = i['metadata']['name']
    role = i['metadata'].get('annotations', {}).get('eks.amazonaws.com/role-arn', 'NONE')
    if role != 'NONE':
        print(f'{name}: {role}')
"
# 起一个临时 Pod 用 admin SA 试连,能 list 即通
kubectl -n svc-foo run iam-check --rm -it --restart=Never \
  --image=amazon/aws-cli:2.15.0 \
  --overrides='{"spec":{"serviceAccountName":"kafka-admin"}}' \
  -- sts get-caller-identity

期望第二条命令输出 Arn: arn:aws:sts::<ACCOUNT_ID>:assumed-role/svc-foo-msk-admin/...,证明 IRSA 链路打通。

5. 建 topic
#

#!/bin/bash
# msk-create-topic.sh - 在新集群上建 topic
# 用法:./msk-create-topic.sh <new-bootstrap> <topic> <partitions>
# 前置:当前 kubectl context 指向迁移目标 EKS

set -euo pipefail
NEW_BS="${1:?bootstrap}"
TOPIC="${2:?topic}"
PART="${3:?partitions}"

kubectl -n svc-foo run kafka-cli --image=confluentinc/cp-kafka:7.5.0 \
  --restart=Never \
  --overrides='{"spec":{"serviceAccountName":"kafka-admin"}}' \
  --command -- sleep 600

trap 'kubectl -n svc-foo delete pod kafka-cli --ignore-not-found' EXIT

kubectl -n svc-foo wait --for=condition=Ready pod/kafka-cli --timeout=60s

kubectl -n svc-foo exec kafka-cli -- bash -c "
  set -e
  curl -sL -o /tmp/iam.jar \
    https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar
  export CLASSPATH=/tmp/iam.jar
  cat > /tmp/client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
  kafka-topics --bootstrap-server '${NEW_BS}' --command-config /tmp/client.properties \
    --create --topic '${TOPIC}' --partitions '${PART}' --replication-factor 2 \
    --config retention.ms=604800000 --config min.insync.replicas=1
  kafka-topics --bootstrap-server '${NEW_BS}' --command-config /tmp/client.properties --describe --topic '${TOPIC}'
"

--replication-factor 2 在 broker = 2 时是上限;min.insync.replicas=1 允许在单 broker 故障时仍可写入,适合非关键链路;生产环境应保持 replication-factor=3 + min.insync.replicas=2

6. 客户端配置(Java / Go / Python 三栈)
#

Java(Spring Kafka)
#

application.yaml

spring:
  kafka:
    bootstrap-servers:
      - b-1.svcfooqakafkav2.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098
      - b-2.svcfooqakafkav2.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: AWS_MSK_IAM
      sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required;
      sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
    consumer:
      group-id: svc-foo-consumer
      auto-offset-reset: earliest
      enable-auto-commit: false
    producer:
      acks: all
      retries: 5
      properties:
        enable.idempotence: true

pom.xml 关键依赖:

<dependency>
  <groupId>software.amazon.msk</groupId>
  <artifactId>aws-msk-iam-auth</artifactId>
  <version>2.2.0</version>
</dependency>

如果不想用官方 callback 类,自己写一个最小实现:

public class MskIamCallbackHandler implements AuthenticateCallbackHandler {
    private String region;

    @Override
    public void configure(Map<String, ?> cfg, String mechanism, List<AppConfigurationEntry> jaas) {
        this.region = (String) cfg.getOrDefault(AWSConfigConstants.AWS_REGION, "ap-southeast-1");
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException {
        for (Callback cb : callbacks) {
            if (cb instanceof OAuthBearerTokenCallback) {
                String token = MSKAuthTokenProvider.generateAuthToken(Region.of(region));
                ((OAuthBearerTokenCallback) cb).token(new BasicOAuthBearerToken(token, ...));
            }
        }
    }

    @Override public void close() {}
}

Go(sarama)
#

package kafkaclient

import (
	"context"
	"strings"
	"time"

	"github.com/IBM/sarama"
	"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
)

type mskTokenProvider struct {
	region string
}

func (m *mskTokenProvider) Token() (*sarama.AccessToken, error) {
	token, _, err := signer.GenerateAuthToken(context.Background(), m.region)
	if err != nil {
		return nil, err
	}
	return &sarama.AccessToken{Token: token}, nil
}

// NewConfig 注意:brokers 必须是逗号分隔字符串切分后的 []string,不要把整段塞进一个元素
func NewConfig(brokersCSV, region string) (sarama.Client, error) {
	brokers := strings.Split(brokersCSV, ",")
	for i := range brokers {
		brokers[i] = strings.TrimSpace(brokers[i])
	}

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_1_0
	cfg.Net.SASL.Enable = true
	cfg.Net.SASL.Mechanism = sarama.SASLTypeOAuth
	cfg.Net.SASL.TokenProvider = &mskTokenProvider{region: region}
	cfg.Net.TLS.Enable = true
	cfg.Net.DialTimeout = 10 * time.Second
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Idempotent = true
	cfg.Producer.Retry.Max = 5
	cfg.Net.MaxOpenRequests = 1

	return sarama.NewClient(brokers, cfg)
}

如果配置文件里写的是 YAML list,viper.GetStringSlice("Kafka.Addrs") 直接拿到 []string,跳过 strings.Split

Python(confluent-kafka)
#

# kafka_client.py
import socket
from typing import Tuple

from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from confluent_kafka import Consumer, Producer

REGION = "ap-southeast-1"

def oauth_cb(_oauth_config) -> Tuple[str, float]:
    token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(REGION)
    # confluent-kafka 期望返回 (token, expiry_seconds_since_epoch)
    return token, expiry_ms / 1000.0

def make_producer(bootstrap: str) -> Producer:
    return Producer({
        "bootstrap.servers": bootstrap,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism":    "OAUTHBEARER",
        "oauth_cb":          oauth_cb,
        "client.id":         socket.gethostname(),
        "enable.idempotence": True,
        "acks": "all",
        "compression.type": "lz4",
    })

def make_consumer(bootstrap: str, group_id: str) -> Consumer:
    return Consumer({
        "bootstrap.servers": bootstrap,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism":    "OAUTHBEARER",
        "oauth_cb":          oauth_cb,
        "group.id":          group_id,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    })

confluent-kafka Python 接受逗号分隔字符串,aiokafka 也接受;只有 sarama 严格要求 []string,这是踩坑 2 的根因。

7. 双集群双写迁移五阶段
#

说明:本案例最终选了「不双写、停旧切新」的简化路径,因为 Kafka 不是业务主干。下面给出的是完整双写五阶段流程,供消息绝不丢的场景参考。简化路径只走阶段 1 + 阶段 4 + 阶段 5(消费者切到双订阅 → 生产者一次切到新集群 → 旧集群下线),跳过双写。

阶段 1:消费者订阅新集群(保持订阅旧集群)
#

把消费者改成同时订阅新旧两个集群(不同的 client,相同的 group.id 或不同 group.id 都行)。新集群 topic 已经建好但暂时没消息。

执行:

# Nacos publish 新增字段
kafka.bootstrap_servers_new=<NEW_BS>
kafka.dual_consume=true

# rollout consumer
kubectl -n svc-foo rollout restart sts/consumer
kubectl -n svc-foo rollout status sts/consumer --timeout=10m

验证:

# 旧集群 lag 正常(offset 增长)
kafka-consumer-groups --bootstrap-server $OLD_BS --command-config /tmp/old.properties \
  --describe --group svc-foo-consumer

# 新集群 lag = 0(暂时没消息)
kafka-consumer-groups --bootstrap-server $NEW_BS --command-config /tmp/new.properties \
  --describe --group svc-foo-consumer

阶段 2:生产者双写
#

生产者代码层启动 dual write,一次成功视为成功:

# producer dual write 简化示意
def send(payload: bytes, key: bytes):
    fut_old = old_producer.produce(TOPIC, value=payload, key=key)
    fut_new = new_producer.produce(TOPIC, value=payload, key=key)
    old_producer.poll(0)
    new_producer.poll(0)
    # 任意一边成功即视为成功,避免拖慢主流程

或者直接配置双 endpoint,靠 SDK 内部 dual write(如果 SDK 不支持,必须代码层处理)。

执行:

# Nacos
kafka.bootstrap_servers_new=<NEW_BS>
kafka.dual_produce=true
kubectl -n svc-foo rollout restart deploy/producer

验证:

# 新集群 BytesInPerSec 起来
aws cloudwatch get-metric-statistics --namespace AWS/Kafka \
  --metric-name BytesInPerSec --dimensions \
  Name="Cluster Name",Value=svc-foo-qa-kafka-v2 \
  Name="Broker ID",Value=1 \
  --start-time $(date -u -d '5 minutes ago' +%FT%TZ) \
  --end-time $(date -u +%FT%TZ) \
  --period 60 --statistics Sum --region ap-southeast-1

# 新集群消费 lag 应该开始增长,说明消息进得来

阶段 3:消费者只订阅新集群
#

确认新集群消费者已经能完整处理消息(业务侧抽样校验)后,关掉旧集群订阅:

# Nacos
kafka.dual_consume=false
kafka.bootstrap_servers=<NEW_BS>     # 主 endpoint 切到新集群
kubectl -n svc-foo rollout restart sts/consumer

验证:

# 旧集群消费 lag 暂停增长但 offset 不动(因为没人订阅了),约 30 分钟后过期消失
kafka-consumer-groups --bootstrap-server $OLD_BS --command-config /tmp/old.properties \
  --describe --group svc-foo-consumer
# 输出 has no active members 是正常的

阶段 4:生产者只写新集群
#

# Nacos
kafka.dual_produce=false
kafka.bootstrap_servers=<NEW_BS>
kubectl -n svc-foo rollout restart deploy/producer

验证:

# 旧集群 BytesInPerSec 应该 30 分钟内归零
aws cloudwatch get-metric-statistics --namespace AWS/Kafka \
  --metric-name BytesInPerSec --dimensions \
  Name="Cluster Name",Value=svc-foo-qa-kafka-old \
  --start-time $(date -u -d '30 minutes ago' +%FT%TZ) \
  --end-time $(date -u +%FT%TZ) \
  --period 60 --statistics Sum --region ap-southeast-1

# 应用日志中无 KafkaConnectionError、SASL handshake failed
kubectl -n svc-foo logs -l app=producer --tail=200 | grep -iE 'kafka|sasl|error' || echo OK

阶段 5:旧集群下线
#

至少观察 24 小时,确认无任何流量、无业务报错:

# 检查任何 pod 是否还在连旧 broker
kubectl -n svc-foo exec deploy/producer -- bash -c \
  'cat /proc/net/tcp | awk "{print \$3}" | sort -u' | python3 -c '
import sys
for line in sys.stdin:
    line=line.strip()
    if not line or line=="local_address": continue
    ip = ".".join(str(int(line.split(":")[0][i:i+2],16)) for i in (6,4,2,0))
    print(ip)
' | sort -u

# 删除旧集群
aws kafka delete-cluster --cluster-arn $OLD_ARN --region ap-southeast-1

8. Schema Registry 迁移
#

如果旧链路用了 Glue Schema Registry,schema 数据本身不会跟着集群走。需要一次手工导出导入。

导出
#

#!/bin/bash
# schema-export.sh - 把一个 Glue Registry 下所有 schema 导出到本地 JSON
# 用法:./schema-export.sh <registry-name> <out-dir> [region]

set -euo pipefail
REG="${1:?registry}"
OUT="${2:?out-dir}"
REGION="${3:-ap-southeast-1}"

mkdir -p "${OUT}"
command -v jq >/dev/null || { echo "需要 jq"; exit 1; }

aws glue list-schemas --registry-id RegistryName="${REG}" --region "${REGION}" \
    --max-results 100 \
    --query 'Schemas[].SchemaName' --output json | jq -r '.[]' | while read -r SCHEMA; do
  echo "导出 ${SCHEMA}"
  aws glue list-schema-versions \
      --schema-id RegistryName="${REG}",SchemaName="${SCHEMA}" \
      --region "${REGION}" --max-results 100 \
      --query 'Schemas[].VersionNumber' --output json | jq -r '.[]' | while read -r VER; do
    aws glue get-schema-version \
        --schema-id RegistryName="${REG}",SchemaName="${SCHEMA}" \
        --schema-version-number VersionNumber="${VER}" \
        --region "${REGION}" \
        > "${OUT}/${SCHEMA}.v${VER}.json"
  done
done

echo "完成,文件落 ${OUT}/"
ls -1 "${OUT}/"

导入到新 Glue Registry
#

#!/bin/bash
# schema-import.sh - 按版本顺序导入到目标 Registry
# 用法:./schema-import.sh <new-registry> <in-dir> [region]

set -euo pipefail
REG="${1:?registry}"
IN="${2:?in-dir}"
REGION="${3:-ap-southeast-1}"

aws glue create-registry --registry-name "${REG}" --region "${REGION}" 2>/dev/null || true

# 按 schema 名字 + 版本号排序,逐个 register / put
ls "${IN}" | sed -E 's/\.v[0-9]+\.json$//' | sort -u | while read -r SCHEMA; do
  ls "${IN}" | grep "^${SCHEMA}\.v" | sort -t v -k2 -n | while read -r FILE; do
    DEF=$(jq -r '.SchemaDefinition' "${IN}/${FILE}")
    DT=$(jq -r '.DataFormat'        "${IN}/${FILE}")
    if aws glue get-schema --schema-id RegistryName="${REG}",SchemaName="${SCHEMA}" \
         --region "${REGION}" >/dev/null 2>&1; then
      aws glue register-schema-version \
        --schema-id RegistryName="${REG}",SchemaName="${SCHEMA}" \
        --schema-definition "${DEF}" --region "${REGION}" >/dev/null
    else
      aws glue create-schema \
        --registry-id RegistryName="${REG}" \
        --schema-name "${SCHEMA}" --data-format "${DT}" \
        --compatibility BACKWARD --schema-definition "${DEF}" \
        --region "${REGION}" >/dev/null
    fi
    echo "imported ${FILE}"
  done
done

客户端切换
#

把客户端配置里的 registry name 改到新 registry,序列化器同步指向:

# Java/Spring 端示意
spring:
  kafka:
    properties:
      schema.registry.url: ""    # 走 AWS Glue 而非 Confluent SR 时留空
      registry.name:        svc-foo-prod-registry
      avro.serialization.required.field: true

如果换到 Confluent Schema Registry(独立部署 / Confluent Cloud),改成:

spring:
  kafka:
    properties:
      schema.registry.url: https://schema-registry.example.aws.com
      basic.auth.credentials.source: USER_INFO
      basic.auth.user.info: <key>:<secret>

验证:业务侧选一个写入 + 消费的 schema id,比对新旧 registry 中的 schema definition sha256sum,要求一致。

9. 回滚脚本
#

旧集群保留 24 小时期间,回滚就是把 Nacos 的 endpoint 改回去 + rollout。中途任何阶段卡住都可执行。

#!/bin/bash
# msk-rollback.sh - 回滚到旧集群
# 用法:./msk-rollback.sh <env> [phase]
# phase: 1|2|3|4 表示当前所处阶段,未传则全量回滚

set -euo pipefail
ENV="${1:?env}"
PHASE="${2:-full}"
BACKUP_DIR="${HOME}/ops-archive/$(date +%F)-msk-rollback-${ENV}"
mkdir -p "${BACKUP_DIR}"

case "${PHASE}" in
  1)
    # 阶段 1 = 仅消费者改了双订阅,回滚只需关闭 dual_consume
    nacos-cli set "${ENV}/svc-foo/kafka.dual_consume" false
    kubectl -n svc-foo rollout restart sts/consumer
    ;;
  2|3)
    # 阶段 2/3 = 已经 dual write 或消费者切新,回滚需要把所有 endpoint 切回旧 + 关 dual
    nacos-cli set "${ENV}/svc-foo/kafka.bootstrap_servers" "${OLD_BS}"
    nacos-cli set "${ENV}/svc-foo/kafka.dual_produce" false
    nacos-cli set "${ENV}/svc-foo/kafka.dual_consume" false
    kubectl -n svc-foo rollout restart deploy/producer sts/consumer
    ;;
  4|full)
    # 阶段 4 = 已经只写新,旧集群可能没流量但 IAM 还在,全量回滚
    nacos-cli set "${ENV}/svc-foo/kafka.bootstrap_servers" "${OLD_BS}"
    nacos-cli set "${ENV}/svc-foo/kafka.dual_produce" false
    nacos-cli set "${ENV}/svc-foo/kafka.dual_consume" false
    kubectl -n svc-foo rollout restart deploy/producer sts/consumer
    # 确认旧 IAM policy 仍含旧集群 ARN(追加模式不需要改)
    aws iam get-role-policy --role-name svc-foo-msk-producer \
      --policy-name msk-access --query 'PolicyDocument' \
      --output json > "${BACKUP_DIR}/producer-policy.json"
    ;;
  *)
    echo "unknown phase ${PHASE}"; exit 1 ;;
esac

echo "回滚动作已下发,验证:"
echo "  kubectl -n svc-foo logs -l app=consumer --tail=200 | grep -iE 'kafka|sasl|error'"
echo "  kafka-consumer-groups --bootstrap-server ${OLD_BS} --describe --group svc-foo-consumer"

预防一切的关键是 IAM policy 追加 而非 替换:始终保留旧集群 ARN,新 Pod 起来就能连旧集群,5 分钟内完全回滚。

踩过的坑
#

坑 1:多个 IRSA role 共存,policy 加漏一个就 50% Pod CrashLoop
#

现象:迁移完成 5 分钟后,某个生产者服务和网关服务的新 Pod 全部 CrashLoop,日志显示 KafkaConnectionError: Connection closed(SASL handshake 被 broker 拒绝)。同集群的消费者一切正常。

根因:同一个 namespace 下其实有两个 ServiceAccount 各自映射不同的 IAM role:

svc-foo-msk SA → role/svc-foo-msk-access      (consumer 用,policy 含通配符 *)
msk-sa       SA → role/svc-foo-prod-msk-role  (producer/gateway 用,硬编码旧集群 ARN)

第二个 role 的 policy 只写了旧 Serverless 集群的 UUID,新集群 ARN 没追加进去,所以 SASL 握手在 IAM 层被直接拒绝。

修复

# 列出 ns 下所有 SA → role 映射
kubectl -n svc-foo get sa -o json | python3 -c "
import json, sys
for i in json.load(sys.stdin)['items']:
    name = i['metadata']['name']
    role = i['metadata'].get('annotations', {}).get('eks.amazonaws.com/role-arn', 'NONE')
    if role != 'NONE':
        print(f'{name}: {role}')
"

# 对每个非 NONE 的 role 都追加新集群 ARN 到 policy(追加模式,旧 ARN 保留)
for ROLE in svc-foo-msk-access svc-foo-prod-msk-role; do
  aws iam get-role-policy --role-name "${ROLE}" --policy-name msk-access \
    --query 'PolicyDocument' --output json > "/tmp/${ROLE}.json"
  jq '.Statement[].Resource += [
        "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:cluster/svc-foo-qa-kafka-v2/*",
        "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:topic/svc-foo-qa-kafka-v2/*/*",
        "arn:aws:kafka:ap-southeast-1:<ACCOUNT_ID>:group/svc-foo-qa-kafka-v2/*/*"
      ]' "/tmp/${ROLE}.json" > "/tmp/${ROLE}.new.json"
  aws iam put-role-policy --role-name "${ROLE}" --policy-name msk-access \
    --policy-document "file:///tmp/${ROLE}.new.json"
done

通用结论:迁移前必须先列清楚 namespace 内所有 SA → role → policy 的三层映射。「一个 role 通吃」看似省事,但一旦做权限分级就会变成跨集群迁移最容易漏的环节。运维 SOP 加一步「dump SA-role mapping」,比事后排查 CrashLoop 便宜得多。这一类问题的共性是:故障表现集中在握手阶段,而握手错误往往被框架包装成笼统的连接异常,光看应用日志容易误判成网络问题。

坑 2:sarama 解析 bootstrap 字符串时不接受逗号分隔
#

现象:生产者和消费者切完一切正常,唯独 Go 网关启动即 panic:

panic: runtime error: too many colons in address b-1.svcfooqakafkav2.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-2.svcfooqakafkav2.xxxxxx.c4.kafka.ap-southeast-1.amazonaws.com:9098

goroutine 1 [running]:
net.parseAddr(...)
github.com/IBM/sarama.(*Broker).Open(...)

根因:MSK 的 get-bootstrap-brokers 返回单字符串 b-1.xxx:9098,b-2.xxx:9098,三种客户端解析行为不同:

客户端单字符串逗号分隔
aiokafka (Python)接受
confluent-kafka-go / librdkafka接受
confluent-kafka (Python)接受
sarama (Go)拒绝,要求 []string

Go 网关用 sarama,配置经 YAML 反序列化后还是单字符串,所以挂掉。

修复(YAML 配置侧)

 Kafka:
-  Addrs: ["b-1.xxx:9098,b-2.xxx:9098"]
+  Addrs:
+    - "b-1.xxx:9098"
+    - "b-2.xxx:9098"

修复(迁移脚本侧),把可能混入逗号的写法自动展开成 list:

import re

def split_addrs(content: str) -> str:
    def repl(m):
        hosts = m.group(1).split(",")
        if len(hosts) > 1:
            return "Addrs: [" + ", ".join(f'"{h.strip()}"' for h in hosts) + "]"
        return m.group(0)
    return re.sub(r'Addrs:\s*\["([^"]+)"\]', repl, content)

修复(Go 代码侧)兜底

brokers := strings.Split(strings.TrimSpace(brokersCSV), ",")
for i := range brokers {
    brokers[i] = strings.TrimSpace(brokers[i])
}
client, err := sarama.NewClient(brokers, cfg)

通用结论:MSK Serverless 通常只暴露一个 endpoint(前面挂 NLB),切到 Provisioned 后 broker 列表才是真实的 N 个 host,客户端兼容性必须按客户端逐个验证,不能假设「原来能跑就还能跑」。改造前先灰度一台 Pod 试新配置。同样的兼容性陷阱在 Confluent Cloud / 自建 Kafka 切换时也存在,根因都是 bootstrap 字符串的解析约定在不同语言生态里没有统一标准。

坑 3:Provisioned 集群创建后不能减 broker 数量
#

现象:起步选了 kafka.t3.small × 2,半年后业务长大想缩到 1 broker 省点钱(流量降回去了),AWS 控制台找不到入口,CLI 也直接报错。

根因:MSK Provisioned 支持 broker 数 和机型 变更,但不支持 broker 数减少(除了删除整个集群)。这是 MSK 控制平面的硬约束,并不是配额能开。

修复(其实只能预防)——容量规划脚本:

#!/usr/bin/env python3
# capacity_plan.py - 给定当前流量 / 目标 12 个月增长,反算最小 broker 数
# 用法:python3 capacity_plan.py --current-mbs 1.5 --growth 3 --partitions 6

import argparse, math

INSTANCE_THROUGHPUT_MBS = {
    "kafka.t3.small":   3,   # burstable,长时间峰值 3 MB/s
    "kafka.m5.large":   30,
    "kafka.m5.xlarge":  60,
    "kafka.m5.2xlarge": 120,
}
RF = 2  # replication factor

def plan(current_mbs, growth, partitions):
    target = current_mbs * growth
    rows = []
    for inst, cap in INSTANCE_THROUGHPUT_MBS.items():
        per_broker = cap * 0.6  # 留 burst 余量
        brokers_for_throughput = math.ceil(target * RF / per_broker)
        brokers_for_partitions = math.ceil(partitions / 50)  # 每 broker 50 partition
        brokers = max(2, brokers_for_throughput, brokers_for_partitions)
        rows.append((inst, brokers, brokers * cap * 0.6))
    print(f"目标 {target:.1f} MB/s,partition {partitions}:")
    for inst, b, cap in rows:
        print(f"  {inst:20s} brokers={b}  容量~{cap:.0f} MB/s")

if __name__ == "__main__":
    p = argparse.ArgumentParser()
    p.add_argument("--current-mbs", type=float, required=True)
    p.add_argument("--growth", type=float, default=3.0)
    p.add_argument("--partitions", type=int, default=6)
    a = p.parse_args()
    plan(a.current_mbs, a.growth, a.partitions)

如果环境本身就要回收,新建小集群 + 双写 + 切换比「原地缩容」更现实。

通用结论:Provisioned 集群的容量决策是 单调 的——只能加不能减。和 Aurora cluster size、ElastiCache shard 数同一类约束。规划阶段做一次「最低可接受配置」的反向估算,把这个数字作为当前的起点,往往就够用一两年。

坑 4:Glue Schema Registry / ACL / Connect 不会自动跟随
#

现象:业务切换后 producer 写消息时报 Schema not found, schemaId=42。原本旧链路挂了一个 Glue Schema Registry,但新集群没绑定。

根因:MSK Serverless 和 Provisioned 都「支持」Glue Schema Registry,但绑定关系是在客户端配置里指定 registry name,跟集群本身无关。然而生产环境往往还会用到这些跟集群伴生的资源:

  • Glue Schema Registry(schema 数据)
  • Kafka ACL(如果用 SCRAM 而非 IAM)
  • MSK Connect 的连接器(source / sink 配置)
  • CloudWatch metric / alarm(按 cluster ARN 过滤)
  • IAM policy 中的 cluster ARN 引用

任何一个忘记迁,都可能在切换后某个低频路径触发故障。

修复:除了步骤 8 的 schema 导出/导入脚本之外,迁移 checklist 里加一节「伴生资源审计」:

# Schema Registry
aws glue list-registries --region $REGION
aws glue list-schemas    --region $REGION --max-results 100

# MSK Connect
aws kafkaconnect list-connectors --region $REGION

# CloudWatch alarm referencing old cluster
aws cloudwatch describe-alarms --region $REGION \
  --query "MetricAlarms[?contains(Dimensions[?Name=='Cluster Name'].Value | [0], 'old-cluster')].AlarmName" \
  --output table

# IAM policy 含旧 ARN 的角色
aws iam list-policies --scope Local \
  --query 'Policies[?contains(PolicyName, `msk`)].Arn' --output text \
  | xargs -I{} aws iam get-policy-version --policy-arn {} --version-id v1 \
      --query 'PolicyVersion.Document' --output json

通用结论:Kafka 集群是一个生态而非单点,迁移前要先把「对外接口」全部画一遍依赖图,不只是 producer/consumer 的 bootstrap。一份「绑了什么东西」清单对每次基础设施迁移都通用。

坑 5:Confluent CLI 与 AWS CLI 命令不可混用
#

现象:团队成员习惯用 Confluent Cloud 的 confluent kafka cluster create / confluent kafka topic create,迁回 AWS 后照搬命令,结果一直报 Error: rest endpoint cluster id ... not found

根因confluent CLI 只对接 Confluent Cloud REST API,不能管理 AWS MSK;AWS MSK 的等价命令是 aws kafka *(建集群)、kafka-topics --bootstrap-server(建 topic、走 IAM 鉴权 jar)。常见对照:

操作Confluent CLIAWS MSK
建集群confluent kafka cluster createaws kafka create-cluster
列集群confluent kafka cluster listaws kafka list-clusters
拿 bootstrapconfluent kafka cluster describeaws kafka get-bootstrap-brokers
建 topicconfluent kafka topic createkafka-topics --create + IAM client.properties
列 topicconfluent kafka topic listkafka-topics --list
看 groupconfluent kafka consumer-group describekafka-consumer-groups --describe
ACLconfluent kafka acl createIAM policy(不在 broker 维度)

修复:在团队 wiki 里贴一份对照表,并在 PR 模板里加一行「确认所有 kafka cli 调用使用了与目标集群匹配的命令族」。

通用结论:迁回 AWS MSK 之后,「集群管理」走 AWS CLI、「topic / group 管理」走原生 kafka-* 工具 + IAM jar,没有统一的高层 CLI。如果团队此前重度依赖 Confluent CLI,要给一段适应期,把习惯命令对照成 Kafka 原生命令。

衡量指标
#

非生产环境(4 个集群)实测对比:

指标迁移前(Serverless)迁移后(Provisioned t3.small × 2)变化
单集群月费$540~$75-86%
4 集群月费合计$2,160~$300-86%
topic partition 上限120不限(受 broker 资源约束)取消瓶颈
端到端写入延迟 P5012 ms8 ms-33%
端到端写入延迟 P9985 ms42 ms-50%
客户端断连率(每天)偶发 NLB 漂移导致 1-3 次< 1 次显著下降
配置可调项retention 等少数几个全量 Kafka 配置+
创建/删除时间15-25 分钟15-25 分钟持平
运维负担几乎为 0broker patch / 磁盘扩容(托管,<1h/年)略增

定性变化:

  • 排障时可以直接用 kafka-topics --describekafka-log-dirs 等命令查内部状态,Serverless 时这些命令大多被屏蔽。
  • 出现客户端兼容性问题时,可以原地降级机型或回滚单个 broker;Serverless 出问题只能开 case 等 AWS。
  • 月度账单可预测,不再需要每月解释一次「为什么 Kafka 占了这么多」给财务。
  • 集群和 IAM role、Schema Registry、Connector 的绑定关系全部进了 IaC 仓库,新增环境从过去的「开 ticket 等 AWS 配额」变成提一次 PR 审批。

局限
#

下列情况里 Serverless 仍然是更好的选择:

  • 流量极稀疏且突发:例如长期 0 流量,但偶尔单天 50 GB 的批处理。Serverless 的弹性更划算,Provisioned 还要为最大瞬时容量预留 broker。
  • 团队完全没人能盯 broker:Provisioned 仍需要偶尔关注 disk usage、replica skew、CPU credit(t3 系列是 burstable)。如果没有这部分精力,Serverless 的「忘了它存在」体验更优。
  • 跨账号 / 跨 region 的统一接入层:Serverless 在 AWS 内部网络拓扑上更扁平,避免 VPC peering / Transit Gateway 的复杂度。
  • 流量已超 100 MB/s 且会继续涨:那应该往 kafka.m5.2xlarge+ 或 Confluent Cloud 走,本 Playbook 的小机型不适合。

另外,已有自建 Kafka 集群且工具链成熟的团队,不应该为了「用上 MSK」而迁——自建的灵活性和成本下限更有竞争力。

后续演进方向
#

  • Provisioned 上叠 KRaft:MSK 已逐步支持 KRaft 替代 ZooKeeper,可以省去 ZooKeeper 节点的存在感和故障面,等 GA 后可以无缝切换。
  • 评估 Confluent Cloud Basic:低流量小集群可以考虑 Confluent Basic($1/小时起,含 Schema Registry),但要算上跨云出向流量。
  • Pulsar 调研:本案例消费者用静态 partition 绑定,本质是想要「一个消费者绑定一个分区」的语义。Pulsar 的 Key_Shared 订阅模式和分层存储可能更贴合,但社区在 AWS 上的托管选项有限,目前停留在调研阶段。
  • Kafka 不是业务主干就考虑去掉:本案例 Kafka 只承载录屏控制信号,体量仅 1 GB/月。下一步会评估能否合并到 RabbitMQ 的一个独立 vhost,彻底移除 MSK 依赖。

最后验证:2026-04-30,AWS MSK 2.8.x + sarama 1.42 + aws-msk-iam-auth 2.2.0 + confluent-kafka-go 2.3 + confluent-kafka-python 2.3 超过 12 个月未复核请重新核对当前定价、SDK 行为和 IRSA 推荐用法。

Wenzhuo Huang
作者
Wenzhuo Huang
搞运维的工程师,写代码的运维人。专注 Kubernetes、AWS、GitOps 与基础设施可靠性。这个博客既是我的技术笔记本,也是我踩过的坑的受害者档案。
实战手册 - 这篇文章属于一个选集。
§ : 本文

相关文章

Playbook:新建子环境的隔离 checklist——一次 ID 撞车污染 10 万条数据的事故复盘

·4626 字·22 分钟
一个共用 RabbitMQ broker、共用 Aurora cluster、自增 id 都从 1 起步的新子环境上线 24 天,向已有环境的老用户项目里灌入了约 10 万条不属于他们的消息。本文复盘事故根因(4 件套同时成立才会爆雷),对比三种隔离方案的成本与风险,给出推荐架构(独立中间件 + 共享集群 + ID 起点错开),并把 7 条强制 checklist 沉淀为新子环境上线门槛,附完整可执行的 aws cli / kubectl / SQL / Go 中间件代码。

Playbook:CI/CD 流水线模板化——3 个标准模板覆盖 80% 服务的端到端实战

·5048 字·24 分钟
在 80+ 条流水线的体量下,每条服务自己拷一份 yaml 是工程债:字段命名漂移、改一次通知模板要改 80 处、新人不知道照哪条抄。本文把方案从「思路」推进到「拿来即用」:每个标准模板给完整 YAML(含 anchors / 变量组绑定 / 审批节点)、对应 GitHub Actions reusable workflow、Jenkins shared library;附 create-pipeline.sh 端到端脚本、变量组管理 API 调用、模板回归测试 dry-run;7 个云效官方文档不写的硬约束(schedule 不工作 / step envs 失效 / stage 间永远线性渲染等)每个含完整修复 + 通用结论。