unrip/scripts/ops/redpanda_storage.py
philipp d8d9a34db5
All checks were successful
deploy / deploy (push) Successful in 20s
Add near intents control API
2026-04-01 10:11:33 +02:00

196 lines
5.5 KiB
Python
Executable file

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import re
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from common import (
DEFAULT_NAMESPACE,
DEFAULT_REDPANDA_DATA_PATH,
app_topics,
human_bytes,
kafka_brokers,
print_table,
redpanda_exec,
redpanda_pod_name,
probe_path_usage,
)
def parse_args() -> argparse.Namespace:
    """Build the command-line interface and parse the process arguments.

    Returns:
        The parsed namespace carrying ``namespace``, ``brokers``, ``topic``
        (a list, empty when the flag is never given) and ``all_topics``
        (a bool).
    """
    cli = argparse.ArgumentParser(
        description="Show how much data Redpanda is currently storing for unrip topics."
    )
    # Options are declared as data so their settings can be compared side by
    # side; the tuple order is the registration order, which is also the
    # order argparse lists them in --help.
    option_specs = (
        (
            "--namespace",
            {
                "default": DEFAULT_NAMESPACE,
                "help": f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})",
            },
        ),
        (
            "--brokers",
            {
                "default": "",
                "help": "Override Kafka brokers instead of reading them from unrip-config.",
            },
        ),
        (
            "--topic",
            {
                "action": "append",
                "default": [],
                "help": "Specific topic to inspect. Can be passed multiple times.",
            },
        ),
        (
            "--all-topics",
            {
                "action": "store_true",
                "help": "Inspect every topic visible to Redpanda instead of the app topics from config.",
            },
        ),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def list_topics(namespace: str, brokers: str) -> list[str]:
    """Return the topic names reported by ``rpk topic list``.

    Args:
        namespace: Forwarded to ``redpanda_exec`` as its ``namespace`` keyword.
        brokers: Value passed to rpk's ``--brokers`` flag.

    Returns:
        The first whitespace-separated cell of every non-blank line after
        the first line of the command's stdout.
    """
    listing = redpanda_exec(
        "rpk",
        "topic",
        "list",
        "--brokers",
        brokers,
        namespace=namespace,
    )
    # The first line is dropped unconditionally (presumably the column
    # header of rpk's table); blank lines yield no cells and are skipped.
    data_lines = listing.stdout.splitlines()[1:]
    split_lines = (text.split() for text in data_lines)
    return [cells[0] for cells in split_lines if cells]
def parse_storage(output: str) -> dict[str, int | str]:
    """Parse ``rpk topic describe-storage`` text into per-topic totals.

    The text is read as a sequence of titled sections.  A line starts a new
    section when the line right after it is a rule made only of ``=``
    characters; a bare ``SUMMARY`` or ``SIZE`` line is also accepted without
    a rule.  Only the ``SUMMARY`` and ``SIZE`` sections are interpreted;
    rows belonging to any other section are ignored, so they can neither
    pollute the summary nor be mistaken for size rows.

    Args:
        output: Raw stdout of the describe-storage command.

    Returns:
        A dict with ``name`` (str, ``""`` when absent), ``partitions`` and
        ``replicas`` (ints, ``0`` when absent), and ``local_bytes``,
        ``total_bytes`` and ``local_segments`` summed over all size rows.

    Raises:
        ValueError: If a ``SIZE`` row with at least six cells, or the
            ``PARTITIONS`` / ``REPLICAS`` summary value, is not an integer.
    """
    lines = [raw_line.strip() for raw_line in output.splitlines()]
    summary: dict[str, str] = {}
    size_rows: list[dict[str, int]] = []
    section = ""
    size_header_seen = False

    for index, line in enumerate(lines):
        if not line or set(line) == {"="}:
            continue

        # Any underlined title switches section, not just the two we parse;
        # otherwise rows of a following section would be read with the
        # previous section's rules.
        underlined = index + 1 < len(lines) and set(lines[index + 1]) == {"="}
        if underlined or line in ("SUMMARY", "SIZE"):
            section = line.lower()
            size_header_seen = False
            continue

        if section == "summary":
            # "KEY   value": split once on the column gap (2+ whitespace) so
            # values containing single spaces stay intact.
            fields = re.split(r"\s{2,}", line, maxsplit=1)
            if len(fields) == 2:
                summary[fields[0]] = fields[1]
        elif section == "size":
            if not size_header_seen:
                # First row of the table is its column header.
                size_header_seen = True
                continue
            fields = line.split()
            if len(fields) < 6:
                continue
            size_rows.append(
                {
                    "partition": int(fields[0]),
                    "cloud_bytes": int(fields[1]),
                    "local_bytes": int(fields[2]),
                    "total_bytes": int(fields[3]),
                    "cloud_segments": int(fields[4]),
                    "local_segments": int(fields[5]),
                }
            )

    return {
        "name": summary.get("NAME", ""),
        "partitions": int(summary.get("PARTITIONS", "0")),
        "replicas": int(summary.get("REPLICAS", "0")),
        "local_bytes": sum(row["local_bytes"] for row in size_rows),
        "total_bytes": sum(row["total_bytes"] for row in size_rows),
        "local_segments": sum(row["local_segments"] for row in size_rows),
    }
