From d8d9a34db53699f00551d127ff08a46d12e2a889 Mon Sep 17 00:00:00 2001 From: philipp Date: Wed, 1 Apr 2026 10:11:33 +0200 Subject: [PATCH] Add near intents control API --- .env.example | 3 + README.md | 77 ++++++++ deploy/k8s/base/unrip.yaml | 6 + scripts/ops/common.py | 301 +++++++++++++++++++++++++++++++ scripts/ops/deployment_status.py | 184 +++++++++++++++++++ scripts/ops/live_near_intents.py | 127 +++++++++++++ scripts/ops/redpanda_storage.py | 196 ++++++++++++++++++++ src/apps/near-intents-ingest.mjs | 41 +++-- src/core/control-api.mjs | 156 ++++++++++++++++ src/core/pair-filter.mjs | 93 +++++++++- src/lib/config.mjs | 18 ++ src/venues/near-intents/ws.mjs | 63 ++++++- 12 files changed, 1246 insertions(+), 19 deletions(-) create mode 100755 scripts/ops/common.py create mode 100755 scripts/ops/deployment_status.py create mode 100755 scripts/ops/live_near_intents.py create mode 100755 scripts/ops/redpanda_storage.py create mode 100644 src/core/control-api.mjs diff --git a/.env.example b/.env.example index f8b3a53..4ca702e 100644 --- a/.env.example +++ b/.env.example @@ -2,6 +2,9 @@ NEAR_INTENTS_API_KEY=replace_me NEAR_INTENTS_WS_URL=wss://solver-relay-v2.chaindefuser.com/ws NEAR_INTENTS_PAIR_FILTER=nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omf +NEAR_INTENTS_CONTROL_API_ENABLED=true +NEAR_INTENTS_CONTROL_HOST=0.0.0.0 +NEAR_INTENTS_CONTROL_PORT=8081 KAFKA_BROKERS=redpanda:9092 KAFKA_CLIENT_ID=unrip KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE=raw.near_intents.quote diff --git a/README.md b/README.md index c77d64a..fd870a5 100644 --- a/README.md +++ b/README.md @@ -108,3 +108,80 @@ KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout sta KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/dummy-executor KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/dummy-consumer ``` + +### Auxiliary ops scripts + +These scripts default to the same 
adjacent platform checkout as the deployment +bootstrap: `../unrip3/.state/hetzner/kubeconfig.yaml`. Override with +`KUBECONFIG_PATH`, `KUBECONFIG`, or `PLATFORM_REPO_DIR` if needed. + +`scripts/ops/deployment_status.py` +- Shows current deployment readiness, pod uptime, restart counts, and mounted storage usage. +- By default it shows the live deployment pods only. +- Use `--include-completed` to include completed Job pods. +- Use `--include-rootfs` if you also want a probe of `/` for pods without PVC-backed mounts. + +```bash +python3 scripts/ops/deployment_status.py +python3 scripts/ops/deployment_status.py --include-completed +``` + +`scripts/ops/redpanda_storage.py` +- Shows how much data Redpanda is currently storing for the unrip topics. +- Prints per-topic local bytes, total bytes, segment counts, and the overall Redpanda data-path usage. +- Use `--all-topics` to inspect every visible topic, or `--topic` multiple times for a subset. + +```bash +python3 scripts/ops/redpanda_storage.py +python3 scripts/ops/redpanda_storage.py --all-topics +python3 scripts/ops/redpanda_storage.py --topic raw.near_intents.quote --topic norm.swap_demand +``` + +`scripts/ops/live_near_intents.py` +- Live-inspects the raw NEAR quote stream entering Redpanda. +- Defaults to the configured raw topic, `--offset end`, and an unbounded live tail. +- Use `--num` for a bounded sample, `--offset start` to read from the beginning, `--value-only` for payload-only output, or `--rpk-json` for the full record metadata emitted by `rpk`. +- Use `--timeout` when you want the script to stop automatically. + +```bash +python3 scripts/ops/live_near_intents.py +python3 scripts/ops/live_near_intents.py --num 10 --offset start +python3 scripts/ops/live_near_intents.py --value-only --timeout 30 +python3 scripts/ops/live_near_intents.py --rpk-json --num 5 +``` + +### Near Intents control API + +`near-intents-ingest` exposes a small in-process control API on port `8081` by +default. 
It is meant for ad hoc inspection and runtime filter changes without a +redeploy. + +Port-forward the deployment: + +```bash +KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip port-forward deploy/near-intents-ingest 8081:8081 +``` + +Inspect current state, including the active pair filter and ingest counters: + +```bash +curl -s http://127.0.0.1:8081/state +``` + +Set or disable the runtime pair filter: + +```bash +curl -s -X PUT http://127.0.0.1:8081/pair-filter \ + -H 'content-type: application/json' \ + -d '{"pair":"nep141:btc.omft.near->nep141:eth.omft.near"}' + +curl -s -X PUT http://127.0.0.1:8081/pair-filter \ + -H 'content-type: application/json' \ + -d '{"pair":null}' +``` + +Reset the runtime filter back to the configured env/file/default state: + +```bash +curl -s -X POST http://127.0.0.1:8081/pair-filter/reset +``` diff --git a/deploy/k8s/base/unrip.yaml b/deploy/k8s/base/unrip.yaml index d930a46..47a4deb 100644 --- a/deploy/k8s/base/unrip.yaml +++ b/deploy/k8s/base/unrip.yaml @@ -6,6 +6,9 @@ metadata: data: NEAR_INTENTS_WS_URL: wss://solver-relay-v2.chaindefuser.com/ws NEAR_INTENTS_PAIR_FILTER: nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omf + NEAR_INTENTS_CONTROL_API_ENABLED: "true" + NEAR_INTENTS_CONTROL_HOST: 0.0.0.0 + NEAR_INTENTS_CONTROL_PORT: "8081" KAFKA_BROKERS: redpanda.unrip.svc.cluster.local:9092 KAFKA_CLIENT_ID: unrip KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE: raw.near_intents.quote @@ -52,6 +55,9 @@ spec: image: ghcr.io/example/unrip:bootstrap imagePullPolicy: IfNotPresent command: ["node", "src/apps/near-intents-ingest.mjs"] + ports: + - name: control-api + containerPort: 8081 envFrom: - configMapRef: name: unrip-config diff --git a/scripts/ops/common.py b/scripts/ops/common.py new file mode 100755 index 0000000..25d9dfd --- /dev/null +++ b/scripts/ops/common.py @@ -0,0 +1,301 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import os +import shlex +import 
def build_env() -> dict[str, str]:
    """Return a copy of the process environment with ``KUBECONFIG`` pinned.

    The kubeconfig location comes from ``resolve_kubeconfig_path()``. The
    value may be a single file or an ``os.pathsep``-separated list of files
    (the form kubectl accepts in ``KUBECONFIG``); at least one entry has to
    exist on disk.

    Returns:
        A fresh environment mapping, safe to hand to ``subprocess``.

    Raises:
        SystemExit: when the value is empty or none of the listed files exists.
    """
    env = os.environ.copy()
    kubeconfig_path = resolve_kubeconfig_path()

    # Check each entry on its own. Testing the joined string as one path
    # rejects a valid multi-file KUBECONFIG, and an empty string would pass
    # because Path("") resolves to the current directory.
    candidates = [entry for entry in kubeconfig_path.split(os.pathsep) if entry]
    if not any(Path(entry).exists() for entry in candidates):
        raise SystemExit(
            f"missing kubeconfig: {kubeconfig_path}\n"
            "set KUBECONFIG_PATH, KUBECONFIG, or PLATFORM_REPO_DIR"
        )

    env["KUBECONFIG"] = kubeconfig_path
    return env
