跳过正文

Python 操作 Kubernetes:kubernetes-client 实战

·2156 字·11 分钟·
目录

安装与认证
#

pip install kubernetes

认证方式
#

from kubernetes import client, config
from kubernetes.client import ApiClient

# ── 方式1:读取本地 kubeconfig(开发/本地调试)──
config.load_kube_config()                          # 默认 ~/.kube/config
config.load_kube_config(config_file="/path/to/kubeconfig")
config.load_kube_config(context="prod-cluster")    # 指定 context

# ── 方式2:集群内认证(Pod 内运行时)──
# 读取 /var/run/secrets/kubernetes.io/serviceaccount/
config.load_incluster_config()

# ── 方式3:自动判断(推荐写法)──
def load_k8s_config(kubeconfig: str | None = None, context: str | None = None) -> None:
    """自动选择认证方式:优先 in-cluster,其次 kubeconfig。"""
    try:
        config.load_incluster_config()
        print("使用 in-cluster 认证")
    except config.config_exception.ConfigException:
        config.load_kube_config(config_file=kubeconfig, context=context)
        print(f"使用 kubeconfig 认证  context={context or 'default'}")


# ── 方式4:手动指定 API Server(适合多集群)──
configuration = client.Configuration()
configuration.host = "https://10.0.0.1:6443"
configuration.verify_ssl = False
configuration.api_key["authorization"] = "Bearer eyJhbGci..."

with ApiClient(configuration) as api_client:
    v1 = client.CoreV1Api(api_client)
    pods = v1.list_pod_for_all_namespaces()

CoreV1Api:Pod 操作
#

from kubernetes import client, config
from kubernetes.client.rest import ApiException

config.load_kube_config()
v1 = client.CoreV1Api()


# ── 列出 Pod ──────────────────────────────────────────────────────────────────
def list_pods(namespace: str = "default", label_selector: str = "") -> list:
    """列出指定命名空间的 Pod。"""
    resp = v1.list_namespaced_pod(
        namespace=namespace,
        label_selector=label_selector,   # 如 "app=nginx,env=prod"
    )
    return resp.items


# 列出所有命名空间
def list_all_pods(field_selector: str = "") -> list:
    """列出所有命名空间的 Pod。"""
    resp = v1.list_pod_for_all_namespaces(field_selector=field_selector)
    return resp.items


# 使用示例
pods = list_pods("kube-system")
for pod in pods:
    phase = pod.status.phase
    node = pod.spec.node_name
    print(f"  {pod.metadata.name:<50} {phase:<12} {node}")


# ── 获取单个 Pod ──────────────────────────────────────────────────────────────
def get_pod(name: str, namespace: str = "default"):
    try:
        return v1.read_namespaced_pod(name=name, namespace=namespace)
    except ApiException as e:
        if e.status == 404:
            return None
        raise


# ── Pod 状态分析 ──────────────────────────────────────────────────────────────
def get_pod_status_summary(pod) -> dict:
    """提取 Pod 关键状态信息。"""
    meta = pod.metadata
    spec = pod.spec
    status = pod.status

    # 容器重启次数
    restart_counts = []
    container_statuses = status.container_statuses or []
    for cs in container_statuses:
        restart_counts.append({
            "name": cs.name,
            "restarts": cs.restart_count,
            "ready": cs.ready,
            "state": list(cs.state.to_dict().keys())[0] if cs.state else "unknown",
        })

    return {
        "name": meta.name,
        "namespace": meta.namespace,
        "phase": status.phase,
        "node": spec.node_name,
        "pod_ip": status.pod_ip,
        "start_time": meta.creation_timestamp,
        "containers": restart_counts,
        "conditions": [
            {"type": c.type, "status": c.status}
            for c in (status.conditions or [])
        ],
    }