def main() -> int:
    """Print a storage report for the selected Redpanda topics.

    Topic selection, in priority order: the explicit ``--topic`` values,
    else the result of ``list_topics`` when ``--all-topics`` is set, else
    the result of ``app_topics``.  Each topic is described through
    ``rpk topic describe-storage`` and parsed with ``parse_storage``; the
    data-path usage reported by ``probe_path_usage`` is printed ahead of the
    per-topic table and the grand totals.

    Returns:
        ``0`` after the report has been printed.

    Raises:
        SystemExit: With the message ``no topics found`` when the selection
            is empty.
    """
    args = parse_args()
    namespace = args.namespace
    brokers = args.brokers or kafka_brokers(namespace=namespace)

    # Conditional expressions are lazy, so only the chosen source is queried.
    topics = args.topic or (
        list_topics(namespace, brokers)
        if args.all_topics
        else app_topics(namespace=namespace)
    )
    if not topics:
        raise SystemExit("no topics found")

    topic_rows: list[list[str]] = []
    sum_local_bytes = 0
    sum_all_bytes = 0
    sum_local_segments = 0
    for topic in topics:
        described = redpanda_exec(
            "rpk",
            "topic",
            "describe-storage",
            topic,
            "--brokers",
            brokers,
            namespace=namespace,
        )
        stats = parse_storage(described.stdout)
        local_bytes = int(stats["local_bytes"])
        all_bytes = int(stats["total_bytes"])
        sum_local_bytes += local_bytes
        sum_all_bytes += all_bytes
        sum_local_segments += int(stats["local_segments"])
        # The row shows the requested topic name, not the parsed NAME field.
        topic_rows.append(
            [
                topic,
                str(stats["partitions"]),
                str(stats["replicas"]),
                human_bytes(local_bytes),
                human_bytes(all_bytes),
                str(stats["local_segments"]),
            ]
        )

    redpanda_pod = redpanda_pod_name(namespace=namespace)
    usage = probe_path_usage(redpanda_pod, DEFAULT_REDPANDA_DATA_PATH, namespace=namespace)

    print(f"Namespace: {namespace}")
    print(f"Brokers: {brokers}")
    print(f"Pod: {redpanda_pod}")
    print(f"Data path: {DEFAULT_REDPANDA_DATA_PATH}")
    path_size = human_bytes(usage["path_bytes"])
    fs_used = human_bytes(usage["filesystem_used_bytes"])
    fs_avail = human_bytes(usage["filesystem_available_bytes"])
    use_percent = usage["filesystem_use_percent"]
    print(
        f"Disk: path={path_size}, fs_used={fs_used}, "
        f"fs_avail={fs_avail}, use={use_percent}"
    )
    print()

    print("Topics")
    column_titles = [
        "TOPIC",
        "PARTITIONS",
        "REPLICAS",
        "LOCAL_BYTES",
        "TOTAL_BYTES",
        "LOCAL_SEGMENTS",
    ]
    print_table(column_titles, topic_rows)
    print()

    local_total = human_bytes(sum_local_bytes)
    overall_total = human_bytes(sum_all_bytes)
    print(
        f"Totals: local={local_total}, "
        f"total={overall_total}, "
        f"segments={sum_local_segments}"
    )
    return 0
# Run the report only when this file is executed as a script; the value
# returned by main() is handed to SystemExit as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())