#!/usr/bin/env python3
"""Print deployment state, pod uptime, and mounted storage usage for a namespace.

The script queries deployments, pods, and PVCs through ``kubectl_json`` and
prints three tables: Deployments, Pods, and Storage. Storage figures are
obtained per PVC-backed mount via ``probe_path_usage``.

NOTE(review): ``kubectl_json``, ``probe_path_usage`` and the formatting helpers
live in the sibling ``common`` module, which is not visible here; comments
about them describe only how this file uses them.
"""
from __future__ import annotations

import argparse
import sys
from pathlib import Path

# Put this script's directory on sys.path so the sibling ``common`` module
# resolves regardless of the current working directory.
sys.path.insert(0, str(Path(__file__).resolve().parent))

from common import (
    DEFAULT_NAMESPACE,
    human_bytes,
    human_duration,
    kubectl_json,
    now_utc,
    parse_k8s_timestamp,
    parse_storage_quantity,
    print_table,
    probe_path_usage,
)


def parse_args() -> argparse.Namespace:
    """Parse command-line options.

    Returns:
        A namespace with ``namespace`` (str), ``include_rootfs`` (bool) and
        ``include_completed`` (bool).
    """
    parser = argparse.ArgumentParser(
        description="Show current unrip deployment state, pod uptime, and mounted storage usage."
    )
    parser.add_argument(
        "--namespace",
        default=DEFAULT_NAMESPACE,
        help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})",
    )
    parser.add_argument(
        "--include-rootfs",
        action="store_true",
        help="Also probe '/' for pods that do not have PVC-backed mounts.",
    )
    parser.add_argument(
        "--include-completed",
        action="store_true",
        help="Include completed Job pods in the pod list.",
    )
    return parser.parse_args()


def _first_or_empty(items: list[dict] | None) -> dict:
    """Return the first element of *items*, or ``{}`` when it is missing or empty.

    A ``.get(key, [{}])[0]`` lookup only protects against a missing key; it
    still raises ``IndexError`` when the key is present with an empty list.
    """
    return items[0] if items else {}


def ready_string(pod: dict) -> str:
    """Return ``"<ready>/<total>"`` counted over the pod's ``containerStatuses``."""
    statuses = pod.get("status", {}).get("containerStatuses", [])
    ready = sum(1 for status in statuses if status.get("ready"))
    return f"{ready}/{len(statuses)}"


def restart_count(pod: dict) -> int:
    """Return the sum of ``restartCount`` over the pod's ``containerStatuses``."""
    statuses = pod.get("status", {}).get("containerStatuses", [])
    return sum(status.get("restartCount", 0) for status in statuses)


def pvc_mounts_for_pod(pod: dict) -> list[dict[str, str]]:
    """List the PVC-backed volume mounts declared by the pod's containers.

    Only ``spec.containers`` is inspected. Each entry carries the keys
    ``container``, ``claim_name`` and ``mount_path``. Mounts whose volume has
    no ``persistentVolumeClaim.claimName`` are skipped.
    """
    spec = pod.get("spec", {})
    # Map volume name -> claim name (None for volumes that are not PVC-backed).
    claim_by_volume = {
        volume["name"]: volume.get("persistentVolumeClaim", {}).get("claimName")
        for volume in spec.get("volumes", [])
    }

    mounts: list[dict[str, str]] = []
    for container in spec.get("containers", []):
        for mount in container.get("volumeMounts", []):
            claim_name = claim_by_volume.get(mount["name"])
            if not claim_name:
                continue
            mounts.append(
                {
                    "container": container.get("name", "app"),
                    "claim_name": claim_name,
                    "mount_path": mount["mountPath"],
                }
            )
    return mounts


def is_completed_job_pod(pod: dict) -> bool:
    """Return True when the pod is owned by a Job and its phase is Succeeded or Failed."""
    owners = pod.get("metadata", {}).get("ownerReferences", [])
    if not any(owner.get("kind") == "Job" for owner in owners):
        return False
    return pod.get("status", {}).get("phase") in {"Succeeded", "Failed"}