# ── 删除 Pod ──────────────────────────────────────────────────────────────────
def delete_pod(name: str, namespace: str = "default", grace_period: int = 0) -> bool:
    try:
        v1.delete_namespaced_pod(
            name=name,
            namespace=namespace,
            grace_period_seconds=grace_period,
        )
        print(f"已删除 Pod: {namespace}/{name}")
        return True
    except ApiException as e:
        print(f"删除 Pod 失败: {e.status} {e.reason}")
        return False


# ── 在 Pod 中执行命令(exec)─────────────────────────────────────────────────
from kubernetes.stream import stream


def exec_in_pod(
    pod_name: str,
    namespace: str,
    command: list[str],
    container: str | None = None,
    timeout: int = 30,
) -> tuple[str, str]:
    """
    在 Pod 中执行命令,返回 (stdout, stderr)。

    示例:
        out, err = exec_in_pod("nginx-abc123", "default", ["nginx", "-t"])
    """
    kwargs = dict(
        name=pod_name,
        namespace=namespace,
        command=command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        _preload_content=False,
    )
    if container:
        kwargs["container"] = container

    resp = stream(v1.connect_get_namespaced_pod_exec, **kwargs)
    resp.run_forever(timeout=timeout)

    stdout = resp.read_stdout(timeout=5) or ""
    stderr = resp.read_stderr(timeout=5) or ""
    return stdout, stderr


# ── 获取 Pod 日志 ─────────────────────────────────────────────────────────────
def get_pod_logs(
    pod_name: str,
    namespace: str = "default",
    container: str | None = None,
    tail_lines: int = 100,
    previous: bool = False,
) -> str:
    """获取 Pod 日志。previous=True 获取上一次容器的日志(崩溃调试用)。"""
    try:
        return v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            container=container,
            tail_lines=tail_lines,
            previous=previous,
        )
    except ApiException as e:
        return f"获取日志失败: {e.reason}"

AppsV1Api:Deployment 操作
#

from kubernetes import client, config

config.load_kube_config()
apps_v1 = client.AppsV1Api()


# ── 列出 Deployment ───────────────────────────────────────────────────────────
def list_deployments(namespace: str = "default") -> list:
    resp = apps_v1.list_namespaced_deployment(namespace=namespace)
    return resp.items


# 全命名空间
def list_all_deployments() -> list:
    resp = apps_v1.list_deployment_for_all_namespaces()
    return resp.items


# ── 修改副本数 ────────────────────────────────────────────────────────────────
def scale_deployment(name: str, namespace: str, replicas: int) -> bool:
    """扩缩容 Deployment。"""
    try:
        body = {"spec": {"replicas": replicas}}
        apps_v1.patch_namespaced_deployment_scale(
            name=name,
            namespace=namespace,
            body=body,
        )
        print(f"已将 {namespace}/{name} 副本数调整为 {replicas}")
        return True
    except client.ApiException as e:
        print(f"扩缩容失败: {e.reason}")
        return False


# ── Rollout Restart(触发滚动重启)────────────────────────────────────────────
import datetime


def rollout_restart(name: str, namespace: str = "default") -> bool:
    """
    触发 Deployment 滚动重启(等价于 kubectl rollout restart deployment/xxx)。
    原理:给 spec.template.metadata.annotations 加一个时间戳 annotation。
    """
    now = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    body = {
        "spec": {
            "template": {
                "metadata": {
                    "annotations": {
                        "kubectl.kubernetes.io/restartedAt": now,
                    }
                }
            }
        }
    }
    try:
        apps_v1.patch_namespaced_deployment(name=name, namespace=namespace, body=body)
        print(f"已触发 {namespace}/{name} 滚动重启")
        return True
    except client.ApiException as e:
        print(f"触发重启失败: {e.reason}")
        return False