def redpanda_pod_name(namespace: str = DEFAULT_NAMESPACE) -> str:
    """Return the name of a Redpanda pod in ``namespace``.

    Pods are selected with ``DEFAULT_REDPANDA_LABEL_SELECTOR``. A pod that is
    in phase ``Running`` and is not being deleted is preferred, because callers
    go on to ``kubectl exec`` into it. If no such pod exists the first match
    is returned, as before.

    Raises:
        SystemExit: when the selector matches no pod at all.
    """
    payload = kubectl_json(
        "get",
        "pods",
        "-n",
        namespace,
        "-l",
        DEFAULT_REDPANDA_LABEL_SELECTOR,
        "-o",
        "json",
    )
    items = payload.get("items") or []
    if not items:
        raise SystemExit(f"no Redpanda pod found in namespace {namespace}")

    # During a rollout the list can contain Pending or terminating pods;
    # exec'ing into one of those fails, so look for a live pod first.
    for item in items:
        is_running = item.get("status", {}).get("phase") == "Running"
        is_terminating = bool(item.get("metadata", {}).get("deletionTimestamp"))
        if is_running and not is_terminating:
            return item["metadata"]["name"]

    return items[0]["metadata"]["name"]
def human_duration(seconds: float | int | None) -> str:
    """Format a duration as at most two non-zero units, e.g. ``"1h 1m"``.

    ``None`` renders as ``"-"``. Negative inputs are clamped to zero and
    fractional seconds are truncated. A zero duration renders as ``"0s"``.
    """
    if seconds is None:
        return "-"

    remaining = int(max(0, seconds))
    parts: list[str] = []
    for unit_size, label in ((86400, "d"), (3600, "h"), (60, "m"), (1, "s")):
        count, remaining = divmod(remaining, unit_size)
        if not count:
            continue
        parts.append(f"{count}{label}")
        # Only the two most significant non-zero units are shown.
        if len(parts) == 2:
            break

    if not parts:
        return "0s"
    return " ".join(parts)
