196 lines
5.5 KiB
Python
Executable file
196 lines
5.5 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
|
|
|
from common import (
|
|
DEFAULT_NAMESPACE,
|
|
DEFAULT_REDPANDA_DATA_PATH,
|
|
app_topics,
|
|
human_bytes,
|
|
kafka_brokers,
|
|
print_table,
|
|
redpanda_exec,
|
|
redpanda_pod_name,
|
|
probe_path_usage,
|
|
)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Show how much data Redpanda is currently storing for unrip topics."
|
|
)
|
|
parser.add_argument(
|
|
"--namespace",
|
|
default=DEFAULT_NAMESPACE,
|
|
help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})",
|
|
)
|
|
parser.add_argument(
|
|
"--brokers",
|
|
default="",
|
|
help="Override Kafka brokers instead of reading them from unrip-config.",
|
|
)
|
|
parser.add_argument(
|
|
"--topic",
|
|
action="append",
|
|
default=[],
|
|
help="Specific topic to inspect. Can be passed multiple times.",
|
|
)
|
|
parser.add_argument(
|
|
"--all-topics",
|
|
action="store_true",
|
|
help="Inspect every topic visible to Redpanda instead of the app topics from config.",
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
def list_topics(namespace: str, brokers: str) -> list[str]:
|
|
output = redpanda_exec(
|
|
"rpk",
|
|
"topic",
|
|
"list",
|
|
"--brokers",
|
|
brokers,
|
|
namespace=namespace,
|
|
).stdout.splitlines()
|
|
topics: list[str] = []
|
|
for line in output[1:]:
|
|
fields = line.split()
|
|
if fields:
|
|
topics.append(fields[0])
|
|
return topics
|
|
|
|
|
|
def parse_storage(output: str) -> dict[str, int | str]:
|
|
summary: dict[str, str] = {}
|
|
size_rows: list[dict[str, int]] = []
|
|
section = ""
|
|
size_header_seen = False
|
|
|
|
for raw_line in output.splitlines():
|
|
line = raw_line.strip()
|
|
if not line or set(line) == {"="}:
|
|
continue
|
|
if line == "SUMMARY":
|
|
section = "summary"
|
|
continue
|
|
if line == "SIZE":
|
|
section = "size"
|
|
size_header_seen = False
|
|
continue
|
|
|
|
if section == "summary":
|
|
fields = re.split(r"\s{2,}", line, maxsplit=1)
|
|
if len(fields) == 2:
|
|
summary[fields[0]] = fields[1]
|
|
continue
|
|
|
|
if section == "size":
|
|
if not size_header_seen:
|
|
size_header_seen = True
|
|
continue
|
|
fields = re.split(r"\s+", line)
|
|
if len(fields) < 6:
|
|
continue
|
|
size_rows.append(
|
|
{
|
|
"partition": int(fields[0]),
|
|
"cloud_bytes": int(fields[1]),
|
|
"local_bytes": int(fields[2]),
|
|
"total_bytes": int(fields[3]),
|
|
"cloud_segments": int(fields[4]),
|
|
"local_segments": int(fields[5]),
|
|
}
|
|
)
|
|
|
|
return {
|
|
"name": summary.get("NAME", ""),
|
|
"partitions": int(summary.get("PARTITIONS", "0")),
|
|
"replicas": int(summary.get("REPLICAS", "0")),
|
|
"local_bytes": sum(row["local_bytes"] for row in size_rows),
|
|
"total_bytes": sum(row["total_bytes"] for row in size_rows),
|
|
"local_segments": sum(row["local_segments"] for row in size_rows),
|
|
}
|
|
|
|
|
|
def main() -> int:
|
|
args = parse_args()
|
|
namespace = args.namespace
|
|
brokers = args.brokers or kafka_brokers(namespace=namespace)
|
|
|
|
if args.topic:
|
|
topics = args.topic
|
|
elif args.all_topics:
|
|
topics = list_topics(namespace, brokers)
|
|
else:
|
|
topics = app_topics(namespace=namespace)
|
|
|
|
if not topics:
|
|
raise SystemExit("no topics found")
|
|
|
|
topic_rows: list[list[str]] = []
|
|
total_local_bytes = 0
|
|
total_bytes = 0
|
|
total_segments = 0
|
|
|
|
for topic in topics:
|
|
output = redpanda_exec(
|
|
"rpk",
|
|
"topic",
|
|
"describe-storage",
|
|
topic,
|
|
"--brokers",
|
|
brokers,
|
|
namespace=namespace,
|
|
).stdout
|
|
parsed = parse_storage(output)
|
|
total_local_bytes += int(parsed["local_bytes"])
|
|
total_bytes += int(parsed["total_bytes"])
|
|
total_segments += int(parsed["local_segments"])
|
|
topic_rows.append(
|
|
[
|
|
topic,
|
|
str(parsed["partitions"]),
|
|
str(parsed["replicas"]),
|
|
human_bytes(int(parsed["local_bytes"])),
|
|
human_bytes(int(parsed["total_bytes"])),
|
|
str(parsed["local_segments"]),
|
|
]
|
|
)
|
|
|
|
redpanda_pod = redpanda_pod_name(namespace=namespace)
|
|
usage = probe_path_usage(redpanda_pod, DEFAULT_REDPANDA_DATA_PATH, namespace=namespace)
|
|
|
|
print(f"Namespace: {namespace}")
|
|
print(f"Brokers: {brokers}")
|
|
print(f"Pod: {redpanda_pod}")
|
|
print(f"Data path: {DEFAULT_REDPANDA_DATA_PATH}")
|
|
print(
|
|
"Disk: "
|
|
f"path={human_bytes(usage['path_bytes'])}, "
|
|
f"fs_used={human_bytes(usage['filesystem_used_bytes'])}, "
|
|
f"fs_avail={human_bytes(usage['filesystem_available_bytes'])}, "
|
|
f"use={usage['filesystem_use_percent']}"
|
|
)
|
|
print()
|
|
print("Topics")
|
|
print_table(
|
|
["TOPIC", "PARTITIONS", "REPLICAS", "LOCAL_BYTES", "TOTAL_BYTES", "LOCAL_SEGMENTS"],
|
|
topic_rows,
|
|
)
|
|
print()
|
|
print(
|
|
f"Totals: local={human_bytes(total_local_bytes)}, "
|
|
f"total={human_bytes(total_bytes)}, "
|
|
f"segments={total_segments}"
|
|
)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|