# ── 更新镜像 ──────────────────────────────────────────────────────────────────
def update_image(
    deployment_name: str,
    namespace: str,
    container_name: str,
    new_image: str,
) -> bool:
    """更新指定容器的镜像。"""
    try:
        deploy = apps_v1.read_namespaced_deployment(deployment_name, namespace)
        for container in deploy.spec.template.spec.containers:
            if container.name == container_name:
                container.image = new_image
                break
        else:
            print(f"找不到容器: {container_name}")
            return False

        apps_v1.replace_namespaced_deployment(deployment_name, namespace, deploy)
        print(f"已更新 {namespace}/{deployment_name}/{container_name} -> {new_image}")
        return True
    except client.ApiException as e:
        print(f"更新镜像失败: {e.reason}")
        return False


# ── 获取 Deployment 状态 ──────────────────────────────────────────────────────
def get_deployment_status(name: str, namespace: str) -> dict:
    deploy = apps_v1.read_namespaced_deployment(name, namespace)
    status = deploy.status
    return {
        "name": name,
        "namespace": namespace,
        "desired": deploy.spec.replicas,
        "ready": status.ready_replicas or 0,
        "available": status.available_replicas or 0,
        "updated": status.updated_replicas or 0,
        "unavailable": status.unavailable_replicas or 0,
    }

自定义资源(CustomObjectsApi)
#

from kubernetes import client, config

config.load_kube_config()
custom_api = client.CustomObjectsApi()

GROUP = "networking.istio.io"
VERSION = "v1alpha3"
PLURAL = "virtualservices"


# ── 列出 CRD 资源 ─────────────────────────────────────────────────────────────
def list_crd_resources(
    group: str,
    version: str,
    plural: str,
    namespace: str | None = None,
) -> list[dict]:
    if namespace:
        resp = custom_api.list_namespaced_custom_object(
            group=group, version=version, plural=plural, namespace=namespace
        )
    else:
        resp = custom_api.list_cluster_custom_object(
            group=group, version=version, plural=plural
        )
    return resp.get("items", [])


# ── 获取单个 CRD 资源 ─────────────────────────────────────────────────────────
def get_crd_resource(
    group: str, version: str, plural: str,
    name: str, namespace: str,
) -> dict | None:
    try:
        return custom_api.get_namespaced_custom_object(
            group=group, version=version, plural=plural,
            namespace=namespace, name=name,
        )
    except client.ApiException as e:
        if e.status == 404:
            return None
        raise


# ── 创建/更新 CRD 资源 ────────────────────────────────────────────────────────
def apply_crd_resource(
    group: str, version: str, plural: str,
    namespace: str, body: dict,
) -> dict:
    """Create or Replace(简单的 apply 实现)。"""
    name = body["metadata"]["name"]
    existing = get_crd_resource(group, version, plural, name, namespace)

    if existing:
        body["metadata"]["resourceVersion"] = existing["metadata"]["resourceVersion"]
        return custom_api.replace_namespaced_custom_object(
            group=group, version=version, plural=plural,
            namespace=namespace, name=name, body=body,
        )
    else:
        return custom_api.create_namespaced_custom_object(
            group=group, version=version, plural=plural,
            namespace=namespace, body=body,
        )


# 示例:操作 HPA(autoscaling/v2)
HPA_GROUP = "autoscaling"
HPA_VERSION = "v2"
HPA_PLURAL = "horizontalpodautoscalers"


def get_hpa_status(name: str, namespace: str) -> dict:
    v2 = client.AutoscalingV2Api()
    hpa = v2.read_namespaced_horizontal_pod_autoscaler(name, namespace)
    return {
        "name": name,
        "min_replicas": hpa.spec.min_replicas,
        "max_replicas": hpa.spec.max_replicas,
        "current_replicas": hpa.status.current_replicas,
        "desired_replicas": hpa.status.desired_replicas,
    }

Watch 机制:监听资源变化
#

from kubernetes import client, config, watch

config.load_kube_config()
v1 = client.CoreV1Api()