def parse_storage_quantity(value: str | None) -> int | None:
    """Convert a Kubernetes resource quantity string to a whole number of bytes.

    Supported forms:
      * binary suffixes ``Ki Mi Gi Ti Pi Ei`` (powers of 1024)
      * decimal suffixes ``k M G T P E`` (powers of 1000); an upper-case
        ``K`` is still accepted for backward compatibility
      * the milli suffix ``m`` (value divided by 1000)
      * plain numbers, including exponent notation such as ``1e3``

    Args:
        value: Quantity text such as ``"10Gi"``; ``None`` or blank means
            "no quantity".

    Returns:
        The quantity truncated to an ``int``, or ``None`` for a missing value.

    Raises:
        ValueError: when the numeric part cannot be parsed.
    """
    if not value:
        return None
    text = value.strip()
    if not text:
        return None

    # Two-letter binary suffixes come first so "Mi" is matched before any
    # one-letter decimal suffix gets a chance.
    suffix_factors = (
        ("Ki", 1024**1),
        ("Mi", 1024**2),
        ("Gi", 1024**3),
        ("Ti", 1024**4),
        ("Pi", 1024**5),
        ("Ei", 1024**6),
        ("k", 1000**1),
        ("K", 1000**1),
        ("M", 1000**2),
        ("G", 1000**3),
        ("T", 1000**4),
        ("P", 1000**5),
        ("E", 1000**6),
    )
    for suffix, factor in suffix_factors:
        if text.endswith(suffix):
            return int(float(text[: -len(suffix)]) * factor)

    # Lower-case "m" is milli, distinct from mega ("M") handled above.
    if text.endswith("m"):
        return int(float(text[:-1]) / 1000)

    return int(float(text))
def main() -> int:
    """Print deployment, pod, and storage tables for one namespace.

    Reads deployments, pods, and PVCs through ``kubectl_json``, then probes
    every PVC-backed mount (or ``/`` with ``--include-rootfs``) through
    ``probe_path_usage``. A mount whose probe cannot run or fails is still
    listed, with ``-`` in the usage columns, so one bad pod does not abort
    the whole report.

    Returns:
        ``0`` once the report has been printed.
    """
    # Local import: this script does not import subprocess at file level.
    import subprocess

    def first_or_empty(items: list | None) -> dict:
        # ``.get(key, [{}])[0]`` only guards a missing key; a present but
        # empty list would still raise IndexError.
        return (items or [{}])[0]

    args = parse_args()
    namespace = args.namespace

    deployment_payload = kubectl_json("get", "deploy", "-n", namespace, "-o", "json")
    pod_payload = kubectl_json("get", "pods", "-n", namespace, "-o", "json")
    pvc_payload = kubectl_json("get", "pvc", "-n", namespace, "-o", "json")

    # claim name -> raw requested storage quantity ("" when unspecified)
    pvc_requests = {
        item["metadata"]["name"]: item.get("spec", {})
        .get("resources", {})
        .get("requests", {})
        .get("storage", "")
        for item in pvc_payload.get("items", [])
    }

    deployment_rows: list[list[str]] = []
    for item in sorted(deployment_payload.get("items", []), key=lambda value: value["metadata"]["name"]):
        container = first_or_empty(
            item.get("spec", {}).get("template", {}).get("spec", {}).get("containers")
        )
        deployment_rows.append(
            [
                item["metadata"]["name"],
                f"{item.get('status', {}).get('readyReplicas', 0)}/{item.get('spec', {}).get('replicas', 0)}",
                str(item.get("status", {}).get("availableReplicas", 0)),
                container.get("image", "-"),
            ]
        )

    pod_rows: list[list[str]] = []
    storage_rows: list[list[str]] = []

    for pod in sorted(pod_payload.get("items", []), key=lambda value: value["metadata"]["name"]):
        if not args.include_completed and is_completed_job_pod(pod):
            continue

        pod_name = pod["metadata"]["name"]
        status = pod.get("status", {})
        phase = status.get("phase", "-")
        # Prefer the pod-level start time; otherwise use the first container's
        # running-since timestamp.
        start_time = parse_k8s_timestamp(
            status.get("startTime")
            or first_or_empty(status.get("containerStatuses"))
            .get("state", {})
            .get("running", {})
            .get("startedAt")
        )
        uptime = human_duration((now_utc() - start_time).total_seconds() if start_time else None)
        image = first_or_empty(pod.get("spec", {}).get("containers")).get("image", "-")
        pod_rows.append(
            [
                pod_name,
                phase,
                ready_string(pod),
                str(restart_count(pod)),
                uptime,
                pod.get("spec", {}).get("nodeName", "-"),
                image,
            ]
        )

        mounts = pvc_mounts_for_pod(pod)
        if not mounts and args.include_rootfs:
            mounts = [{"claim_name": "-", "mount_path": "/", "container": "app"}]

        if not mounts:
            storage_rows.append([pod_name, "-", "no pvc mounts", "-", "-", "-", "-", "-"])
            continue

        for mount in mounts:
            requested_raw = pvc_requests.get(mount["claim_name"], "")
            requested_bytes = parse_storage_quantity(requested_raw)
            requested = (
                human_bytes(requested_bytes)
                if requested_bytes is not None
                else (requested_raw or "-")
            )

            usage = None
            # The probe runs via kubectl exec, which needs a running pod.
            if phase == "Running":
                try:
                    usage = probe_path_usage(pod_name, mount["mount_path"], namespace=namespace)
                except (subprocess.CalledProcessError, RuntimeError, ValueError) as error:
                    print(
                        f"storage probe failed for {pod_name}:{mount['mount_path']}: {error}",
                        file=sys.stderr,
                    )

            if usage is None:
                storage_rows.append(
                    [pod_name, mount["mount_path"], mount["claim_name"], requested, "-", "-", "-", "-"]
                )
                continue

            storage_rows.append(
                [
                    pod_name,
                    mount["mount_path"],
                    mount["claim_name"],
                    requested,
                    human_bytes(usage["path_bytes"]),
                    human_bytes(usage["filesystem_used_bytes"]),
                    human_bytes(usage["filesystem_available_bytes"]),
                    str(usage["filesystem_use_percent"]),
                ]
            )

    print(f"Namespace: {namespace}")
    print()
    print("Deployments")
    print_table(["NAME", "READY", "AVAILABLE", "IMAGE"], deployment_rows)
    print()
    print("Pods")
    print_table(["POD", "PHASE", "READY", "RESTARTS", "UPTIME", "NODE", "IMAGE"], pod_rows)
    print()
    print("Storage")
    print_table(
        ["POD", "PATH", "CLAIM", "REQUESTED", "PATH_BYTES", "FS_USED", "FS_AVAIL", "USE%"],
        storage_rows,
    )
    print()
    print("Note: storage figures come from container-mounted paths. Pods without PVC-backed mounts report no dedicated storage.")
    return 0
