#!/usr/bin/env python3 from __future__ import annotations import argparse import signal import subprocess import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent)) from common import DEFAULT_NAMESPACE, build_env, kafka_brokers, raw_near_topic, stderr DEFAULT_OUTPUT_FORMAT = ( "%d{go[2006-01-02T15:04:05Z07:00]} topic=%t partition=%p offset=%o\n" "%v\n" ) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Live-inspect raw NEAR intents arriving in Redpanda." ) parser.add_argument( "--namespace", default=DEFAULT_NAMESPACE, help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})", ) parser.add_argument( "--brokers", default="", help="Override Kafka brokers instead of reading them from unrip-config.", ) parser.add_argument( "--topic", default="", help="Topic to consume. Defaults to KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE from unrip-config.", ) parser.add_argument( "--offset", default="end", help="Kafka offset selector passed to rpk (default: end)", ) parser.add_argument( "--num", type=int, default=0, help="Number of records to read before exiting. 0 means unbounded (default: 0).", ) parser.add_argument( "--timeout", type=float, default=0, help="Stop after this many seconds even if the stream is still open.", ) parser.add_argument( "--value-only", action="store_true", help="Print only the record value without topic/partition/offset metadata.", ) parser.add_argument( "--rpk-json", action="store_true", help="Use rpk's built-in JSON record formatter instead of the default metadata+value output.", ) return parser.parse_args() def build_command(args: argparse.Namespace, brokers: str, topic: str) -> list[str]: command = [ "kubectl", "exec", "-i", "-n", args.namespace, "deploy/redpanda", "--", "rpk", "topic", "consume", topic, "--brokers", brokers, "--offset", args.offset, "--num", str(args.num), ] if args.rpk_json: command.extend(["--format", "json", "--pretty-print=false"]) elif args.value_only: command.extend(["--format", "%v\n"]) else: command.extend(["--format", DEFAULT_OUTPUT_FORMAT]) return command def main() -> int: args = parse_args() brokers = args.brokers or kafka_brokers(namespace=args.namespace) topic = args.topic or raw_near_topic(namespace=args.namespace) command = build_command(args, brokers, topic) stderr(f"namespace={args.namespace} brokers={brokers} topic={topic} offset={args.offset} num={args.num}") process = subprocess.Popen(command, env=build_env()) try: process.wait(timeout=args.timeout if args.timeout and args.timeout > 0 else None) except subprocess.TimeoutExpired: process.send_signal(signal.SIGINT) try: process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() process.wait(timeout=5) return 124 except KeyboardInterrupt: process.send_signal(signal.SIGINT) return process.wait() return process.returncode if __name__ == "__main__": raise SystemExit(main())