def watch_pods(namespace: str = "default", timeout_seconds: int = 60) -> None:
    """监听 Pod 事件(ADDED/MODIFIED/DELETED)。"""
    w = watch.Watch()
    print(f"开始监听 {namespace} 命名空间的 Pod 事件...")

    try:
        for event in w.stream(
            v1.list_namespaced_pod,
            namespace=namespace,
            timeout_seconds=timeout_seconds,
        ):
            event_type = event["type"]           # ADDED / MODIFIED / DELETED
            pod = event["object"]
            name = pod.metadata.name
            phase = pod.status.phase

            print(f"[{event_type}] Pod: {name}  Phase: {phase}")

            # 响应事件
            if event_type == "ADDED" and phase == "Pending":
                print(f"  新 Pending Pod: {name}")
            elif event_type == "MODIFIED" and phase == "Failed":
                print(f"  Pod 失败: {name}")
    except Exception as e:
        print(f"Watch 中断: {e}")
    finally:
        w.stop()


# ── 带重连的 Watch(生产可用)────────────────────────────────────────────────
import time
import logging

logger = logging.getLogger(__name__)


def watch_with_reconnect(
    list_func,
    event_handler,
    namespace: str | None = None,
    label_selector: str = "",
    reconnect_delay: float = 5.0,
) -> None:
    """带自动重连的 Watch,适合长期运行的控制器。"""
    resource_version = ""
    w = watch.Watch()

    while True:
        try:
            kwargs = {
                "timeout_seconds": 300,
                "resource_version": resource_version,
                "label_selector": label_selector,
            }
            if namespace:
                kwargs["namespace"] = namespace
                stream_iter = w.stream(list_func, **kwargs)
            else:
                stream_iter = w.stream(list_func, **kwargs)

            for event in stream_iter:
                obj = event["object"]
                resource_version = obj.metadata.resource_version
                event_handler(event["type"], obj)

        except client.ApiException as e:
            if e.status == 410:   # Gone,resource_version 过期
                resource_version = ""
                logger.warning("resource_version 过期,重新全量 List")
            else:
                logger.error(f"ApiException: {e}")
                time.sleep(reconnect_delay)
        except Exception as e:
            logger.error(f"Watch 异常: {e}{reconnect_delay}s 后重连")
            time.sleep(reconnect_delay)

实战:K8s 巡检脚本
#

检查所有命名空间的 Pending Pod 和频繁重启的容器,输出报告,支持钉钉告警。

#!/usr/bin/env python3
"""
k8s_inspector.py — Kubernetes 集群巡检脚本

功能:
  1. 检查所有命名空间下的 Pending Pod(超过指定时间)
  2. 检查频繁重启的容器(重启次数超过阈值)
  3. 检查 Deployment 不可用副本
  4. 输出格式化报告
  5. 可选:通过 Webhook 发送告警

用法:
    python k8s_inspector.py
    python k8s_inspector.py --context prod-cluster --pending-minutes 10
    python k8s_inspector.py --restart-threshold 5 --webhook https://oapi.dingtalk.com/...
"""

from __future__ import annotations

import argparse
import json
import logging
import sys
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Any

import requests
from kubernetes import client, config
from kubernetes.client.rest import ApiException

# ── 日志 ──────────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


# ── 数据结构 ──────────────────────────────────────────────────────────────────
@dataclass
class PendingPodIssue:
    namespace: str
    pod_name: str
    node: str
    pending_minutes: float
    reason: str


@dataclass
class RestartIssue:
    namespace: str
    pod_name: str
    container_name: str
    restart_count: int
    last_state: str


@dataclass
class DeploymentIssue:
    namespace: str
    deployment_name: str
    desired: int
    available: int
    unavailable: int


@dataclass
class InspectionReport:
    cluster_context: str
    checked_at: str
    pending_pods: list[PendingPodIssue] = field(default_factory=list)
    restart_issues: list[RestartIssue] = field(default_factory=list)
    deployment_issues: list[DeploymentIssue] = field(default_factory=list)

    @property
    def has_issues(self) -> bool:
        return bool(self.pending_pods or self.restart_issues or self.deployment_issues)

    @property
    def total_issues(self) -> int:
        return len(self.pending_pods) + len(self.restart_issues) + len(self.deployment_issues)