def build_command(
    args: argparse.Namespace,
    brokers: str,
    topic: str,
    deployment: str | None = None,
) -> list[str]:
    """Build the ``kubectl exec … rpk topic consume`` argv for the live tail.

    Args:
        args: Parsed CLI options; reads ``namespace``, ``offset``, ``num``,
            ``rpk_json`` and ``value_only``.
        brokers: Broker list passed to ``rpk --brokers``.
        topic: Topic to consume.
        deployment: Name of the Redpanda deployment to exec into. Defaults to
            the ``REDPANDA_DEPLOYMENT`` environment variable, then
            ``"redpanda"`` — the same override ``scripts/ops/common.py``
            uses for its own exec helper.

    Returns:
        The argv list, ready for ``subprocess.Popen``.
    """
    # Local import: this script does not import os at file level.
    import os

    target = deployment or os.environ.get("REDPANDA_DEPLOYMENT", "redpanda")

    command = [
        "kubectl",
        "exec",
        "-i",
        "-n",
        args.namespace,
        f"deploy/{target}",
        "--",
        "rpk",
        "topic",
        "consume",
        topic,
        "--brokers",
        brokers,
        "--offset",
        args.offset,
        "--num",
        str(args.num),
    ]

    # --rpk-json wins over --value-only when both are given.
    if args.rpk_json:
        command.extend(["--format", "json", "--pretty-print=false"])
    elif args.value_only:
        command.extend(["--format", "%v\n"])
    else:
        command.extend(["--format", DEFAULT_OUTPUT_FORMAT])

    return command
