Add near intents control API
All checks were successful
deploy / deploy (push) Successful in 20s

This commit is contained in:
philipp 2026-04-01 10:11:33 +02:00
parent 6e635c32e4
commit d8d9a34db5
12 changed files with 1246 additions and 19 deletions

View file

@ -2,6 +2,9 @@
NEAR_INTENTS_API_KEY=replace_me
NEAR_INTENTS_WS_URL=wss://solver-relay-v2.chaindefuser.com/ws
NEAR_INTENTS_PAIR_FILTER=nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omf
NEAR_INTENTS_CONTROL_API_ENABLED=true
NEAR_INTENTS_CONTROL_HOST=0.0.0.0
NEAR_INTENTS_CONTROL_PORT=8081
KAFKA_BROKERS=redpanda:9092
KAFKA_CLIENT_ID=unrip
KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE=raw.near_intents.quote

View file

@ -108,3 +108,80 @@ KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout sta
KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/dummy-executor
KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/dummy-consumer
```
### Auxiliary ops scripts
These scripts default to the same adjacent platform checkout as the deployment
bootstrap: `../unrip3/.state/hetzner/kubeconfig.yaml`. Override with
`KUBECONFIG_PATH`, `KUBECONFIG`, or `PLATFORM_REPO_DIR` if needed.
`scripts/ops/deployment_status.py`
- Shows current deployment readiness, pod uptime, restart counts, and mounted storage usage.
- By default it shows the live deployment pods only.
- Use `--include-completed` to include completed Job pods.
- Use `--include-rootfs` if you also want a probe of `/` for pods without PVC-backed mounts.
```bash
python3 scripts/ops/deployment_status.py
python3 scripts/ops/deployment_status.py --include-completed
```
`scripts/ops/redpanda_storage.py`
- Shows how much data Redpanda is currently storing for the unrip topics.
- Prints per-topic local bytes, total bytes, segment counts, and the overall Redpanda data-path usage.
- Use `--all-topics` to inspect every visible topic, or `--topic` multiple times for a subset.
```bash
python3 scripts/ops/redpanda_storage.py
python3 scripts/ops/redpanda_storage.py --all-topics
python3 scripts/ops/redpanda_storage.py --topic raw.near_intents.quote --topic norm.swap_demand
```
`scripts/ops/live_near_intents.py`
- Live-inspects the raw NEAR quote stream entering Redpanda.
- Defaults to the configured raw topic, `--offset end`, and an unbounded live tail.
- Use `--num` for a bounded sample, `--offset start` to read from the beginning, `--value-only` for payload-only output, or `--rpk-json` for the full record metadata emitted by `rpk`.
- Use `--timeout` when you want the script to stop automatically.
```bash
python3 scripts/ops/live_near_intents.py
python3 scripts/ops/live_near_intents.py --num 10 --offset start
python3 scripts/ops/live_near_intents.py --value-only --timeout 30
python3 scripts/ops/live_near_intents.py --rpk-json --num 5
```
### Near Intents control API
`near-intents-ingest` exposes a small in-process control API on port `8081` by
default. It is meant for ad hoc inspection and runtime filter changes without a
redeploy.
Port-forward the deployment:
```bash
KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip port-forward deploy/near-intents-ingest 8081:8081
```
Inspect current state, including the active pair filter and ingest counters:
```bash
curl -s http://127.0.0.1:8081/state
```
Set or disable the runtime pair filter:
```bash
curl -s -X PUT http://127.0.0.1:8081/pair-filter \
-H 'content-type: application/json' \
-d '{"pair":"nep141:btc.omft.near->nep141:eth.omft.near"}'
curl -s -X PUT http://127.0.0.1:8081/pair-filter \
-H 'content-type: application/json' \
-d '{"pair":null}'
```
Reset the runtime filter back to the configured env/file/default state:
```bash
curl -s -X POST http://127.0.0.1:8081/pair-filter/reset
```

View file

@ -6,6 +6,9 @@ metadata:
data:
NEAR_INTENTS_WS_URL: wss://solver-relay-v2.chaindefuser.com/ws
NEAR_INTENTS_PAIR_FILTER: nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omf
NEAR_INTENTS_CONTROL_API_ENABLED: "true"
NEAR_INTENTS_CONTROL_HOST: 0.0.0.0
NEAR_INTENTS_CONTROL_PORT: "8081"
KAFKA_BROKERS: redpanda.unrip.svc.cluster.local:9092
KAFKA_CLIENT_ID: unrip
KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE: raw.near_intents.quote
@ -52,6 +55,9 @@ spec:
image: ghcr.io/example/unrip:bootstrap
imagePullPolicy: IfNotPresent
command: ["node", "src/apps/near-intents-ingest.mjs"]
ports:
- name: control-api
containerPort: 8081
envFrom:
- configMapRef:
name: unrip-config

301
scripts/ops/common.py Executable file
View file

@ -0,0 +1,301 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import shlex
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[2]
PLATFORM_REPO_DIR = Path(
os.environ.get("PLATFORM_REPO_DIR", str(ROOT_DIR / "../unrip3"))
).resolve()
DEFAULT_KUBECONFIG_PATH = PLATFORM_REPO_DIR / ".state/hetzner/kubeconfig.yaml"
DEFAULT_NAMESPACE = os.environ.get("PROJECT_NAMESPACE", "unrip")
DEFAULT_CONFIGMAP_NAME = os.environ.get("CONFIGMAP_NAME", "unrip-config")
DEFAULT_REDPANDA_DEPLOYMENT = os.environ.get("REDPANDA_DEPLOYMENT", "redpanda")
DEFAULT_REDPANDA_LABEL_SELECTOR = os.environ.get("REDPANDA_LABEL_SELECTOR", "app=redpanda")
DEFAULT_REDPANDA_DATA_PATH = "/var/lib/redpanda/data"
APP_TOPIC_KEYS = (
"KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE",
"KAFKA_TOPIC_NORM_SWAP_DEMAND",
"KAFKA_TOPIC_CMD_EXECUTE_TRADE",
"KAFKA_TOPIC_EXEC_TRADE_RESULT",
)
def resolve_kubeconfig_path() -> str:
return os.environ.get("KUBECONFIG_PATH") or os.environ.get(
"KUBECONFIG", str(DEFAULT_KUBECONFIG_PATH)
)
def build_env() -> dict[str, str]:
env = os.environ.copy()
kubeconfig_path = resolve_kubeconfig_path()
if not Path(kubeconfig_path).exists():
raise SystemExit(
f"missing kubeconfig: {kubeconfig_path}\n"
"set KUBECONFIG_PATH, KUBECONFIG, or PLATFORM_REPO_DIR"
)
env["KUBECONFIG"] = kubeconfig_path
return env
def run(
args: list[str],
*,
capture_output: bool = True,
check: bool = True,
input_text: str | None = None,
) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
env=build_env(),
text=True,
capture_output=capture_output,
input=input_text,
check=check,
)
def kubectl(
*args: str,
capture_output: bool = True,
check: bool = True,
input_text: str | None = None,
) -> subprocess.CompletedProcess[str]:
return run(
["kubectl", *args],
capture_output=capture_output,
check=check,
input_text=input_text,
)
def kubectl_json(*args: str) -> dict:
return json.loads(kubectl(*args).stdout)
def config_value(key: str, namespace: str = DEFAULT_NAMESPACE, configmap_name: str = DEFAULT_CONFIGMAP_NAME) -> str:
return kubectl(
"get",
"configmap",
configmap_name,
"-n",
namespace,
"-o",
f"jsonpath={{.data.{key}}}",
).stdout.strip()
def app_topics(namespace: str = DEFAULT_NAMESPACE, configmap_name: str = DEFAULT_CONFIGMAP_NAME) -> list[str]:
topics: list[str] = []
for key in APP_TOPIC_KEYS:
value = config_value(key, namespace=namespace, configmap_name=configmap_name).strip()
if value:
topics.append(value)
return topics
def kafka_brokers(namespace: str = DEFAULT_NAMESPACE, configmap_name: str = DEFAULT_CONFIGMAP_NAME) -> str:
return config_value("KAFKA_BROKERS", namespace=namespace, configmap_name=configmap_name)
def raw_near_topic(namespace: str = DEFAULT_NAMESPACE, configmap_name: str = DEFAULT_CONFIGMAP_NAME) -> str:
return config_value(
"KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE",
namespace=namespace,
configmap_name=configmap_name,
)
def redpanda_pod_name(namespace: str = DEFAULT_NAMESPACE) -> str:
payload = kubectl_json(
"get",
"pods",
"-n",
namespace,
"-l",
DEFAULT_REDPANDA_LABEL_SELECTOR,
"-o",
"json",
)
items = payload.get("items", [])
if not items:
raise SystemExit(f"no Redpanda pod found in namespace {namespace}")
return items[0]["metadata"]["name"]
def redpanda_exec(
*args: str,
namespace: str = DEFAULT_NAMESPACE,
capture_output: bool = True,
check: bool = True,
) -> subprocess.CompletedProcess[str]:
return kubectl(
"exec",
"-n",
namespace,
f"deploy/{DEFAULT_REDPANDA_DEPLOYMENT}",
"--",
*args,
capture_output=capture_output,
check=check,
)
def pod_exec(
pod_name: str,
*args: str,
namespace: str = DEFAULT_NAMESPACE,
capture_output: bool = True,
check: bool = True,
) -> subprocess.CompletedProcess[str]:
return kubectl(
"exec",
"-n",
namespace,
pod_name,
"--",
*args,
capture_output=capture_output,
check=check,
)
def probe_path_usage(
pod_name: str,
path: str,
*,
namespace: str = DEFAULT_NAMESPACE,
) -> dict[str, object]:
shell_path = shlex.quote(path)
probe = "\n".join(
(
f"df -B1 {shell_path} | awk 'NR==2 {{print $2, $3, $4, $5}}'",
f"du -sb {shell_path} 2>/dev/null | awk 'END {{print $1}}'",
)
)
result = pod_exec(
pod_name,
"sh",
"-lc",
probe,
namespace=namespace,
).stdout.strip().splitlines()
if not result:
raise RuntimeError(f"no usage output for {pod_name}:{path}")
fs_fields = result[0].split()
if len(fs_fields) != 4:
raise RuntimeError(f"unexpected df output for {pod_name}:{path}: {result[0]}")
du_bytes = None
if len(result) > 1 and result[1].strip():
try:
du_bytes = int(result[1].strip())
except ValueError:
du_bytes = None
return {
"filesystem_total_bytes": int(fs_fields[0]),
"filesystem_used_bytes": int(fs_fields[1]),
"filesystem_available_bytes": int(fs_fields[2]),
"filesystem_use_percent": fs_fields[3],
"path_bytes": du_bytes,
}
def parse_k8s_timestamp(value: str | None) -> datetime | None:
if not value:
return None
return datetime.fromisoformat(value.replace("Z", "+00:00"))
def now_utc() -> datetime:
return datetime.now(timezone.utc)
def human_duration(seconds: float | int | None) -> str:
if seconds is None:
return "-"
seconds = int(max(0, seconds))
chunks = []
for unit_seconds, suffix in (
(86400, "d"),
(3600, "h"),
(60, "m"),
(1, "s"),
):
if seconds >= unit_seconds or (suffix == "s" and not chunks):
value, seconds = divmod(seconds, unit_seconds)
if value or suffix == "s":
chunks.append(f"{value}{suffix}")
if len(chunks) == 2:
break
return " ".join(chunks)
def human_bytes(value: int | float | None) -> str:
if value is None:
return "-"
value = float(value)
units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB")
unit_index = 0
while value >= 1024 and unit_index < len(units) - 1:
value /= 1024
unit_index += 1
if unit_index == 0:
return f"{int(value)} {units[unit_index]}"
return f"{value:.1f} {units[unit_index]}"
def parse_storage_quantity(value: str | None) -> int | None:
if not value:
return None
binary_units = {
"Ki": 1024**1,
"Mi": 1024**2,
"Gi": 1024**3,
"Ti": 1024**4,
"Pi": 1024**5,
}
decimal_units = {
"K": 1000**1,
"M": 1000**2,
"G": 1000**3,
"T": 1000**4,
"P": 1000**5,
}
for suffix, factor in binary_units.items():
if value.endswith(suffix):
return int(float(value[: -len(suffix)]) * factor)
for suffix, factor in decimal_units.items():
if value.endswith(suffix):
return int(float(value[: -len(suffix)]) * factor)
return int(float(value))
def print_table(headers: list[str], rows: list[list[str]]) -> None:
widths = [len(header) for header in headers]
for row in rows:
for index, cell in enumerate(row):
widths[index] = max(widths[index], len(str(cell)))
template = " ".join(f"{{:{width}}}" for width in widths)
print(template.format(*headers))
print(template.format(*["-" * width for width in widths]))
for row in rows:
print(template.format(*row))
def stderr(message: str) -> None:
print(message, file=sys.stderr)

184
scripts/ops/deployment_status.py Executable file
View file

@ -0,0 +1,184 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from common import (
DEFAULT_NAMESPACE,
human_bytes,
human_duration,
kubectl_json,
now_utc,
parse_k8s_timestamp,
parse_storage_quantity,
print_table,
probe_path_usage,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Show current unrip deployment state, pod uptime, and mounted storage usage."
)
parser.add_argument(
"--namespace",
default=DEFAULT_NAMESPACE,
help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})",
)
parser.add_argument(
"--include-rootfs",
action="store_true",
help="Also probe '/' for pods that do not have PVC-backed mounts.",
)
parser.add_argument(
"--include-completed",
action="store_true",
help="Include completed Job pods in the pod list.",
)
return parser.parse_args()
def ready_string(pod: dict) -> str:
statuses = pod.get("status", {}).get("containerStatuses", [])
ready = sum(1 for status in statuses if status.get("ready"))
total = len(statuses)
return f"{ready}/{total}"
def restart_count(pod: dict) -> int:
return sum(status.get("restartCount", 0) for status in pod.get("status", {}).get("containerStatuses", []))
def pvc_mounts_for_pod(pod: dict) -> list[dict[str, str]]:
volumes = {
volume["name"]: volume.get("persistentVolumeClaim", {}).get("claimName")
for volume in pod.get("spec", {}).get("volumes", [])
}
mounts: list[dict[str, str]] = []
for container in pod.get("spec", {}).get("containers", []):
for mount in container.get("volumeMounts", []):
claim_name = volumes.get(mount["name"])
if claim_name:
mounts.append(
{
"container": container.get("name", "app"),
"claim_name": claim_name,
"mount_path": mount["mountPath"],
}
)
return mounts
def is_completed_job_pod(pod: dict) -> bool:
owners = pod.get("metadata", {}).get("ownerReferences", [])
if not any(owner.get("kind") == "Job" for owner in owners):
return False
return pod.get("status", {}).get("phase") in {"Succeeded", "Failed"}
def main() -> int:
args = parse_args()
namespace = args.namespace
deployment_payload = kubectl_json("get", "deploy", "-n", namespace, "-o", "json")
pod_payload = kubectl_json("get", "pods", "-n", namespace, "-o", "json")
pvc_payload = kubectl_json("get", "pvc", "-n", namespace, "-o", "json")
pvc_requests = {
item["metadata"]["name"]: item.get("spec", {})
.get("resources", {})
.get("requests", {})
.get("storage", "")
for item in pvc_payload.get("items", [])
}
deployment_rows: list[list[str]] = []
for item in sorted(deployment_payload.get("items", []), key=lambda value: value["metadata"]["name"]):
container = item.get("spec", {}).get("template", {}).get("spec", {}).get("containers", [{}])[0]
deployment_rows.append(
[
item["metadata"]["name"],
f"{item.get('status', {}).get('readyReplicas', 0)}/{item.get('spec', {}).get('replicas', 0)}",
str(item.get("status", {}).get("availableReplicas", 0)),
container.get("image", "-"),
]
)
pod_rows: list[list[str]] = []
storage_rows: list[list[str]] = []
for pod in sorted(pod_payload.get("items", []), key=lambda value: value["metadata"]["name"]):
if not args.include_completed and is_completed_job_pod(pod):
continue
pod_name = pod["metadata"]["name"]
start_time = parse_k8s_timestamp(
pod.get("status", {}).get("startTime")
or pod.get("status", {}).get("containerStatuses", [{}])[0]
.get("state", {})
.get("running", {})
.get("startedAt")
)
uptime = human_duration((now_utc() - start_time).total_seconds() if start_time else None)
image = pod.get("spec", {}).get("containers", [{}])[0].get("image", "-")
pod_rows.append(
[
pod_name,
pod.get("status", {}).get("phase", "-"),
ready_string(pod),
str(restart_count(pod)),
uptime,
pod.get("spec", {}).get("nodeName", "-"),
image,
]
)
mounts = pvc_mounts_for_pod(pod)
if not mounts and args.include_rootfs:
mounts = [{"claim_name": "-", "mount_path": "/", "container": "app"}]
if not mounts:
storage_rows.append([pod_name, "-", "no pvc mounts", "-", "-", "-", "-", "-"])
continue
for mount in mounts:
usage = probe_path_usage(pod_name, mount["mount_path"], namespace=namespace)
requested_raw = pvc_requests.get(mount["claim_name"], "")
requested_bytes = parse_storage_quantity(requested_raw)
storage_rows.append(
[
pod_name,
mount["mount_path"],
mount["claim_name"],
human_bytes(requested_bytes) if requested_bytes is not None else (requested_raw or "-"),
human_bytes(usage["path_bytes"]),
human_bytes(usage["filesystem_used_bytes"]),
human_bytes(usage["filesystem_available_bytes"]),
str(usage["filesystem_use_percent"]),
]
)
print(f"Namespace: {namespace}")
print()
print("Deployments")
print_table(["NAME", "READY", "AVAILABLE", "IMAGE"], deployment_rows)
print()
print("Pods")
print_table(["POD", "PHASE", "READY", "RESTARTS", "UPTIME", "NODE", "IMAGE"], pod_rows)
print()
print("Storage")
print_table(
["POD", "PATH", "CLAIM", "REQUESTED", "PATH_BYTES", "FS_USED", "FS_AVAIL", "USE%"],
storage_rows,
)
print()
print("Note: storage figures come from container-mounted paths. Pods without PVC-backed mounts report no dedicated storage.")
return 0
if __name__ == "__main__":
raise SystemExit(main())

127
scripts/ops/live_near_intents.py Executable file
View file

@ -0,0 +1,127 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import signal
import subprocess
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from common import DEFAULT_NAMESPACE, build_env, kafka_brokers, raw_near_topic, stderr
DEFAULT_OUTPUT_FORMAT = (
"%d{go[2006-01-02T15:04:05Z07:00]} topic=%t partition=%p offset=%o\n"
"%v\n"
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Live-inspect raw NEAR intents arriving in Redpanda."
)
parser.add_argument(
"--namespace",
default=DEFAULT_NAMESPACE,
help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})",
)
parser.add_argument(
"--brokers",
default="",
help="Override Kafka brokers instead of reading them from unrip-config.",
)
parser.add_argument(
"--topic",
default="",
help="Topic to consume. Defaults to KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE from unrip-config.",
)
parser.add_argument(
"--offset",
default="end",
help="Kafka offset selector passed to rpk (default: end)",
)
parser.add_argument(
"--num",
type=int,
default=0,
help="Number of records to read before exiting. 0 means unbounded (default: 0).",
)
parser.add_argument(
"--timeout",
type=float,
default=0,
help="Stop after this many seconds even if the stream is still open.",
)
parser.add_argument(
"--value-only",
action="store_true",
help="Print only the record value without topic/partition/offset metadata.",
)
parser.add_argument(
"--rpk-json",
action="store_true",
help="Use rpk's built-in JSON record formatter instead of the default metadata+value output.",
)
return parser.parse_args()
def build_command(args: argparse.Namespace, brokers: str, topic: str) -> list[str]:
command = [
"kubectl",
"exec",
"-i",
"-n",
args.namespace,
"deploy/redpanda",
"--",
"rpk",
"topic",
"consume",
topic,
"--brokers",
brokers,
"--offset",
args.offset,
"--num",
str(args.num),
]
if args.rpk_json:
command.extend(["--format", "json", "--pretty-print=false"])
elif args.value_only:
command.extend(["--format", "%v\n"])
else:
command.extend(["--format", DEFAULT_OUTPUT_FORMAT])
return command
def main() -> int:
args = parse_args()
brokers = args.brokers or kafka_brokers(namespace=args.namespace)
topic = args.topic or raw_near_topic(namespace=args.namespace)
command = build_command(args, brokers, topic)
stderr(f"namespace={args.namespace} brokers={brokers} topic={topic} offset={args.offset} num={args.num}")
process = subprocess.Popen(command, env=build_env())
try:
process.wait(timeout=args.timeout if args.timeout and args.timeout > 0 else None)
except subprocess.TimeoutExpired:
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait(timeout=5)
return 124
except KeyboardInterrupt:
process.send_signal(signal.SIGINT)
return process.wait()
return process.returncode
if __name__ == "__main__":
raise SystemExit(main())

196
scripts/ops/redpanda_storage.py Executable file
View file

@ -0,0 +1,196 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import re
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from common import (
DEFAULT_NAMESPACE,
DEFAULT_REDPANDA_DATA_PATH,
app_topics,
human_bytes,
kafka_brokers,
print_table,
redpanda_exec,
redpanda_pod_name,
probe_path_usage,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Show how much data Redpanda is currently storing for unrip topics."
)
parser.add_argument(
"--namespace",
default=DEFAULT_NAMESPACE,
help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})",
)
parser.add_argument(
"--brokers",
default="",
help="Override Kafka brokers instead of reading them from unrip-config.",
)
parser.add_argument(
"--topic",
action="append",
default=[],
help="Specific topic to inspect. Can be passed multiple times.",
)
parser.add_argument(
"--all-topics",
action="store_true",
help="Inspect every topic visible to Redpanda instead of the app topics from config.",
)
return parser.parse_args()
def list_topics(namespace: str, brokers: str) -> list[str]:
output = redpanda_exec(
"rpk",
"topic",
"list",
"--brokers",
brokers,
namespace=namespace,
).stdout.splitlines()
topics: list[str] = []
for line in output[1:]:
fields = line.split()
if fields:
topics.append(fields[0])
return topics
def parse_storage(output: str) -> dict[str, int | str]:
summary: dict[str, str] = {}
size_rows: list[dict[str, int]] = []
section = ""
size_header_seen = False
for raw_line in output.splitlines():
line = raw_line.strip()
if not line or set(line) == {"="}:
continue
if line == "SUMMARY":
section = "summary"
continue
if line == "SIZE":
section = "size"
size_header_seen = False
continue
if section == "summary":
fields = re.split(r"\s{2,}", line, maxsplit=1)
if len(fields) == 2:
summary[fields[0]] = fields[1]
continue
if section == "size":
if not size_header_seen:
size_header_seen = True
continue
fields = re.split(r"\s+", line)
if len(fields) < 6:
continue
size_rows.append(
{
"partition": int(fields[0]),
"cloud_bytes": int(fields[1]),
"local_bytes": int(fields[2]),
"total_bytes": int(fields[3]),
"cloud_segments": int(fields[4]),
"local_segments": int(fields[5]),
}
)
return {
"name": summary.get("NAME", ""),
"partitions": int(summary.get("PARTITIONS", "0")),
"replicas": int(summary.get("REPLICAS", "0")),
"local_bytes": sum(row["local_bytes"] for row in size_rows),
"total_bytes": sum(row["total_bytes"] for row in size_rows),
"local_segments": sum(row["local_segments"] for row in size_rows),
}
def main() -> int:
args = parse_args()
namespace = args.namespace
brokers = args.brokers or kafka_brokers(namespace=namespace)
if args.topic:
topics = args.topic
elif args.all_topics:
topics = list_topics(namespace, brokers)
else:
topics = app_topics(namespace=namespace)
if not topics:
raise SystemExit("no topics found")
topic_rows: list[list[str]] = []
total_local_bytes = 0
total_bytes = 0
total_segments = 0
for topic in topics:
output = redpanda_exec(
"rpk",
"topic",
"describe-storage",
topic,
"--brokers",
brokers,
namespace=namespace,
).stdout
parsed = parse_storage(output)
total_local_bytes += int(parsed["local_bytes"])
total_bytes += int(parsed["total_bytes"])
total_segments += int(parsed["local_segments"])
topic_rows.append(
[
topic,
str(parsed["partitions"]),
str(parsed["replicas"]),
human_bytes(int(parsed["local_bytes"])),
human_bytes(int(parsed["total_bytes"])),
str(parsed["local_segments"]),
]
)
redpanda_pod = redpanda_pod_name(namespace=namespace)
usage = probe_path_usage(redpanda_pod, DEFAULT_REDPANDA_DATA_PATH, namespace=namespace)
print(f"Namespace: {namespace}")
print(f"Brokers: {brokers}")
print(f"Pod: {redpanda_pod}")
print(f"Data path: {DEFAULT_REDPANDA_DATA_PATH}")
print(
"Disk: "
f"path={human_bytes(usage['path_bytes'])}, "
f"fs_used={human_bytes(usage['filesystem_used_bytes'])}, "
f"fs_avail={human_bytes(usage['filesystem_available_bytes'])}, "
f"use={usage['filesystem_use_percent']}"
)
print()
print("Topics")
print_table(
["TOPIC", "PARTITIONS", "REPLICAS", "LOCAL_BYTES", "TOTAL_BYTES", "LOCAL_SEGMENTS"],
topic_rows,
)
print()
print(
f"Totals: local={human_bytes(total_local_bytes)}, "
f"total={human_bytes(total_bytes)}, "
f"segments={total_segments}"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,6 +1,7 @@
import process from 'node:process';
import { createProducer } from '../bus/kafka/producer.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { createLogger } from '../core/log.mjs';
import { createPairFilterController } from '../core/pair-filter.mjs';
import { loadConfig } from '../lib/config.mjs';
@ -40,19 +41,7 @@ const producer = await createProducer({
logger,
});
process.on('SIGINT', async () => {
pairFilterController.close();
await producer.disconnect();
process.exit(0);
});
process.on('SIGTERM', async () => {
pairFilterController.close();
await producer.disconnect();
process.exit(0);
});
await startNearIntentsWs({
const wsRuntime = await startNearIntentsWs({
apiKey: config.nearIntentsApiKey,
wsUrl: config.nearIntentsWsUrl,
getPairFilter: () => pairFilterController.getPairFilter(),
@ -65,3 +54,29 @@ await startNearIntentsWs({
venue: 'near-intents',
}),
});
const controlApi = config.nearIntentsControlApiEnabled
? startControlApi({
host: config.nearIntentsControlHost,
port: config.nearIntentsControlPort,
logger: logger.child({
component: 'control-api',
venue: 'near-intents',
}),
service: 'near-intents-ingest',
namespace: config.projectNamespace,
pairFilterController,
stateProvider: wsRuntime,
})
: null;
async function shutdown() {
controlApi && await controlApi.close().catch(() => {});
wsRuntime.close();
pairFilterController.close();
await producer.disconnect();
process.exit(0);
}
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

156
src/core/control-api.mjs Normal file
View file

@ -0,0 +1,156 @@
import http from 'node:http';
export function startControlApi({
host = '0.0.0.0',
port = 8081,
logger = null,
service = 'near-intents-ingest',
namespace = 'unrip',
pairFilterController,
stateProvider = null,
} = {}) {
const server = http.createServer(async (req, res) => {
try {
if (req.method === 'GET' && req.url === '/healthz') {
return sendJson(res, 200, {
ok: true,
service,
namespace,
});
}
if (req.method === 'GET' && req.url === '/state') {
return sendJson(res, 200, buildStateResponse({
service,
namespace,
pairFilterController,
stateProvider,
}));
}
if (req.method === 'GET' && req.url === '/pair-filter') {
return sendJson(res, 200, {
service,
namespace,
...pairFilterController.getState(),
});
}
if (req.method === 'PUT' && req.url === '/pair-filter') {
const body = await readJsonBody(req);
if (body.disabled === true || body.enabled === false || body.pair == null) {
return sendJson(res, 200, {
service,
namespace,
...pairFilterController.disable(),
});
}
if (typeof body.pair !== 'string') {
return sendJson(res, 400, {
error: "send JSON like {\"pair\":\"asset_a->asset_b\"} or {\"pair\":null}",
});
}
return sendJson(res, 200, {
service,
namespace,
...pairFilterController.setPairFilter(body.pair),
});
}
if (req.method === 'POST' && req.url === '/pair-filter/reset') {
return sendJson(res, 200, {
service,
namespace,
...pairFilterController.reset(),
});
}
return sendJson(res, 404, {
error: 'not_found',
});
} catch (error) {
logger?.error('control_api_request_failed', {
details: {
method: req.method,
path: req.url,
error: error.message,
},
});
return sendJson(res, 500, {
error: error.message,
});
}
});
server.listen(port, host, () => {
logger?.info('control_api_started', {
details: {
host,
port,
},
});
});
return {
close() {
return new Promise((resolve, reject) => {
server.close((error) => {
if (error) reject(error);
else resolve();
});
});
},
};
}
function buildStateResponse({
service,
namespace,
pairFilterController,
stateProvider,
}) {
return {
service,
namespace,
pair_filter: pairFilterController.getState(),
ingest: stateProvider?.getState?.() ?? null,
};
}
function sendJson(res, statusCode, payload) {
const body = JSON.stringify(payload, null, 2);
res.statusCode = statusCode;
res.setHeader('content-type', 'application/json; charset=utf-8');
res.end(`${body}\n`);
}
function readJsonBody(req) {
return new Promise((resolve, reject) => {
let raw = '';
req.on('data', (chunk) => {
raw += chunk;
if (raw.length > 64 * 1024) {
reject(new Error('request body too large'));
}
});
req.on('end', () => {
if (!raw.trim()) {
resolve({});
return;
}
try {
resolve(JSON.parse(raw));
} catch {
reject(new Error('invalid_json'));
}
});
req.on('error', reject);
});
}

View file

@ -74,14 +74,39 @@ export function createPairFilterController({
pairFilterFile = env.NEAR_INTENTS_PAIR_FILTER_FILE,
reloadEveryMs = env.NEAR_INTENTS_PAIR_FILTER_RELOAD_MS,
} = {}) {
const resolved = resolvePairFilter({ argv, env, defaultPairFilter });
function resolveConfiguredState() {
const nextResolved = resolvePairFilter({ argv, env, defaultPairFilter });
let nextPairFilter = nextResolved.pairFilter;
let nextPair = nextResolved.pair;
let nextSource = nextResolved.source;
if (normalizedPairFilterFile) {
const fileValue = readPairFilterFile(normalizedPairFilterFile);
if (fileValue != null) {
const filePairFilter = parsePairFilterValue(fileValue, {
fieldName: 'NEAR_INTENTS_PAIR_FILTER_FILE',
});
nextPairFilter = filePairFilter;
nextPair = formatPairFilter(filePairFilter);
nextSource = 'file';
}
}
return {
pairFilter: nextPairFilter,
pair: nextPair,
source: nextSource,
};
}
const normalizedPairFilterFile = String(pairFilterFile || '').trim() || null;
const normalizedReloadEveryMs = parseReloadMs(reloadEveryMs);
const resolved = resolveConfiguredState();
let currentPairFilter = resolved.pairFilter;
let currentPair = resolved.pair;
let lastLoadedFileValue = null;
let source = resolved.source;
const normalizedPairFilterFile = String(pairFilterFile || '').trim() || null;
const normalizedReloadEveryMs = parseReloadMs(reloadEveryMs);
let overrideSource = null;
if (normalizedPairFilterFile) {
const initialFileValue = readPairFilterFile(normalizedPairFilterFile);
@ -106,6 +131,8 @@ export function createPairFilterController({
const timer = normalizedPairFilterFile
? setInterval(() => {
if (overrideSource === 'api') return;
const nextValue = readPairFilterFile(normalizedPairFilterFile);
if (nextValue == null || nextValue === lastLoadedFileValue) return;
@ -136,6 +163,12 @@ export function createPairFilterController({
if (timer && typeof timer.unref === 'function') timer.unref();
function setState(nextPairFilter, nextSource) {
currentPairFilter = nextPairFilter;
currentPair = formatPairFilter(nextPairFilter);
source = nextSource;
}
return {
getPairFilter() {
return currentPairFilter;
@ -143,6 +176,58 @@ export function createPairFilterController({
getPair() {
return currentPair;
},
getState() {
return {
pairFilter: currentPairFilter,
pair: currentPair,
source,
configured: resolveConfiguredState(),
pairFilterFile: normalizedPairFilterFile,
reloadEveryMs: normalizedPairFilterFile ? normalizedReloadEveryMs : null,
};
},
setPairFilter(raw, { source: nextSource = 'api' } = {}) {
const parsed = Array.isArray(raw)
? raw
: parsePairFilterValue(raw, {
fieldName: 'pair filter update',
});
overrideSource = nextSource === 'api' ? 'api' : null;
setState(parsed, nextSource);
logger?.info('pair_filter_updated', {
pair: currentPair,
details: {
source: nextSource,
pair_filter_file: normalizedPairFilterFile,
},
});
return this.getState();
},
disable({ source: nextSource = 'api' } = {}) {
overrideSource = nextSource === 'api' ? 'api' : null;
setState(null, nextSource);
logger?.info('pair_filter_disabled', {
pair: null,
details: {
source: nextSource,
pair_filter_file: normalizedPairFilterFile,
},
});
return this.getState();
},
reset() {
overrideSource = null;
const configuredState = resolveConfiguredState();
setState(configuredState.pairFilter, configuredState.source);
logger?.info('pair_filter_reset', {
pair: currentPair,
details: {
source,
pair_filter_file: normalizedPairFilterFile,
},
});
return this.getState();
},
close() {
if (timer) clearInterval(timer);
},

View file

@ -5,6 +5,9 @@ const DEFAULTS = {
nearIntentsWsUrl: 'wss://solver-relay-v2.chaindefuser.com/ws',
nearIntentsPairFilter: DEFAULT_NEAR_INTENTS_PAIR_FILTER,
nearIntentsPairFilterReloadMs: 5_000,
nearIntentsControlApiEnabled: true,
nearIntentsControlHost: '0.0.0.0',
nearIntentsControlPort: 8081,
kafkaBrokers: ['127.0.0.1:9092'],
kafkaClientId: 'unrip',
kafkaTopicRawNearIntentsQuote: 'raw.near_intents.quote',
@ -42,6 +45,12 @@ export function loadConfig({ envPath = '.env' } = {}) {
nearIntentsPairFilterFile: process.env.NEAR_INTENTS_PAIR_FILTER_FILE || '',
nearIntentsPairFilterReloadMs:
parseNumber(process.env.NEAR_INTENTS_PAIR_FILTER_RELOAD_MS, DEFAULTS.nearIntentsPairFilterReloadMs),
nearIntentsControlApiEnabled:
parseBoolean(process.env.NEAR_INTENTS_CONTROL_API_ENABLED, DEFAULTS.nearIntentsControlApiEnabled),
nearIntentsControlHost:
process.env.NEAR_INTENTS_CONTROL_HOST || DEFAULTS.nearIntentsControlHost,
nearIntentsControlPort:
parseNumber(process.env.NEAR_INTENTS_CONTROL_PORT, DEFAULTS.nearIntentsControlPort),
kafkaBrokers: splitCsv(process.env.KAFKA_BROKERS).length
? splitCsv(process.env.KAFKA_BROKERS)
: DEFAULTS.kafkaBrokers,
@ -70,3 +79,12 @@ function parseNumber(value, fallback) {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : fallback;
}
function parseBoolean(value, fallback) {
if (value == null || value === '') return fallback;
const normalized = String(value).trim().toLowerCase();
if (['1', 'true', 'yes', 'on'].includes(normalized)) return true;
if (['0', 'false', 'no', 'off'].includes(normalized)) return false;
return fallback;
}

View file

@ -25,13 +25,30 @@ export async function startNearIntentsWs({
let quoteStatusSubscriptionId = null;
let publishedCount = 0;
let publishLocked = false;
let closed = false;
let reconnectTimer = null;
let activeSocket = null;
let connected = false;
let framesReceived = 0;
let quoteFramesReceived = 0;
let filteredCount = 0;
let publishErrorCount = 0;
let invalidJsonCount = 0;
let lastMessageAt = null;
let lastMatchingQuoteAt = null;
let lastPublishedAt = null;
let lastPublishedPair = null;
function connect() {
if (closed) return;
const ws = new WebSocket(wsUrl, {
headers: { Authorization: `Bearer ${apiKey}` },
});
activeSocket = ws;
ws.addEventListener('open', () => {
connected = true;
logger?.info('connection_established', {
namespace,
});
@ -40,12 +57,15 @@ export async function startNearIntentsWs({
});
ws.addEventListener('message', async (event) => {
framesReceived += 1;
lastMessageAt = new Date().toISOString();
const text = typeof event.data === 'string' ? event.data : Buffer.from(event.data).toString('utf8');
let payload;
try {
payload = JSON.parse(text);
} catch {
invalidJsonCount += 1;
return;
}
@ -61,6 +81,7 @@ export async function startNearIntentsWs({
const eventFrame = extractQuoteEventFrame(payload);
if (!eventFrame) return;
quoteFramesReceived += 1;
const { subscription, merged } = eventFrame;
@ -78,15 +99,22 @@ export async function startNearIntentsWs({
if (!assetIn || !assetOut) return;
const activePairFilter = getPairFilter();
if (!matchesPairFilter(assetIn, assetOut, activePairFilter)) return;
if (!matchesPairFilter(assetIn, assetOut, activePairFilter)) {
filteredCount += 1;
return;
}
lastMatchingQuoteAt = new Date().toISOString();
publishLocked = true;
try {
await producer.sendJson(rawTopic, rawEnvelope, { key: rawEnvelope.event_id });
await producer.sendJson(normalizedTopic, envelope, { key: envelope.payload.quote_id });
publishedCount += 1;
lastPublishedAt = new Date().toISOString();
lastPublishedPair = `${assetIn}->${assetOut}`;
onPublish(envelope, publishedCount);
} catch (error) {
publishErrorCount += 1;
logger?.error('publish_failed', {
namespace,
topic: normalizedTopic,
@ -103,13 +131,17 @@ export async function startNearIntentsWs({
});
ws.addEventListener('close', () => {
connected = false;
activeSocket = null;
logger?.warn('connection_lost', {
namespace,
details: {
reconnect_in_ms: 2_000,
},
});
setTimeout(connect, 2000);
if (!closed) {
reconnectTimer = setTimeout(connect, 2000);
}
});
ws.addEventListener('error', (err) => {
@ -123,6 +155,33 @@ export async function startNearIntentsWs({
}
connect();
return {
getState() {
return {
connected,
frames_received: framesReceived,
quote_frames_received: quoteFramesReceived,
filtered_count: filteredCount,
published_count: publishedCount,
publish_error_count: publishErrorCount,
invalid_json_count: invalidJsonCount,
last_message_at: lastMessageAt,
last_matching_quote_at: lastMatchingQuoteAt,
last_published_at: lastPublishedAt,
last_published_pair: lastPublishedPair,
raw_topic: rawTopic,
normalized_topic: normalizedTopic,
};
},
close() {
closed = true;
if (reconnectTimer) clearTimeout(reconnectTimer);
if (activeSocket && activeSocket.readyState <= 1) {
activeSocket.close();
}
},
};
}
function extractSubscriptionId(result) {