# ── 巡检逻辑 ──────────────────────────────────────────────────────────────────
def check_pending_pods(
    v1: client.CoreV1Api,
    pending_threshold_minutes: int = 5,
) -> list[PendingPodIssue]:
    """找出所有 Pending 超过阈值的 Pod。"""
    issues = []
    now = datetime.now(timezone.utc)

    try:
        pods = v1.list_pod_for_all_namespaces(
            field_selector="status.phase=Pending"
        )
    except ApiException as e:
        logger.error(f"列出 Pending Pod 失败: {e}")
        return issues

    for pod in pods.items:
        meta = pod.metadata
        status = pod.status

        created_at = meta.creation_timestamp
        if created_at is None:
            continue

        pending_seconds = (now - created_at).total_seconds()
        pending_minutes = pending_seconds / 60

        if pending_minutes < pending_threshold_minutes:
            continue

        # 提取 Pending 原因
        reason = "Unknown"
        for condition in (status.conditions or []):
            if condition.type == "PodScheduled" and condition.status != "True":
                reason = condition.reason or condition.message or "Unschedulable"
                break
            elif condition.type == "ContainersReady" and condition.status != "True":
                reason = condition.reason or "ContainersNotReady"

        issues.append(PendingPodIssue(
            namespace=meta.namespace,
            pod_name=meta.name,
            node=pod.spec.node_name or "未调度",
            pending_minutes=round(pending_minutes, 1),
            reason=reason,
        ))

    return issues


def check_restart_issues(
    v1: client.CoreV1Api,
    restart_threshold: int = 10,
) -> list[RestartIssue]:
    """找出重启次数超过阈值的容器。"""
    issues = []

    try:
        pods = v1.list_pod_for_all_namespaces()
    except ApiException as e:
        logger.error(f"列出所有 Pod 失败: {e}")
        return issues

    for pod in pods.items:
        meta = pod.metadata
        status = pod.status

        for cs in (status.container_statuses or []):
            if cs.restart_count < restart_threshold:
                continue

            # 获取上次退出状态
            last_state = "unknown"
            if cs.last_state and cs.last_state.terminated:
                t = cs.last_state.terminated
                last_state = f"exit={t.exit_code} reason={t.reason or 'unknown'}"

            issues.append(RestartIssue(
                namespace=meta.namespace,
                pod_name=meta.name,
                container_name=cs.name,
                restart_count=cs.restart_count,
                last_state=last_state,
            ))

    # 按重启次数降序
    return sorted(issues, key=lambda x: x.restart_count, reverse=True)


def check_deployment_issues(
    apps_v1: client.AppsV1Api,
) -> list[DeploymentIssue]:
    """找出有不可用副本的 Deployment。"""
    issues = []

    try:
        deploys = apps_v1.list_deployment_for_all_namespaces()
    except ApiException as e:
        logger.error(f"列出 Deployment 失败: {e}")
        return issues

    for deploy in deploys.items:
        meta = deploy.metadata
        status = deploy.status
        spec = deploy.spec

        desired = spec.replicas or 0
        available = status.available_replicas or 0
        unavailable = status.unavailable_replicas or 0

        if unavailable > 0 or available < desired:
            issues.append(DeploymentIssue(
                namespace=meta.namespace,
                deployment_name=meta.name,
                desired=desired,
                available=available,
                unavailable=unavailable,
            ))

    return issues