brokers={brokers} topic={topic} offset={args.offset} num={args.num}") + + process = subprocess.Popen(command, env=build_env()) + try: + process.wait(timeout=args.timeout if args.timeout and args.timeout > 0 else None) + except subprocess.TimeoutExpired: + process.send_signal(signal.SIGINT) + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + process.wait(timeout=5) + return 124 + except KeyboardInterrupt: + process.send_signal(signal.SIGINT) + return process.wait() + return process.returncode + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/ops/redpanda_storage.py b/scripts/ops/redpanda_storage.py new file mode 100755 index 0000000..f236884 --- /dev/null +++ b/scripts/ops/redpanda_storage.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import re +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +from common import ( + DEFAULT_NAMESPACE, + DEFAULT_REDPANDA_DATA_PATH, + app_topics, + human_bytes, + kafka_brokers, + print_table, + redpanda_exec, + redpanda_pod_name, + probe_path_usage, +) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Show how much data Redpanda is currently storing for unrip topics." + ) + parser.add_argument( + "--namespace", + default=DEFAULT_NAMESPACE, + help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})", + ) + parser.add_argument( + "--brokers", + default="", + help="Override Kafka brokers instead of reading them from unrip-config.", + ) + parser.add_argument( + "--topic", + action="append", + default=[], + help="Specific topic to inspect. 
def parse_storage(output: str) -> dict[str, int | str]:
    """Summarise the text printed by ``rpk topic describe-storage``.

    Only the ``SUMMARY`` and ``SIZE`` sections are read. Any other section
    (recognised by its ``====`` underline) is skipped, so its rows can neither
    pollute the summary nor be mistaken for size rows. Cells that are not
    integers count as ``0`` instead of raising.

    Args:
        output: Raw stdout of the rpk command.

    Returns:
        A dict with ``name``, ``partitions``, ``replicas`` and the
        per-partition sums ``local_bytes``, ``total_bytes`` and
        ``local_segments``.
    """

    def to_int(cell: str) -> int:
        # Tolerate non-numeric cells (for example a "-" placeholder) so one
        # odd cell does not abort the whole report.
        try:
            return int(cell)
        except ValueError:
            return 0

    known_sections = {"SUMMARY": "summary", "SIZE": "size"}
    summary: dict[str, str] = {}
    size_rows: list[dict[str, int]] = []
    section = ""
    size_header_seen = False
    previous_line = ""

    for raw_line in output.splitlines():
        line = raw_line.strip()
        if not line:
            continue

        if set(line) == {"="}:
            # An underline marks the previous line as a section title; a
            # title we do not know ends whichever section we were in.
            if previous_line not in known_sections:
                section = ""
            continue
        previous_line = line

        if line in known_sections:
            section = known_sections[line]
            size_header_seen = False
            continue

        if section == "summary":
            fields = line.split(None, 1)
            if len(fields) == 2:
                summary[fields[0]] = fields[1].strip()
            continue

        if section == "size":
            # The first row after the title is the column header.
            if not size_header_seen:
                size_header_seen = True
                continue
            fields = line.split()
            if len(fields) < 6:
                continue
            size_rows.append(
                {
                    "partition": to_int(fields[0]),
                    "cloud_bytes": to_int(fields[1]),
                    "local_bytes": to_int(fields[2]),
                    "total_bytes": to_int(fields[3]),
                    "cloud_segments": to_int(fields[4]),
                    "local_segments": to_int(fields[5]),
                }
            )

    return {
        "name": summary.get("NAME", ""),
        "partitions": to_int(summary.get("PARTITIONS", "0")),
        "replicas": to_int(summary.get("REPLICAS", "0")),
        "local_bytes": sum(row["local_bytes"] for row in size_rows),
        "total_bytes": sum(row["total_bytes"] for row in size_rows),
        "local_segments": sum(row["local_segments"] for row in size_rows),
    }
