#!/usr/bin/env python3 from __future__ import annotations import json import os import shlex import subprocess import sys from datetime import datetime, timezone from pathlib import Path ROOT_DIR = Path(__file__).resolve().parents[2] PLATFORM_REPO_DIR = Path( os.environ.get("PLATFORM_REPO_DIR", str(ROOT_DIR / "../unrip3")) ).resolve() DEFAULT_KUBECONFIG_PATH = PLATFORM_REPO_DIR / ".state/hetzner/kubeconfig.yaml" DEFAULT_NAMESPACE = os.environ.get("PROJECT_NAMESPACE", "unrip") DEFAULT_CONFIGMAP_NAME = os.environ.get("CONFIGMAP_NAME", "unrip-config") DEFAULT_REDPANDA_DEPLOYMENT = os.environ.get("REDPANDA_DEPLOYMENT", "redpanda") DEFAULT_REDPANDA_LABEL_SELECTOR = os.environ.get("REDPANDA_LABEL_SELECTOR", "app=redpanda") DEFAULT_REDPANDA_DATA_PATH = "/var/lib/redpanda/data" APP_TOPIC_KEYS = ( "KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE", "KAFKA_TOPIC_NORM_SWAP_DEMAND", "KAFKA_TOPIC_CMD_EXECUTE_TRADE", "KAFKA_TOPIC_EXEC_TRADE_RESULT", ) def resolve_kubeconfig_path() -> str: return os.environ.get("KUBECONFIG_PATH") or os.environ.get( "KUBECONFIG", str(DEFAULT_KUBECONFIG_PATH) ) def build_env() -> dict[str, str]: env = os.environ.copy() kubeconfig_path = resolve_kubeconfig_path() if not Path(kubeconfig_path).exists(): raise SystemExit( f"missing kubeconfig: {kubeconfig_path}\n" "set KUBECONFIG_PATH, KUBECONFIG, or PLATFORM_REPO_DIR" ) env["KUBECONFIG"] = kubeconfig_path return env def run( args: list[str], *, capture_output: bool = True, check: bool = True, input_text: str | None = None, ) -> subprocess.CompletedProcess[str]: return subprocess.run( args, env=build_env(), text=True, capture_output=capture_output, input=input_text, check=check, ) def kubectl( *args: str, capture_output: bool = True, check: bool = True, input_text: str | None = None, ) -> subprocess.CompletedProcess[str]: return run( ["kubectl", *args], capture_output=capture_output, check=check, input_text=input_text, ) def kubectl_json(*args: str) -> dict: return json.loads(kubectl(*args).stdout) def config_value(key: str, namespace: str = DEFAULT_NAMESPACE, configmap_name: str = DEFAULT_CONFIGMAP_NAME) -> str: return kubectl( "get", "configmap", configmap_name, "-n", namespace, "-o", f"jsonpath={{.data.{key}}}", ).stdout.strip() def app_topics(namespace: str = DEFAULT_NAMESPACE, configmap_name: str = DEFAULT_CONFIGMAP_NAME) -> list[str]: topics: list[str] = [] for key in APP_TOPIC_KEYS: value = config_value(key, namespace=namespace, configmap_name=configmap_name).strip() if value: topics.append(value) return topics def kafka_brokers(namespace: str = DEFAULT_NAMESPACE, configmap_name: str = DEFAULT_CONFIGMAP_NAME) -> str: return config_value("KAFKA_BROKERS", namespace=namespace, configmap_name=configmap_name) def raw_near_topic(namespace: str = DEFAULT_NAMESPACE, configmap_name: str = DEFAULT_CONFIGMAP_NAME) -> str: return config_value( "KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE", namespace=namespace, configmap_name=configmap_name, ) def redpanda_pod_name(namespace: str = DEFAULT_NAMESPACE) -> str: payload = kubectl_json( "get", "pods", "-n", namespace, "-l", DEFAULT_REDPANDA_LABEL_SELECTOR, "-o", "json", ) items = payload.get("items", []) if not items: raise SystemExit(f"no Redpanda pod found in namespace {namespace}") return items[0]["metadata"]["name"] def redpanda_exec( *args: str, namespace: str = DEFAULT_NAMESPACE, capture_output: bool = True, check: bool = True, ) -> subprocess.CompletedProcess[str]: return kubectl( "exec", "-n", namespace, f"deploy/{DEFAULT_REDPANDA_DEPLOYMENT}", "--", *args, capture_output=capture_output, check=check, ) def pod_exec( pod_name: str, *args: str, namespace: str = DEFAULT_NAMESPACE, capture_output: bool = True, check: bool = True, ) -> subprocess.CompletedProcess[str]: return kubectl( "exec", "-n", namespace, pod_name, "--", *args, capture_output=capture_output, check=check, ) def probe_path_usage( pod_name: str, path: str, *, namespace: str = DEFAULT_NAMESPACE, ) -> dict[str, object]: shell_path = shlex.quote(path) probe = "\n".join( ( f"df -B1 {shell_path} | awk 'NR==2 {{print $2, $3, $4, $5}}'", f"du -sb {shell_path} 2>/dev/null | awk 'END {{print $1}}'", ) ) result = pod_exec( pod_name, "sh", "-lc", probe, namespace=namespace, ).stdout.strip().splitlines() if not result: raise RuntimeError(f"no usage output for {pod_name}:{path}") fs_fields = result[0].split() if len(fs_fields) != 4: raise RuntimeError(f"unexpected df output for {pod_name}:{path}: {result[0]}") du_bytes = None if len(result) > 1 and result[1].strip(): try: du_bytes = int(result[1].strip()) except ValueError: du_bytes = None return { "filesystem_total_bytes": int(fs_fields[0]), "filesystem_used_bytes": int(fs_fields[1]), "filesystem_available_bytes": int(fs_fields[2]), "filesystem_use_percent": fs_fields[3], "path_bytes": du_bytes, } def parse_k8s_timestamp(value: str | None) -> datetime | None: if not value: return None return datetime.fromisoformat(value.replace("Z", "+00:00")) def now_utc() -> datetime: return datetime.now(timezone.utc) def human_duration(seconds: float | int | None) -> str: if seconds is None: return "-" seconds = int(max(0, seconds)) chunks = [] for unit_seconds, suffix in ( (86400, "d"), (3600, "h"), (60, "m"), (1, "s"), ): if seconds >= unit_seconds or (suffix == "s" and not chunks): value, seconds = divmod(seconds, unit_seconds) if value or suffix == "s": chunks.append(f"{value}{suffix}") if len(chunks) == 2: break return " ".join(chunks) def human_bytes(value: int | float | None) -> str: if value is None: return "-" value = float(value) units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB") unit_index = 0 while value >= 1024 and unit_index < len(units) - 1: value /= 1024 unit_index += 1 if unit_index == 0: return f"{int(value)} {units[unit_index]}" return f"{value:.1f} {units[unit_index]}" def parse_storage_quantity(value: str | None) -> int | None: if not value: return None binary_units = { "Ki": 1024**1, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4, "Pi": 1024**5, } decimal_units = { "K": 1000**1, "M": 1000**2, "G": 1000**3, "T": 1000**4, "P": 1000**5, } for suffix, factor in binary_units.items(): if value.endswith(suffix): return int(float(value[: -len(suffix)]) * factor) for suffix, factor in decimal_units.items(): if value.endswith(suffix): return int(float(value[: -len(suffix)]) * factor) return int(float(value)) def print_table(headers: list[str], rows: list[list[str]]) -> None: widths = [len(header) for header in headers] for row in rows: for index, cell in enumerate(row): widths[index] = max(widths[index], len(str(cell))) template = " ".join(f"{{:{width}}}" for width in widths) print(template.format(*headers)) print(template.format(*["-" * width for width in widths])) for row in rows: print(template.format(*row)) def stderr(message: str) -> None: print(message, file=sys.stderr)