#!/usr/bin/env python3 from __future__ import annotations import argparse import json import sys import time from dataclasses import dataclass from decimal import Decimal, ROUND_HALF_UP from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent)) from common import DEFAULT_NAMESPACE, kubectl CONTROL_PORTS = { "near-intents-ingest": 8081, "inventory-sync": 8083, "history-writer": 8085, "strategy-engine": 8086, "trade-executor": 8087, } @dataclass(frozen=True) class AssetMeta: asset_id: str symbol: str decimals: int def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description=( "Live-watch the active market-maker loop: matching quotes, trade decisions, " "execution commands, execution results, arming state, and credited inventory." ) ) parser.add_argument( "--namespace", default=DEFAULT_NAMESPACE, help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})", ) parser.add_argument( "--interval", type=float, default=5.0, help="Polling interval in seconds (default: 5.0)", ) parser.add_argument( "--backfill", type=int, default=0, help="Print the most recent N events from each stream before live tailing.", ) parser.add_argument( "--event-limit", type=int, default=12, help="How many recent rows to fetch per stream on each poll (default: 12).", ) parser.add_argument( "--once", action="store_true", help="Print one snapshot and exit without live polling.", ) parser.add_argument( "--heartbeat-every", type=float, default=30.0, help="Print an idle heartbeat after this many quiet seconds (default: 30.0).", ) return parser.parse_args() def main() -> int: args = parse_args() assets = load_asset_registry(namespace=args.namespace) seen = { "quote": set(), "decision": set(), "command": set(), "result": set(), } bootstrap_rows = fetch_event_rows(namespace=args.namespace, limit=args.event_limit) if args.backfill > 0: print_snapshot(namespace=args.namespace, assets=assets) emit_backfill(bootstrap_rows, seen, assets=assets, backfill=args.backfill) else: mark_seen(bootstrap_rows, seen) print_snapshot(namespace=args.namespace, assets=assets) if args.once: return 0 previous_state = current_state_fingerprint(namespace=args.namespace, assets=assets) last_heartbeat_at = time.monotonic() try: while True: rows = fetch_event_rows(namespace=args.namespace, limit=args.event_limit) emitted = emit_new_rows(rows, seen, assets=assets) current_state = current_state_fingerprint(namespace=args.namespace, assets=assets) if current_state != previous_state: print_state_line(current_state) previous_state = current_state emitted = True if emitted: last_heartbeat_at = time.monotonic() elif time.monotonic() - last_heartbeat_at >= args.heartbeat_every: print_heartbeat(current_state) last_heartbeat_at = time.monotonic() time.sleep(args.interval) except KeyboardInterrupt: return 130 def load_asset_registry(*, namespace: str) -> dict[str, AssetMeta]: rows = query_rows(ASSET_QUERY, namespace=namespace, columns=ASSET_COLUMNS) return { row["asset_id"]: AssetMeta( asset_id=row["asset_id"], symbol=row["symbol"] or row["asset_id"], decimals=int(row["decimals"] or "0"), ) for row in rows if row["asset_id"] } def fetch_event_rows(*, namespace: str, limit: int) -> dict[str, list[dict[str, str]]]: return { "quote": query_rows(QUOTE_QUERY.format(limit=limit), namespace=namespace, columns=QUOTE_COLUMNS), "decision": query_rows( DECISION_QUERY.format(limit=limit), namespace=namespace, columns=DECISION_COLUMNS, ), "command": query_rows( COMMAND_QUERY.format(limit=limit), namespace=namespace, columns=COMMAND_COLUMNS, ), "result": query_rows( RESULT_QUERY.format(limit=limit), namespace=namespace, columns=RESULT_COLUMNS, ), } def query_rows(query: str, *, namespace: str, columns: list[str]) -> list[dict[str, str]]: result = kubectl( "exec", "-n", namespace, "deploy/postgres", "--", "psql", "-U", "unrip", "-d", "unrip", "-At", "-F", "\t", "-c", query, ) rows: list[dict[str, str]] = [] for line in result.stdout.splitlines(): if not line.strip(): continue values = line.split("\t") if len(values) < len(columns): values.extend([""] * (len(columns) - len(values))) rows.append(dict(zip(columns, values))) return rows def fetch_service_state(*, namespace: str, deployment: str) -> dict: port = CONTROL_PORTS[deployment] expression = ( f"fetch('http://127.0.0.1:{port}/state')" ".then(async (response) => {" " if (!response.ok) throw new Error(`state fetch failed: ${response.status}`);" " process.stdout.write(await response.text());" "})" ".catch((error) => {" " console.error(error.message);" " process.exit(1);" "});" ) result = kubectl( "exec", "-n", namespace, f"deploy/{deployment}", "--", "node", "-e", expression, ) return json.loads(result.stdout) def current_state_fingerprint(*, namespace: str, assets: dict[str, AssetMeta]) -> dict[str, str]: ingest = fetch_service_state(namespace=namespace, deployment="near-intents-ingest") inventory = fetch_service_state(namespace=namespace, deployment="inventory-sync") history = fetch_service_state(namespace=namespace, deployment="history-writer") strategy = fetch_service_state(namespace=namespace, deployment="strategy-engine") executor = fetch_service_state(namespace=namespace, deployment="trade-executor") snapshot = inventory.get("last_snapshot") or {} spendable = snapshot.get("spendable") or {} latest_decision = strategy.get("latest_decision") or {} latest_metrics = history.get("latest_portfolio_metrics") or {} return { "ts": now_string(), "last_matching_quote_at": str(ingest.get("ingest", {}).get("last_matching_quote_at") or "none"), "published_quotes": str(ingest.get("ingest", {}).get("published_count") or 0), "strategy_armed": str(bool(strategy.get("armed"))).lower(), "executor_armed": str(bool(executor.get("armed"))).lower(), "signer_registered": str(executor.get("signer_registered")), "btc_spendable": format_amount(spendable.get(find_symbol_asset_id(assets, "BTC"), "0"), assets.get(find_symbol_asset_id(assets, "BTC"))), "eure_spendable": format_amount(spendable.get(find_symbol_asset_id(assets, "EURe"), "0"), assets.get(find_symbol_asset_id(assets, "EURe"))), "latest_reason": str(latest_decision.get("decision_reason") or "none"), "latest_quote_id": str(latest_decision.get("quote_id") or "none"), "trade_pnl_eure": str(latest_metrics.get("trade_pnl_eure") or "-"), "mark_to_market_pnl_eure": str(latest_metrics.get("mark_to_market_pnl_eure") or "-"), } def emit_backfill( rows: dict[str, list[dict[str, str]]], seen: dict[str, set[str]], *, assets: dict[str, AssetMeta], backfill: int, ) -> None: for stream, formatter in STREAM_FORMATTERS.items(): stream_rows = list(reversed(rows[stream][:backfill])) for row in stream_rows: if row["event_id"] in seen[stream]: continue print(formatter(row, assets)) seen[stream].add(row["event_id"]) mark_seen(rows, seen) def emit_new_rows( rows: dict[str, list[dict[str, str]]], seen: dict[str, set[str]], *, assets: dict[str, AssetMeta], ) -> bool: emitted = False for stream, formatter in STREAM_FORMATTERS.items(): fresh = select_new_rows(rows[stream], seen[stream]) for row in fresh: print(formatter(row, assets)) emitted = True return emitted def mark_seen(rows: dict[str, list[dict[str, str]]], seen: dict[str, set[str]]) -> None: for stream, stream_rows in rows.items(): for row in stream_rows: seen[stream].add(row["event_id"]) def select_new_rows(rows: list[dict[str, str]], seen: set[str]) -> list[dict[str, str]]: fresh = [row for row in reversed(rows) if row["event_id"] not in seen] for row in fresh: seen.add(row["event_id"]) return fresh def print_snapshot(*, namespace: str, assets: dict[str, AssetMeta]) -> None: state = current_state_fingerprint(namespace=namespace, assets=assets) print( "WATCH" f" namespace={namespace}" f" last_matching_quote_at={state['last_matching_quote_at']}" f" published_quotes={state['published_quotes']}" f" strategy_armed={state['strategy_armed']}" f" executor_armed={state['executor_armed']}" f" signer_registered={state['signer_registered']}" f" btc={state['btc_spendable']}" f" eure={state['eure_spendable']}" f" trade_pnl_eure={state['trade_pnl_eure']}" f" mtm_pnl_eure={state['mark_to_market_pnl_eure']}" f" latest_reason={state['latest_reason']}" f" latest_quote={state['latest_quote_id']}" ) def print_state_line(state: dict[str, str]) -> None: print( f"{state['ts']} STATE" f" strategy_armed={state['strategy_armed']}" f" executor_armed={state['executor_armed']}" f" signer_registered={state['signer_registered']}" f" btc={state['btc_spendable']}" f" eure={state['eure_spendable']}" f" trade_pnl_eure={state['trade_pnl_eure']}" f" mtm_pnl_eure={state['mark_to_market_pnl_eure']}" f" last_matching_quote_at={state['last_matching_quote_at']}" f" latest_reason={state['latest_reason']}" f" latest_quote={state['latest_quote_id']}" ) def print_heartbeat(state: dict[str, str]) -> None: print( f"{state['ts']} HEARTBEAT" f" strategy_armed={state['strategy_armed']}" f" executor_armed={state['executor_armed']}" f" trade_pnl_eure={state['trade_pnl_eure']}" f" mtm_pnl_eure={state['mark_to_market_pnl_eure']}" f" latest_reason={state['latest_reason']}" f" latest_quote={state['latest_quote_id']}" ) def render_quote_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str: asset_in = assets.get(row["asset_in"]) asset_out = assets.get(row["asset_out"]) amount_in = format_amount(row["amount_in"], asset_in) if row["amount_in"] else "-" amount_out = format_amount(row["amount_out"], asset_out) if row["amount_out"] else "-" return ( f"{row['ingested_at']} QUOTE" f" quote={row['quote_id']}" f" pair={pair_label(row['asset_in'], row['asset_out'], assets)}" f" kind={row['request_kind']}" f" amount_in={amount_in}" f" amount_out={amount_out}" f" deadline_ms={row['min_deadline_ms'] or '-'}" ) def render_decision_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str: inventory_asset = assets.get(row["inventory_asset"]) need = format_amount(row["inventory_required"], inventory_asset) if row["inventory_required"] else "-" have = format_amount(row["inventory_available"], inventory_asset) if row["inventory_available"] else "-" return ( f"{row['ingested_at']} DECISION" f" quote={row['quote_id']}" f" direction={row['direction']}" f" decision={row['decision']}" f" reason={row['decision_reason']}" f" notional={row.get('notional') or row.get('eure_notional') or '-'}" f" notional_symbol={row.get('notional_symbol') or ('EURe' if row.get('eure_notional') else '-')}" f" gross_edge_pct={row['gross_edge_pct'] or '-'}" f" need={need}" f" have={have}" ) def render_command_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str: asset_in = assets.get(row["asset_in"]) asset_out = assets.get(row["asset_out"]) proposed_in = format_amount(row["proposed_amount_in"], asset_in) if row["proposed_amount_in"] else "-" proposed_out = format_amount(row["proposed_amount_out"], asset_out) if row["proposed_amount_out"] else "-" return ( f"{row['ingested_at']} COMMAND" f" quote={row['quote_id']}" f" command={row['command_id']}" f" kind={row['request_kind']}" f" proposed_in={proposed_in}" f" proposed_out={proposed_out}" ) def render_result_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str: extra = row["venue_response"] or row["error_message"] or "-" return ( f"{row['ingested_at']} RESULT" f" quote={row['quote_id']}" f" status={row['status']}" f" code={row['result_code']}" f" detail={extra}" ) def pair_label(asset_in: str, asset_out: str, assets: dict[str, AssetMeta]) -> str: return f"{asset_symbol(asset_in, assets)}->{asset_symbol(asset_out, assets)}" def asset_symbol(asset_id: str, assets: dict[str, AssetMeta]) -> str: asset = assets.get(asset_id) return asset.symbol if asset else asset_id def find_symbol_asset_id(assets: dict[str, AssetMeta], symbol: str) -> str: for asset_id, asset in assets.items(): if asset.symbol == symbol: return asset_id raise KeyError(f"missing asset for symbol {symbol}") def format_amount(raw_amount: str | None, asset: AssetMeta | None, *, max_places: int = 8) -> str: if raw_amount in (None, ""): return "-" if asset is None: return str(raw_amount) value = int(raw_amount) scale = Decimal(10) ** asset.decimals quant = Decimal(1) / (Decimal(10) ** min(asset.decimals, max_places)) decimal_value = (Decimal(value) / scale).quantize(quant, rounding=ROUND_HALF_UP) return f"{format_decimal(decimal_value)} {asset.symbol}" def format_decimal(value: Decimal) -> str: normalized = format(value, "f") if "." in normalized: normalized = normalized.rstrip("0").rstrip(".") return normalized or "0" def now_string() -> str: return time.strftime("%Y-%m-%dT%H:%M:%S%z") QUOTE_COLUMNS = [ "event_id", "ingested_at", "quote_id", "pair", "request_kind", "asset_in", "amount_in", "asset_out", "amount_out", "min_deadline_ms", ] DECISION_COLUMNS = [ "event_id", "ingested_at", "quote_id", "direction", "decision", "decision_reason", "gross_edge_pct", "notional", "notional_symbol", "eure_notional", "inventory_asset", "inventory_available", "inventory_required", ] ASSET_COLUMNS = [ "asset_id", "symbol", "decimals", ] ASSET_QUERY = """ select asset_id, coalesce(symbol, ''), coalesce(decimals::text, '') from trading_assets where enabled_for_inventory = true or asset_id in ( select payload->>'asset_in' from trade_decisions where payload ? 'asset_in' union select payload->>'asset_out' from trade_decisions where payload ? 'asset_out' union select payload->>'source_asset_id' from intent_request_preflights where payload ? 'source_asset_id' union select payload->>'destination_asset_id' from intent_request_preflights where payload ? 'destination_asset_id' ) order by symbol, asset_id; """ COMMAND_COLUMNS = [ "event_id", "ingested_at", "command_id", "quote_id", "request_kind", "asset_in", "asset_out", "proposed_amount_in", "proposed_amount_out", ] RESULT_COLUMNS = [ "event_id", "ingested_at", "quote_id", "status", "result_code", "venue_response", "error_message", ] QUOTE_QUERY = """ select event_id, coalesce(ingested_at::text, ''), coalesce(payload->>'quote_id', ''), coalesce(payload->>'pair', ''), coalesce(payload->>'request_kind', ''), coalesce(payload->>'asset_in', ''), coalesce(payload->>'amount_in', ''), coalesce(payload->>'asset_out', ''), coalesce(payload->>'amount_out', ''), coalesce(payload->>'min_deadline_ms', '') from swap_demand_events order by ingested_at desc limit {limit}; """ DECISION_QUERY = """ select event_id, coalesce(ingested_at::text, ''), coalesce(payload->>'quote_id', ''), coalesce(payload->>'direction', ''), coalesce(payload->>'decision', ''), coalesce(payload->>'decision_reason', ''), coalesce(payload->>'gross_edge_pct', ''), coalesce(payload->>'notional', ''), coalesce(payload->>'notional_symbol', ''), coalesce(payload->>'eure_notional', ''), coalesce(payload->>'inventory_asset', ''), coalesce(payload->>'inventory_available', ''), coalesce(payload->>'inventory_required', '') from trade_decisions order by ingested_at desc limit {limit}; """ COMMAND_QUERY = """ select event_id, coalesce(ingested_at::text, ''), coalesce(payload->>'command_id', ''), coalesce(payload->>'quote_id', ''), coalesce(payload->>'request_kind', ''), coalesce(payload->>'asset_in', ''), coalesce(payload->>'asset_out', ''), coalesce(payload->>'proposed_amount_in', ''), coalesce(payload->>'proposed_amount_out', '') from execute_trade_commands order by ingested_at desc limit {limit}; """ RESULT_QUERY = """ select event_id, coalesce(ingested_at::text, ''), coalesce(payload->>'quote_id', ''), coalesce(payload->>'status', ''), coalesce(payload->>'result_code', ''), coalesce(payload->>'venue_response', ''), coalesce(payload->'error'->>'message', '') from trade_execution_results order by ingested_at desc limit {limit}; """ STREAM_FORMATTERS = { "quote": render_quote_row, "decision": render_decision_row, "command": render_command_row, "result": render_result_row, } if __name__ == "__main__": raise SystemExit(main())