# ── 报告输出 ──────────────────────────────────────────────────────────────────
def format_report(report: InspectionReport) -> str:
    lines = []
    sep = "=" * 70

    lines.append(sep)
    lines.append(f"  K8s 巡检报告")
    lines.append(f"  集群: {report.cluster_context}")
    lines.append(f"  时间: {report.checked_at}")
    lines.append(f"  问题总计: {report.total_issues}")
    lines.append(sep)

    # Pending Pod
    lines.append(f"\n【Pending Pod】共 {len(report.pending_pods)} 个")
    if report.pending_pods:
        lines.append(f"  {'命名空间':<20} {'Pod 名称':<40} {'等待时长':>10}  {'原因'}")
        lines.append("  " + "-" * 66)
        for issue in report.pending_pods:
            lines.append(
                f"  {issue.namespace:<20} {issue.pod_name:<40} "
                f"{issue.pending_minutes:>8.1f}m  {issue.reason}"
            )
    else:
        lines.append("  无异常")

    # 重启问题
    lines.append(f"\n【频繁重启容器】共 {len(report.restart_issues)} 个(阈值已在参数中设定)")
    if report.restart_issues:
        lines.append(f"  {'命名空间':<20} {'Pod':<35} {'容器':<20} {'重启次数':>8}  {'上次状态'}")
        lines.append("  " + "-" * 80)
        for issue in report.restart_issues:
            lines.append(
                f"  {issue.namespace:<20} {issue.pod_name:<35} "
                f"{issue.container_name:<20} {issue.restart_count:>8}  {issue.last_state}"
            )
    else:
        lines.append("  无异常")

    # Deployment 问题
    lines.append(f"\n【Deployment 异常】共 {len(report.deployment_issues)} 个")
    if report.deployment_issues:
        lines.append(f"  {'命名空间':<20} {'Deployment':<40} {'期望':>6} {'可用':>6} {'不可用':>8}")
        lines.append("  " + "-" * 66)
        for issue in report.deployment_issues:
            lines.append(
                f"  {issue.namespace:<20} {issue.deployment_name:<40} "
                f"{issue.desired:>6} {issue.available:>6} {issue.unavailable:>8}"
            )
    else:
        lines.append("  无异常")

    lines.append("\n" + sep)
    return "\n".join(lines)


# ── 钉钉告警 ──────────────────────────────────────────────────────────────────
def send_webhook_alert(webhook_url: str, report: InspectionReport) -> None:
    if not report.has_issues:
        return

    lines = [f"**K8s 巡检告警** | 集群: {report.cluster_context}", ""]

    if report.pending_pods:
        lines.append(f"> **Pending Pod**: {len(report.pending_pods)} 个")
        for p in report.pending_pods[:5]:   # 只显示前5个
            lines.append(f"> - `{p.namespace}/{p.pod_name}` 等待 {p.pending_minutes:.0f}分钟,原因: {p.reason}")
        if len(report.pending_pods) > 5:
            lines.append(f"> - ... 还有 {len(report.pending_pods) - 5} 个")

    if report.restart_issues:
        lines.append(f"\n> **频繁重启**: {len(report.restart_issues)} 个容器")
        for r in report.restart_issues[:5]:
            lines.append(f"> - `{r.namespace}/{r.pod_name}/{r.container_name}` 重启 {r.restart_count} 次")

    if report.deployment_issues:
        lines.append(f"\n> **Deployment 异常**: {len(report.deployment_issues)} 个")
        for d in report.deployment_issues[:5]:
            lines.append(f"> - `{d.namespace}/{d.deployment_name}` 期望 {d.desired} 实际 {d.available}")

    content = "\n".join(lines)
    payload = {
        "msgtype": "markdown",
        "markdown": {"title": f"K8s 巡检告警 - {report.total_issues} 个问题", "text": content},
    }

    try:
        resp = requests.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()
        logger.info("告警已发送")
    except Exception as e:
        logger.error(f"发送告警失败: {e}")


# ── 主逻辑 ────────────────────────────────────────────────────────────────────
def run_inspection(
    context: str | None,
    pending_threshold: int,
    restart_threshold: int,
    webhook: str | None,
    output_json: str | None,
) -> InspectionReport:
    # 初始化认证
    try:
        config.load_incluster_config()
        ctx = "in-cluster"
    except config.config_exception.ConfigException:
        config.load_kube_config(context=context)
        contexts, active = config.list_kube_config_contexts()
        ctx = (active or {}).get("name", context or "unknown")

    logger.info(f"连接集群: {ctx}")

    v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()

    report = InspectionReport(
        cluster_context=ctx,
        checked_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    )

    logger.info("检查 Pending Pod...")
    report.pending_pods = check_pending_pods(v1, pending_threshold)

    logger.info("检查容器重启次数...")
    report.restart_issues = check_restart_issues(v1, restart_threshold)

    logger.info("检查 Deployment 状态...")
    report.deployment_issues = check_deployment_issues(apps_v1)

    return report