args.topic: + topics = args.topic + elif args.all_topics: + topics = list_topics(namespace, brokers) + else: + topics = app_topics(namespace=namespace) + + if not topics: + raise SystemExit("no topics found") + + topic_rows: list[list[str]] = [] + total_local_bytes = 0 + total_bytes = 0 + total_segments = 0 + + for topic in topics: + output = redpanda_exec( + "rpk", + "topic", + "describe-storage", + topic, + "--brokers", + brokers, + namespace=namespace, + ).stdout + parsed = parse_storage(output) + total_local_bytes += int(parsed["local_bytes"]) + total_bytes += int(parsed["total_bytes"]) + total_segments += int(parsed["local_segments"]) + topic_rows.append( + [ + topic, + str(parsed["partitions"]), + str(parsed["replicas"]), + human_bytes(int(parsed["local_bytes"])), + human_bytes(int(parsed["total_bytes"])), + str(parsed["local_segments"]), + ] + ) + + redpanda_pod = redpanda_pod_name(namespace=namespace) + usage = probe_path_usage(redpanda_pod, DEFAULT_REDPANDA_DATA_PATH, namespace=namespace) + + print(f"Namespace: {namespace}") + print(f"Brokers: {brokers}") + print(f"Pod: {redpanda_pod}") + print(f"Data path: {DEFAULT_REDPANDA_DATA_PATH}") + print( + "Disk: " + f"path={human_bytes(usage['path_bytes'])}, " + f"fs_used={human_bytes(usage['filesystem_used_bytes'])}, " + f"fs_avail={human_bytes(usage['filesystem_available_bytes'])}, " + f"use={usage['filesystem_use_percent']}" + ) + print() + print("Topics") + print_table( + ["TOPIC", "PARTITIONS", "REPLICAS", "LOCAL_BYTES", "TOTAL_BYTES", "LOCAL_SEGMENTS"], + topic_rows, + ) + print() + print( + f"Totals: local={human_bytes(total_local_bytes)}, " + f"total={human_bytes(total_bytes)}, " + f"segments={total_segments}" + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/apps/near-intents-ingest.mjs b/src/apps/near-intents-ingest.mjs index ab95059..565f96d 100644 --- a/src/apps/near-intents-ingest.mjs +++ b/src/apps/near-intents-ingest.mjs @@ -1,6 +1,7 @@ import process from 
/**
 * Start the in-process HTTP control API.
 *
 * Routes:
 *   GET  /healthz            liveness payload
 *   GET  /state              pair-filter state plus `stateProvider.getState()`
 *   GET  /pair-filter        current pair-filter state
 *   PUT  /pair-filter        set (`{"pair":"a->b"}`) or disable (`{"pair":null}`,
 *                            `{"disabled":true}`, `{"enabled":false}`) the filter
 *   POST /pair-filter/reset  drop the runtime override
 *
 * Malformed requests are answered with 400; only unexpected failures are
 * logged and answered with 500.
 *
 * @param {object} [options]
 * @param {string} [options.host] interface to bind
 * @param {number} [options.port] TCP port to bind
 * @param {object|null} [options.logger] optional logger with `info`/`error`
 * @param {string} [options.service] service name echoed in responses
 * @param {string} [options.namespace] namespace echoed in responses
 * @param {object} options.pairFilterController controller exposing
 *   `getState`, `setPairFilter`, `disable` and `reset`
 * @param {object|null} [options.stateProvider] optional object with `getState()`
 * @returns {{close: () => Promise<void>}} handle that stops the server
 */
export function startControlApi({
  host = '0.0.0.0',
  port = 8081,
  logger = null,
  service = 'near-intents-ingest',
  namespace = 'unrip',
  pairFilterController,
  stateProvider = null,
} = {}) {
  const usageError = {
    error: "send JSON like {\"pair\":\"asset_a->asset_b\"} or {\"pair\":null}",
  };

  const server = http.createServer(async (req, res) => {
    // Route on the path alone so "/state?x=1" still matches "/state".
    const path = String(req.url || '/').split('?', 1)[0];

    try {
      if (req.method === 'GET' && path === '/healthz') {
        return sendJson(res, 200, {
          ok: true,
          service,
          namespace,
        });
      }

      if (req.method === 'GET' && path === '/state') {
        return sendJson(res, 200, buildStateResponse({
          service,
          namespace,
          pairFilterController,
          stateProvider,
        }));
      }

      if (req.method === 'GET' && path === '/pair-filter') {
        return sendJson(res, 200, {
          service,
          namespace,
          ...pairFilterController.getState(),
        });
      }

      if (req.method === 'PUT' && path === '/pair-filter') {
        let body;
        try {
          body = await readJsonBody(req);
        } catch (error) {
          // Unreadable, oversized or non-JSON bodies are the caller's fault.
          return sendJson(res, 400, { error: error.message });
        }

        // JSON such as `null`, `"text"` or `[]` parses fine but is not a command.
        if (body === null || typeof body !== 'object' || Array.isArray(body)) {
          return sendJson(res, 400, usageError);
        }

        // Disabling must be explicit. A missing `pair` key (empty body, typo
        // in the key) must not silently open the ingest to every pair.
        if (body.disabled === true || body.enabled === false || body.pair === null) {
          return sendJson(res, 200, {
            service,
            namespace,
            ...pairFilterController.disable(),
          });
        }

        if (typeof body.pair !== 'string') {
          return sendJson(res, 400, usageError);
        }

        let nextState;
        try {
          nextState = pairFilterController.setPairFilter(body.pair);
        } catch (error) {
          // NOTE(review): assumes setPairFilter only throws when the supplied
          // pair string fails validation; confirm against the pair-filter module.
          return sendJson(res, 400, { error: error.message });
        }

        return sendJson(res, 200, {
          service,
          namespace,
          ...nextState,
        });
      }

      if (req.method === 'POST' && path === '/pair-filter/reset') {
        return sendJson(res, 200, {
          service,
          namespace,
          ...pairFilterController.reset(),
        });
      }

      return sendJson(res, 404, {
        error: 'not_found',
      });
    } catch (error) {
      logger?.error('control_api_request_failed', {
        details: {
          method: req.method,
          path: req.url,
          error: error.message,
        },
      });
      return sendJson(res, 500, {
        error: error.message,
      });
    }
  });

  server.listen(port, host, () => {
    logger?.info('control_api_started', {
      details: {
        host,
        port,
      },
    });
  });

  return {
    close() {
      return new Promise((resolve, reject) => {
        server.close((error) => {
          if (error) reject(error);
          else resolve();
        });
        // close() waits for open sockets; drop idle keep-alive connections
        // so shutdown is not held up (method is absent on older Node).
        server.closeIdleConnections?.();
      });
    },
  };
}
/**
 * Read a request body and parse it as JSON.
 *
 * Chunks are collected as bytes and decoded once at the end, so a multi-byte
 * UTF-8 character split across two chunks stays intact. Once the size limit
 * is exceeded the promise rejects and later chunks are discarded instead of
 * being buffered.
 *
 * @param {import('node:http').IncomingMessage} req request stream
 * @param {object} [options]
 * @param {number} [options.maxBytes] maximum accepted body size in bytes
 * @returns {Promise<any>} parsed JSON, or `{}` for an empty/blank body
 * @throws rejects with `request body too large`, `invalid_json`, or the
 *   stream's own error
 */
function readJsonBody(req, { maxBytes = 64 * 1024 } = {}) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    let receivedBytes = 0;
    let overflowed = false;

    req.on('data', (chunk) => {
      // Keep consuming so the socket drains, but stop holding data.
      if (overflowed) return;

      const buffer = typeof chunk === 'string' ? Buffer.from(chunk) : chunk;
      receivedBytes += buffer.length;

      if (receivedBytes > maxBytes) {
        overflowed = true;
        chunks.length = 0;
        reject(new Error('request body too large'));
        return;
      }

      chunks.push(buffer);
    });

    req.on('end', () => {
      if (overflowed) return;

      const raw = Buffer.concat(chunks).toString('utf8');
      if (!raw.trim()) {
        resolve({});
        return;
      }

      try {
        resolve(JSON.parse(raw));
      } catch {
        reject(new Error('invalid_json'));
      }
    });

    req.on('error', reject);
  });
}
resolveConfiguredState(); let currentPairFilter = resolved.pairFilter; let currentPair = resolved.pair; let lastLoadedFileValue = null; let source = resolved.source; - - const normalizedPairFilterFile = String(pairFilterFile || '').trim() || null; - const normalizedReloadEveryMs = parseReloadMs(reloadEveryMs); + let overrideSource = null; if (normalizedPairFilterFile) { const initialFileValue = readPairFilterFile(normalizedPairFilterFile); @@ -106,6 +131,8 @@ export function createPairFilterController({ const timer = normalizedPairFilterFile ? setInterval(() => { + if (overrideSource === 'api') return; + const nextValue = readPairFilterFile(normalizedPairFilterFile); if (nextValue == null || nextValue === lastLoadedFileValue) return; @@ -136,6 +163,12 @@ export function createPairFilterController({ if (timer && typeof timer.unref === 'function') timer.unref(); + function setState(nextPairFilter, nextSource) { + currentPairFilter = nextPairFilter; + currentPair = formatPairFilter(nextPairFilter); + source = nextSource; + } + return { getPairFilter() { return currentPairFilter; @@ -143,6 +176,58 @@ export function createPairFilterController({ getPair() { return currentPair; }, + getState() { + return { + pairFilter: currentPairFilter, + pair: currentPair, + source, + configured: resolveConfiguredState(), + pairFilterFile: normalizedPairFilterFile, + reloadEveryMs: normalizedPairFilterFile ? normalizedReloadEveryMs : null, + }; + }, + setPairFilter(raw, { source: nextSource = 'api' } = {}) { + const parsed = Array.isArray(raw) + ? raw + : parsePairFilterValue(raw, { + fieldName: 'pair filter update', + }); + overrideSource = nextSource === 'api' ? 'api' : null; + setState(parsed, nextSource); + logger?.info('pair_filter_updated', { + pair: currentPair, + details: { + source: nextSource, + pair_filter_file: normalizedPairFilterFile, + }, + }); + return this.getState(); + }, + disable({ source: nextSource = 'api' } = {}) { + overrideSource = nextSource === 'api' ? 
'api' : null; + setState(null, nextSource); + logger?.info('pair_filter_disabled', { + pair: null, + details: { + source: nextSource, + pair_filter_file: normalizedPairFilterFile, + }, + }); + return this.getState(); + }, + reset() { + overrideSource = null; + const configuredState = resolveConfiguredState(); + setState(configuredState.pairFilter, configuredState.source); + logger?.info('pair_filter_reset', { + pair: currentPair, + details: { + source, + pair_filter_file: normalizedPairFilterFile, + }, + }); + return this.getState(); + }, close() { if (timer) clearInterval(timer); }, diff --git a/src/lib/config.mjs b/src/lib/config.mjs index 1b1e31c..780426d 100644 --- a/src/lib/config.mjs +++ b/src/lib/config.mjs @@ -5,6 +5,9 @@ const DEFAULTS = { nearIntentsWsUrl: 'wss://solver-relay-v2.chaindefuser.com/ws', nearIntentsPairFilter: DEFAULT_NEAR_INTENTS_PAIR_FILTER, nearIntentsPairFilterReloadMs: 5_000, + nearIntentsControlApiEnabled: true, + nearIntentsControlHost: '0.0.0.0', + nearIntentsControlPort: 8081, kafkaBrokers: ['127.0.0.1:9092'], kafkaClientId: 'unrip', kafkaTopicRawNearIntentsQuote: 'raw.near_intents.quote', @@ -42,6 +45,12 @@ export function loadConfig({ envPath = '.env' } = {}) { nearIntentsPairFilterFile: process.env.NEAR_INTENTS_PAIR_FILTER_FILE || '', nearIntentsPairFilterReloadMs: parseNumber(process.env.NEAR_INTENTS_PAIR_FILTER_RELOAD_MS, DEFAULTS.nearIntentsPairFilterReloadMs), + nearIntentsControlApiEnabled: + parseBoolean(process.env.NEAR_INTENTS_CONTROL_API_ENABLED, DEFAULTS.nearIntentsControlApiEnabled), + nearIntentsControlHost: + process.env.NEAR_INTENTS_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, + nearIntentsControlPort: + parseNumber(process.env.NEAR_INTENTS_CONTROL_PORT, DEFAULTS.nearIntentsControlPort), kafkaBrokers: splitCsv(process.env.KAFKA_BROKERS).length ? 
splitCsv(process.env.KAFKA_BROKERS) : DEFAULTS.kafkaBrokers, @@ -70,3 +79,12 @@ function parseNumber(value, fallback) { const parsed = Number(value); return Number.isFinite(parsed) ? parsed : fallback; } + +function parseBoolean(value, fallback) { + if (value == null || value === '') return fallback; + + const normalized = String(value).trim().toLowerCase(); + if (['1', 'true', 'yes', 'on'].includes(normalized)) return true; + if (['0', 'false', 'no', 'off'].includes(normalized)) return false; + return fallback; +} diff --git a/src/venues/near-intents/ws.mjs b/src/venues/near-intents/ws.mjs index 221e584..2cf5afb 100644 --- a/src/venues/near-intents/ws.mjs +++ b/src/venues/near-intents/ws.mjs @@ -25,13 +25,30 @@ export async function startNearIntentsWs({ let quoteStatusSubscriptionId = null; let publishedCount = 0; let publishLocked = false; + let closed = false; + let reconnectTimer = null; + let activeSocket = null; + let connected = false; + let framesReceived = 0; + let quoteFramesReceived = 0; + let filteredCount = 0; + let publishErrorCount = 0; + let invalidJsonCount = 0; + let lastMessageAt = null; + let lastMatchingQuoteAt = null; + let lastPublishedAt = null; + let lastPublishedPair = null; function connect() { + if (closed) return; + const ws = new WebSocket(wsUrl, { headers: { Authorization: `Bearer ${apiKey}` }, }); + activeSocket = ws; ws.addEventListener('open', () => { + connected = true; logger?.info('connection_established', { namespace, }); @@ -40,12 +57,15 @@ export async function startNearIntentsWs({ }); ws.addEventListener('message', async (event) => { + framesReceived += 1; + lastMessageAt = new Date().toISOString(); const text = typeof event.data === 'string' ? 
event.data : Buffer.from(event.data).toString('utf8'); let payload; try { payload = JSON.parse(text); } catch { + invalidJsonCount += 1; return; } @@ -61,6 +81,7 @@ export async function startNearIntentsWs({ const eventFrame = extractQuoteEventFrame(payload); if (!eventFrame) return; + quoteFramesReceived += 1; const { subscription, merged } = eventFrame; @@ -78,15 +99,22 @@ export async function startNearIntentsWs({ if (!assetIn || !assetOut) return; const activePairFilter = getPairFilter(); - if (!matchesPairFilter(assetIn, assetOut, activePairFilter)) return; + if (!matchesPairFilter(assetIn, assetOut, activePairFilter)) { + filteredCount += 1; + return; + } + lastMatchingQuoteAt = new Date().toISOString(); publishLocked = true; try { await producer.sendJson(rawTopic, rawEnvelope, { key: rawEnvelope.event_id }); await producer.sendJson(normalizedTopic, envelope, { key: envelope.payload.quote_id }); publishedCount += 1; + lastPublishedAt = new Date().toISOString(); + lastPublishedPair = `${assetIn}->${assetOut}`; onPublish(envelope, publishedCount); } catch (error) { + publishErrorCount += 1; logger?.error('publish_failed', { namespace, topic: normalizedTopic, @@ -103,13 +131,17 @@ export async function startNearIntentsWs({ }); ws.addEventListener('close', () => { + connected = false; + activeSocket = null; logger?.warn('connection_lost', { namespace, details: { reconnect_in_ms: 2_000, }, }); - setTimeout(connect, 2000); + if (!closed) { + reconnectTimer = setTimeout(connect, 2000); + } }); ws.addEventListener('error', (err) => { @@ -123,6 +155,33 @@ export async function startNearIntentsWs({ } connect(); + + return { + getState() { + return { + connected, + frames_received: framesReceived, + quote_frames_received: quoteFramesReceived, + filtered_count: filteredCount, + published_count: publishedCount, + publish_error_count: publishErrorCount, + invalid_json_count: invalidJsonCount, + last_message_at: lastMessageAt, + last_matching_quote_at: lastMatchingQuoteAt, 
+ last_published_at: lastPublishedAt, + last_published_pair: lastPublishedPair, + raw_topic: rawTopic, + normalized_topic: normalizedTopic, + }; + }, + close() { + closed = true; + if (reconnectTimer) clearTimeout(reconnectTimer); + if (activeSocket && activeSocket.readyState <= 1) { + activeSocket.close(); + } + }, + }; } function extractSubscriptionId(result) {