def main() -> int:
    """Collect cluster state for the selected namespace and print the three tables.

    Returns:
        ``0`` after the report has been printed.
    """
    args = parse_args()
    namespace = args.namespace

    deployment_payload = kubectl_json("get", "deploy", "-n", namespace, "-o", "json")
    pod_payload = kubectl_json("get", "pods", "-n", namespace, "-o", "json")
    pvc_payload = kubectl_json("get", "pvc", "-n", namespace, "-o", "json")

    # PVC name -> raw requested storage quantity string ("" when not set).
    pvc_requests = {
        item["metadata"]["name"]: item.get("spec", {})
        .get("resources", {})
        .get("requests", {})
        .get("storage", "")
        for item in pvc_payload.get("items", [])
    }

    deployment_rows: list[list[str]] = []
    for item in sorted(deployment_payload.get("items", []), key=lambda value: value["metadata"]["name"]):
        # Only the first container's image is shown in the table.
        container = _first_or_empty(
            item.get("spec", {}).get("template", {}).get("spec", {}).get("containers")
        )
        deployment_rows.append(
            [
                item["metadata"]["name"],
                f"{item.get('status', {}).get('readyReplicas', 0)}/{item.get('spec', {}).get('replicas', 0)}",
                str(item.get("status", {}).get("availableReplicas", 0)),
                container.get("image", "-"),
            ]
        )

    pod_rows: list[list[str]] = []
    storage_rows: list[list[str]] = []
    for pod in sorted(pod_payload.get("items", []), key=lambda value: value["metadata"]["name"]):
        if not args.include_completed and is_completed_job_pod(pod):
            continue

        pod_name = pod["metadata"]["name"]
        status = pod.get("status", {})
        # Prefer the pod-level start time; fall back to the first container's
        # running.startedAt when the pod-level field is absent.
        started_raw = status.get("startTime") or (
            _first_or_empty(status.get("containerStatuses"))
            .get("state", {})
            .get("running", {})
            .get("startedAt")
        )
        start_time = parse_k8s_timestamp(started_raw)
        uptime = human_duration((now_utc() - start_time).total_seconds() if start_time else None)
        image = _first_or_empty(pod.get("spec", {}).get("containers")).get("image", "-")
        pod_rows.append(
            [
                pod_name,
                status.get("phase", "-"),
                ready_string(pod),
                str(restart_count(pod)),
                uptime,
                pod.get("spec", {}).get("nodeName", "-"),
                image,
            ]
        )

        mounts = pvc_mounts_for_pod(pod)
        if not mounts and args.include_rootfs:
            # No PVC mounts: optionally probe the root filesystem instead.
            mounts = [{"claim_name": "-", "mount_path": "/", "container": "app"}]
        if not mounts:
            storage_rows.append([pod_name, "-", "no pvc mounts", "-", "-", "-", "-", "-"])
            continue

        for mount in mounts:
            # NOTE(review): mount["container"] is collected but not passed to
            # probe_path_usage; confirm which container the probe targets in
            # multi-container pods.
            usage = probe_path_usage(pod_name, mount["mount_path"], namespace=namespace)
            requested_raw = pvc_requests.get(mount["claim_name"], "")
            requested_bytes = parse_storage_quantity(requested_raw)
            storage_rows.append(
                [
                    pod_name,
                    mount["mount_path"],
                    mount["claim_name"],
                    # Fall back to the raw quantity (or "-") when it cannot be parsed.
                    human_bytes(requested_bytes) if requested_bytes is not None else (requested_raw or "-"),
                    human_bytes(usage["path_bytes"]),
                    human_bytes(usage["filesystem_used_bytes"]),
                    human_bytes(usage["filesystem_available_bytes"]),
                    str(usage["filesystem_use_percent"]),
                ]
            )

    print(f"Namespace: {namespace}")
    print()
    print("Deployments")
    print_table(["NAME", "READY", "AVAILABLE", "IMAGE"], deployment_rows)
    print()
    print("Pods")
    print_table(["POD", "PHASE", "READY", "RESTARTS", "UPTIME", "NODE", "IMAGE"], pod_rows)
    print()
    print("Storage")
    print_table(
        ["POD", "PATH", "CLAIM", "REQUESTED", "PATH_BYTES", "FS_USED", "FS_AVAIL", "USE%"],
        storage_rows,
    )
    print()
    print("Note: storage figures come from container-mounted paths. Pods without PVC-backed mounts report no dedicated storage.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())