# ── 入口 ──────────────────────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="K8s 集群巡检工具")
    parser.add_argument("--context", help="kubectl context 名称(默认当前 context)")
    parser.add_argument(
        "--pending-minutes", type=int, default=5, metavar="N",
        help="Pending 超过 N 分钟才告警(默认 5)",
    )
    parser.add_argument(
        "--restart-threshold", type=int, default=10, metavar="N",
        help="容器重启次数超过 N 才告警(默认 10)",
    )
    parser.add_argument("--webhook", metavar="URL", help="告警 Webhook URL(钉钉/企微)")
    parser.add_argument("--output-json", metavar="FILE", help="将报告保存为 JSON")
    return parser.parse_args()


def main() -> int:
    args = parse_args()

    report = run_inspection(
        context=args.context,
        pending_threshold=args.pending_minutes,
        restart_threshold=args.restart_threshold,
        webhook=args.webhook,
        output_json=args.output_json,
    )

    # 打印报告
    print(format_report(report))

    # 发送告警
    if args.webhook and report.has_issues:
        send_webhook_alert(args.webhook, report)

    # 输出 JSON
    if args.output_json:
        import json as _json
        from pathlib import Path
        out = {
            "cluster": report.cluster_context,
            "checked_at": report.checked_at,
            "total_issues": report.total_issues,
            "pending_pods": [asdict(p) for p in report.pending_pods],
            "restart_issues": [asdict(r) for r in report.restart_issues],
            "deployment_issues": [asdict(d) for d in report.deployment_issues],
        }
        Path(args.output_json).write_text(
            _json.dumps(out, indent=2, ensure_ascii=False), encoding="utf-8"
        )
        logger.info(f"JSON 报告已写入: {args.output_json}")

    # 有问题返回非零退出码(CI/监控触发用)
    return 1 if report.has_issues else 0


if __name__ == "__main__":
    sys.exit(main())

运行示例
#

# 使用当前 kubeconfig context 巡检
python k8s_inspector.py

# 指定 context,调整告警阈值
python k8s_inspector.py --context prod-us-west --pending-minutes 10 --restart-threshold 5

# 输出 JSON + 发送钉钉告警
python k8s_inspector.py \
  --context prod-cluster \
  --webhook "https://oapi.dingtalk.com/robot/send?access_token=xxx" \
  --output-json /tmp/k8s-report.json

所需权限(RBAC)
#

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: k8s-inspector
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "nodes", "namespaces"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["apps"]
    resources: ["deployments", "replicasets", "statefulsets", "daemonsets"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["autoscaling"]
    resources: ["horizontalpodautoscalers"]
    verbs: ["get", "list"]
Wenzhuo Huang
作者
Wenzhuo Huang
搞运维的工程师,写代码的运维人。专注 Kubernetes、AWS、GitOps 与基础设施可靠性。这个博客既是我的技术笔记本,也是我踩过的坑的受害者档案。

相关文章

Python 基础速查(运维向)

·1578 字·8 分钟
运维工程师必备的 Python 基础知识速查,从变量类型到标准库,聚焦实际使用场景

Python 系统与文件操作实战

·1540 字·8 分钟
深入讲解 Python 系统操作,含 subprocess 进程管理、psutil 系统监控,以及一个完整的生产级日志清理脚本

Python 网络编程与 HTTP 请求

·1679 字·8 分钟
从 requests 基础到 httpx 异步,再到并发健康检查脚本,覆盖运维工程师日常 HTTP 操作场景