unrip/scripts/ops/watch_live_mm.py
philipp fdeb1287b4
Some checks failed
deploy / deploy (push) Failing after 40s
Fix pair-native runtime validation gaps
Proof: targeted pair-native strategy, preflight, outcome, dashboard, and ops tests pass; full npm test passes 237/237; operator dashboard production bundle builds; ops watcher Python test passes.

Assumptions: DB asset, pair, strategy config, and price route rows remain canonical; legacy EURe fields stay only for old-row/API compatibility; local shell has no Kubernetes context for direct live namespace recheck.

Still fake: venue-native terminal fill ids and realized fee/PnL attribution remain unavailable; live deployment verification must happen through the repo workflow because manual cluster repair is out of scope.
2026-05-18 19:44:54 +02:00

602 lines
19 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
import time
from dataclasses import dataclass
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from common import DEFAULT_NAMESPACE, kubectl
# Control port of each deployment, keyed by deployment name.
# fetch_service_state() looks the port up here and requests
# http://127.0.0.1:<port>/state through `kubectl exec` on deploy/<name>.
CONTROL_PORTS = {
"near-intents-ingest": 8081,
"inventory-sync": 8083,
"history-writer": 8085,
"strategy-engine": 8086,
"trade-executor": 8087,
}
@dataclass(frozen=True)
class AssetMeta:
    """Immutable display metadata for one asset row loaded from the database."""

    # Identifier used as the lookup key in the asset registry and in event payloads.
    asset_id: str
    # Short label appended to formatted amounts.
    symbol: str
    # Number of base-10 digits a raw integer amount is scaled down by when rendered.
    decimals: int
def parse_args() -> argparse.Namespace:
    """Build the watcher's command-line parser and parse ``sys.argv``.

    Returns:
        The parsed namespace with ``namespace``, ``interval``, ``backfill``,
        ``event_limit``, ``once`` and ``heartbeat_every`` attributes.
    """
    cli = argparse.ArgumentParser(
        description=(
            "Live-watch the active market-maker loop: matching quotes, trade decisions, "
            "execution commands, execution results, arming state, and credited inventory."
        )
    )
    # (flag, add_argument keyword arguments), registered in the order shown by --help.
    flag_specs = (
        (
            "--namespace",
            {
                "default": DEFAULT_NAMESPACE,
                "help": f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})",
            },
        ),
        (
            "--interval",
            {
                "type": float,
                "default": 5.0,
                "help": "Polling interval in seconds (default: 5.0)",
            },
        ),
        (
            "--backfill",
            {
                "type": int,
                "default": 0,
                "help": "Print the most recent N events from each stream before live tailing.",
            },
        ),
        (
            "--event-limit",
            {
                "type": int,
                "default": 12,
                "help": "How many recent rows to fetch per stream on each poll (default: 12).",
            },
        ),
        (
            "--once",
            {
                "action": "store_true",
                "help": "Print one snapshot and exit without live polling.",
            },
        ),
        (
            "--heartbeat-every",
            {
                "type": float,
                "default": 30.0,
                "help": "Print an idle heartbeat after this many quiet seconds (default: 30.0).",
            },
        ),
    )
    for flag, options in flag_specs:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def _without_timestamp(state: dict[str, str]) -> dict[str, str]:
    """Return a copy of *state* without its wall-clock ``ts`` entry."""
    return {key: value for key, value in state.items() if key != "ts"}


def main() -> int:
    """Run the watcher: bootstrap the event streams, print a snapshot, then poll.

    Returns:
        0 after a ``--once`` snapshot, or 130 when the live loop is interrupted
        with Ctrl-C.
    """
    args = parse_args()
    assets = load_asset_registry(namespace=args.namespace)
    # Event ids already printed (or deliberately skipped), one set per stream.
    seen: dict[str, set[str]] = {
        "quote": set(),
        "decision": set(),
        "command": set(),
        "result": set(),
    }
    # A --backfill larger than --event-limit used to be truncated silently
    # because only event_limit rows were fetched; fetch enough to honour it.
    bootstrap_limit = max(args.event_limit, args.backfill)
    bootstrap_rows = fetch_event_rows(namespace=args.namespace, limit=bootstrap_limit)
    if args.backfill > 0:
        print_snapshot(namespace=args.namespace, assets=assets)
        emit_backfill(bootstrap_rows, seen, assets=assets, backfill=args.backfill)
    else:
        # Without backfill, everything that already exists counts as old news.
        mark_seen(bootstrap_rows, seen)
        print_snapshot(namespace=args.namespace, assets=assets)
    if args.once:
        return 0

    # The fingerprint carries a wall-clock "ts", so comparing whole dicts would
    # report a change on every poll and the heartbeat would never fire.
    # Compare the content only and keep the full dict for printing.
    previous_state = _without_timestamp(
        current_state_fingerprint(namespace=args.namespace, assets=assets)
    )
    last_heartbeat_at = time.monotonic()
    try:
        while True:
            rows = fetch_event_rows(namespace=args.namespace, limit=args.event_limit)
            emitted = emit_new_rows(rows, seen, assets=assets)
            current_state = current_state_fingerprint(namespace=args.namespace, assets=assets)
            comparable_state = _without_timestamp(current_state)
            if comparable_state != previous_state:
                print_state_line(current_state)
                previous_state = comparable_state
                emitted = True
            if emitted:
                # Any output resets the quiet period.
                last_heartbeat_at = time.monotonic()
            elif time.monotonic() - last_heartbeat_at >= args.heartbeat_every:
                print_heartbeat(current_state)
                last_heartbeat_at = time.monotonic()
            time.sleep(args.interval)
    except KeyboardInterrupt:
        # Conventional exit status for termination by SIGINT (128 + 2).
        return 130
def load_asset_registry(*, namespace: str) -> dict[str, AssetMeta]:
    """Load asset metadata from the database, keyed by asset id.

    Rows with an empty ``asset_id`` are skipped. An empty symbol falls back to
    the asset id and an empty decimals value falls back to 0.
    """
    registry: dict[str, AssetMeta] = {}
    for record in query_rows(ASSET_QUERY, namespace=namespace, columns=ASSET_COLUMNS):
        asset_id = record["asset_id"]
        if not asset_id:
            continue
        registry[asset_id] = AssetMeta(
            asset_id=asset_id,
            symbol=record["symbol"] or asset_id,
            decimals=int(record["decimals"] or "0"),
        )
    return registry
def fetch_event_rows(*, namespace: str, limit: int) -> dict[str, list[dict[str, str]]]:
    """Fetch the latest *limit* rows of each event stream.

    Returns:
        A dict with the keys ``quote``, ``decision``, ``command`` and ``result``,
        each mapping to the rows returned by that stream's query.
    """
    # (stream name, SQL template, column names), queried in this order.
    stream_specs = (
        ("quote", QUOTE_QUERY, QUOTE_COLUMNS),
        ("decision", DECISION_QUERY, DECISION_COLUMNS),
        ("command", COMMAND_QUERY, COMMAND_COLUMNS),
        ("result", RESULT_QUERY, RESULT_COLUMNS),
    )
    fetched: dict[str, list[dict[str, str]]] = {}
    for stream, sql_template, column_names in stream_specs:
        fetched[stream] = query_rows(
            sql_template.format(limit=limit),
            namespace=namespace,
            columns=column_names,
        )
    return fetched
def query_rows(query: str, *, namespace: str, columns: list[str]) -> list[dict[str, str]]:
    """Run *query* through ``psql`` in the postgres deployment and parse the output.

    Args:
        query: SQL text passed to ``psql -c``.
        namespace: Kubernetes namespace holding ``deploy/postgres``.
        columns: Names assigned, in order, to the tab-separated output fields.

    Returns:
        One dict per non-blank output line. Lines with fewer fields than
        *columns* are padded with empty strings.
    """
    psql_command = ("psql", "-U", "unrip", "-d", "unrip", "-At", "-F", "\t", "-c", query)
    completed = kubectl("exec", "-n", namespace, "deploy/postgres", "--", *psql_command)
    expected_width = len(columns)
    parsed: list[dict[str, str]] = []
    for raw_line in completed.stdout.splitlines():
        if raw_line.strip():
            cells = raw_line.split("\t")
            # A non-positive repeat count yields an empty list, so wide rows are untouched.
            cells += [""] * (expected_width - len(cells))
            parsed.append(dict(zip(columns, cells)))
    return parsed
def fetch_service_state(*, namespace: str, deployment: str) -> dict:
    """Fetch and decode the JSON ``/state`` document of one deployment.

    A small Node.js snippet is run inside the deployment via ``kubectl exec``.
    It requests the control port on 127.0.0.1 and writes the response body to
    stdout, which is then parsed as JSON.

    Raises:
        KeyError: if *deployment* has no entry in ``CONTROL_PORTS``.
    """
    control_port = CONTROL_PORTS[deployment]
    script_parts = (
        f"fetch('http://127.0.0.1:{control_port}/state')",
        ".then(async (response) => {",
        " if (!response.ok) throw new Error(`state fetch failed: ${response.status}`);",
        " process.stdout.write(await response.text());",
        "})",
        ".catch((error) => {",
        " console.error(error.message);",
        " process.exit(1);",
        "});",
    )
    node_script = "".join(script_parts)
    completed = kubectl(
        "exec", "-n", namespace, f"deploy/{deployment}", "--", "node", "-e", node_script
    )
    return json.loads(completed.stdout)
def current_state_fingerprint(*, namespace: str, assets: dict[str, AssetMeta]) -> dict[str, str]:
    """Summarise the ``/state`` payloads of the five services as a flat string dict.

    Args:
        namespace: Kubernetes namespace the deployments run in.
        assets: Asset registry used to format spendable balances.

    Returns:
        A dict of display-ready strings. ``ts`` is the local wall-clock time of
        this call, so two fingerprints taken at different seconds differ in
        that key even when nothing else changed.
    """
    ingest = fetch_service_state(namespace=namespace, deployment="near-intents-ingest")
    inventory = fetch_service_state(namespace=namespace, deployment="inventory-sync")
    history = fetch_service_state(namespace=namespace, deployment="history-writer")
    strategy = fetch_service_state(namespace=namespace, deployment="strategy-engine")
    executor = fetch_service_state(namespace=namespace, deployment="trade-executor")

    # Each nested section may be absent or null. `or {}` covers both cases,
    # whereas a `.get(key, {})` default only covers a missing key.
    ingest_stats = ingest.get("ingest") or {}
    snapshot = inventory.get("last_snapshot") or {}
    spendable = snapshot.get("spendable") or {}
    latest_decision = strategy.get("latest_decision") or {}
    latest_metrics = history.get("latest_portfolio_metrics") or {}
    spendable_assets = format_spendable_assets(spendable=spendable, assets=assets)
    return {
        "ts": now_string(),
        "last_matching_quote_at": str(ingest_stats.get("last_matching_quote_at") or "none"),
        "published_quotes": str(ingest_stats.get("published_count") or 0),
        "strategy_armed": str(bool(strategy.get("armed"))).lower(),
        "executor_armed": str(bool(executor.get("armed"))).lower(),
        "signer_registered": str(executor.get("signer_registered")),
        "spendable_assets": ", ".join(spendable_assets) if spendable_assets else "none",
        "btc_spendable": format_symbol_amount(spendable=spendable, assets=assets, symbol="BTC"),
        "eure_spendable": format_symbol_amount(spendable=spendable, assets=assets, symbol="EURe"),
        "latest_reason": str(latest_decision.get("decision_reason") or "none"),
        "latest_quote_id": str(latest_decision.get("quote_id") or "none"),
        "trade_pnl_eure": str(latest_metrics.get("trade_pnl_eure") or "-"),
        "mark_to_market_pnl_eure": str(latest_metrics.get("mark_to_market_pnl_eure") or "-"),
    }
def emit_backfill(
    rows: dict[str, list[dict[str, str]]],
    seen: dict[str, set[str]],
    *,
    assets: dict[str, AssetMeta],
    backfill: int,
) -> None:
    """Print the first *backfill* rows of every stream, then mark all rows seen.

    The selected rows of each stream are printed in reverse of their input
    order. Rows whose ``event_id`` is already in *seen* are not printed again.
    """
    for stream in STREAM_FORMATTERS:
        render = STREAM_FORMATTERS[stream]
        already_printed = seen[stream]
        selected = rows[stream][:backfill]
        for position in range(len(selected) - 1, -1, -1):
            row = selected[position]
            event_id = row["event_id"]
            if event_id not in already_printed:
                print(render(row, assets))
                already_printed.add(event_id)
    # Rows beyond the backfill window must not surface later as "new".
    mark_seen(rows, seen)
def emit_new_rows(
    rows: dict[str, list[dict[str, str]]],
    seen: dict[str, set[str]],
    *,
    assets: dict[str, AssetMeta],
) -> bool:
    """Print every row not yet recorded in *seen* and report whether any was printed.

    Returns:
        True when at least one row was printed, otherwise False.
    """
    printed_count = 0
    for stream, render in STREAM_FORMATTERS.items():
        unseen = select_new_rows(rows[stream], seen[stream])
        for row in unseen:
            print(render(row, assets))
        printed_count += len(unseen)
    return printed_count > 0
def mark_seen(rows: dict[str, list[dict[str, str]]], seen: dict[str, set[str]]) -> None:
    """Record the ``event_id`` of every row in *rows* in the matching *seen* set."""
    for stream, stream_rows in rows.items():
        if not stream_rows:
            # Nothing to record, so the seen set for this stream is not touched.
            continue
        seen[stream].update(row["event_id"] for row in stream_rows)
def select_new_rows(rows: list[dict[str, str]], seen: set[str]) -> list[dict[str, str]]:
    """Return the rows not yet in *seen*, in reversed input order, and record them.

    *seen* is updated in place with the ``event_id`` of every returned row.
    """
    unseen: list[dict[str, str]] = []
    for candidate in reversed(rows):
        if candidate["event_id"] in seen:
            continue
        unseen.append(candidate)
    # Recorded only after selection, so membership is judged against the incoming set.
    seen.update(entry["event_id"] for entry in unseen)
    return unseen
def print_snapshot(*, namespace: str, assets: dict[str, AssetMeta]) -> None:
    """Fetch the current state fingerprint and print it as a single ``WATCH`` line."""
    state = current_state_fingerprint(namespace=namespace, assets=assets)
    # (label printed on the line, key read from the fingerprint), in output order.
    fields = (
        ("last_matching_quote_at", "last_matching_quote_at"),
        ("published_quotes", "published_quotes"),
        ("strategy_armed", "strategy_armed"),
        ("executor_armed", "executor_armed"),
        ("signer_registered", "signer_registered"),
        ("btc", "btc_spendable"),
        ("eure", "eure_spendable"),
    )
    trailing_fields = (
        ("trade_pnl_eure", "trade_pnl_eure"),
        ("mtm_pnl_eure", "mark_to_market_pnl_eure"),
        ("latest_reason", "latest_reason"),
        ("latest_quote", "latest_quote_id"),
    )
    pieces = ["WATCH", f" namespace={namespace}"]
    pieces.extend(f" {label}={state[key]}" for label, key in fields)
    pieces.append(f" balances=[{state['spendable_assets']}]")
    pieces.extend(f" {label}={state[key]}" for label, key in trailing_fields)
    print("".join(pieces))
def print_state_line(state: dict[str, str]) -> None:
    """Print a timestamped ``STATE`` line built from a state fingerprint dict."""
    # (label printed on the line, key read from *state*), in output order.
    leading_fields = (
        ("strategy_armed", "strategy_armed"),
        ("executor_armed", "executor_armed"),
        ("signer_registered", "signer_registered"),
        ("btc", "btc_spendable"),
        ("eure", "eure_spendable"),
    )
    trailing_fields = (
        ("trade_pnl_eure", "trade_pnl_eure"),
        ("mtm_pnl_eure", "mark_to_market_pnl_eure"),
        ("last_matching_quote_at", "last_matching_quote_at"),
        ("latest_reason", "latest_reason"),
        ("latest_quote", "latest_quote_id"),
    )
    pieces = [f"{state['ts']} STATE"]
    pieces.extend(f" {label}={state[key]}" for label, key in leading_fields)
    pieces.append(f" balances=[{state['spendable_assets']}]")
    pieces.extend(f" {label}={state[key]}" for label, key in trailing_fields)
    print("".join(pieces))
def print_heartbeat(state: dict[str, str]) -> None:
    """Print a timestamped ``HEARTBEAT`` line with arming, PnL and latest-decision fields."""
    # (label printed on the line, key read from *state*), in output order.
    fields = (
        ("strategy_armed", "strategy_armed"),
        ("executor_armed", "executor_armed"),
        ("trade_pnl_eure", "trade_pnl_eure"),
        ("mtm_pnl_eure", "mark_to_market_pnl_eure"),
        ("latest_reason", "latest_reason"),
        ("latest_quote", "latest_quote_id"),
    )
    body = "".join(f" {label}={state[key]}" for label, key in fields)
    print(f"{state['ts']} HEARTBEAT" + body)
def render_quote_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
    """Format one quote row as a ``QUOTE`` log line.

    Empty amounts and an empty deadline are rendered as ``-``.
    """
    source_asset = assets.get(row["asset_in"])
    target_asset = assets.get(row["asset_out"])
    amount_in = "-"
    if row["amount_in"]:
        amount_in = format_amount(row["amount_in"], source_asset)
    amount_out = "-"
    if row["amount_out"]:
        amount_out = format_amount(row["amount_out"], target_asset)
    pieces = [
        f"{row['ingested_at']} QUOTE",
        f" quote={row['quote_id']}",
        f" pair={pair_label(row['asset_in'], row['asset_out'], assets)}",
        f" kind={row['request_kind']}",
        f" amount_in={amount_in}",
        f" amount_out={amount_out}",
        f" deadline_ms={row['min_deadline_ms'] or '-'}",
    ]
    return "".join(pieces)
def render_decision_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
    """Format one trade-decision row as a ``DECISION`` log line.

    The notional falls back to ``eure_notional`` when ``notional`` is empty.
    In that case the symbol defaults to ``EURe`` unless ``notional_symbol`` is set.
    """
    inventory_asset = assets.get(row["inventory_asset"])
    need = "-"
    if row["inventory_required"]:
        need = format_amount(row["inventory_required"], inventory_asset)
    have = "-"
    if row["inventory_available"]:
        have = format_amount(row["inventory_available"], inventory_asset)

    notional = row.get("notional") or row.get("eure_notional") or "-"
    notional_symbol = row.get("notional_symbol")
    if not notional_symbol:
        notional_symbol = "EURe" if row.get("eure_notional") else "-"

    pieces = [
        f"{row['ingested_at']} DECISION",
        f" quote={row['quote_id']}",
        f" direction={row['direction']}",
        f" decision={row['decision']}",
        f" reason={row['decision_reason']}",
        f" notional={notional}",
        f" notional_symbol={notional_symbol}",
        f" gross_edge_pct={row['gross_edge_pct'] or '-'}",
        f" need={need}",
        f" have={have}",
    ]
    return "".join(pieces)
def render_command_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
    """Format one execute-trade command row as a ``COMMAND`` log line.

    Empty proposed amounts are rendered as ``-``.
    """
    source_asset = assets.get(row["asset_in"])
    target_asset = assets.get(row["asset_out"])
    proposed_in = "-"
    if row["proposed_amount_in"]:
        proposed_in = format_amount(row["proposed_amount_in"], source_asset)
    proposed_out = "-"
    if row["proposed_amount_out"]:
        proposed_out = format_amount(row["proposed_amount_out"], target_asset)
    pieces = [
        f"{row['ingested_at']} COMMAND",
        f" quote={row['quote_id']}",
        f" command={row['command_id']}",
        f" kind={row['request_kind']}",
        f" proposed_in={proposed_in}",
        f" proposed_out={proposed_out}",
    ]
    return "".join(pieces)
def render_result_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
    """Format one execution-result row as a ``RESULT`` log line.

    The detail field is the venue response if non-empty, else the error
    message, else ``-``. *assets* is accepted for signature parity with the
    other row renderers and is not used.
    """
    detail = row["venue_response"]
    if not detail:
        detail = row["error_message"]
    if not detail:
        detail = "-"
    pieces = [
        f"{row['ingested_at']} RESULT",
        f" quote={row['quote_id']}",
        f" status={row['status']}",
        f" code={row['result_code']}",
        f" detail={detail}",
    ]
    return "".join(pieces)
def pair_label(asset_in: str, asset_out: str, assets: dict[str, AssetMeta]) -> str:
    """Return ``<in>-><out>`` using each asset's symbol, or its id when unknown."""
    source_label = asset_symbol(asset_in, assets)
    target_label = asset_symbol(asset_out, assets)
    return f"{source_label}->{target_label}"
def asset_symbol(asset_id: str, assets: dict[str, AssetMeta]) -> str:
    """Return the symbol registered for *asset_id*, or *asset_id* itself when unknown."""
    meta = assets.get(asset_id)
    if not meta:
        return asset_id
    return meta.symbol
def find_symbol_asset_id(assets: dict[str, AssetMeta], symbol: str) -> str:
    """Return the id of the first asset, in registry order, whose symbol equals *symbol*.

    Raises:
        KeyError: if no asset in *assets* carries that symbol.
    """
    matches = (asset_id for asset_id, meta in assets.items() if meta.symbol == symbol)
    try:
        return next(matches)
    except StopIteration:
        raise KeyError(f"missing asset for symbol {symbol}") from None
def format_symbol_amount(*, spendable: dict[str, str], assets: dict[str, AssetMeta], symbol: str) -> str:
    """Format the spendable balance of the first asset whose symbol equals *symbol*.

    Returns:
        The formatted amount (a missing balance counts as ``"0"``), or
        ``"untracked"`` when no asset in *assets* has that symbol.
    """
    match = next((entry for entry in assets.items() if entry[1].symbol == symbol), None)
    if match is None:
        return "untracked"
    asset_id, meta = match
    return format_amount(spendable.get(asset_id, "0"), meta)
def format_spendable_assets(*, spendable: dict[str, str], assets: dict[str, AssetMeta]) -> list[str]:
    """List every balance as display text.

    Registered assets come first, sorted by (symbol, asset id), with a missing
    balance counted as ``"0"``. Balances for ids absent from *assets* follow,
    sorted by id and shown as ``<raw amount> <asset id>``.
    """
    registered = sorted(assets.items(), key=lambda entry: (entry[1].symbol, entry[0]))
    known = [format_amount(spendable.get(asset_id, "0"), meta) for asset_id, meta in registered]
    unregistered_ids = sorted(set(spendable) - set(assets))
    unknown = [f"{spendable.get(asset_id, '0')} {asset_id}" for asset_id in unregistered_ids]
    return known + unknown
def format_amount(raw_amount: str | None, asset: AssetMeta | None, *, max_places: int = 8) -> str:
    """Scale a raw integer amount by the asset's decimals and append its symbol.

    Args:
        raw_amount: Integer amount in the asset's smallest unit.
        asset: Metadata supplying ``decimals`` and ``symbol``.
        max_places: Upper bound on the fractional digits kept after rounding.

    Returns:
        ``"-"`` for a ``None`` or empty amount, the amount as text when *asset*
        is ``None``, and otherwise ``"<value> <symbol>"`` rounded half-up.

    Raises:
        ValueError: if *raw_amount* cannot be converted with ``int()``.
    """
    if raw_amount is None or raw_amount == "":
        return "-"
    if asset is None:
        return str(raw_amount)
    base_units = Decimal(int(raw_amount))
    unit_scale = Decimal(10) ** asset.decimals
    kept_places = min(asset.decimals, max_places)
    rounding_step = Decimal(1) / (Decimal(10) ** kept_places)
    scaled = (base_units / unit_scale).quantize(rounding_step, rounding=ROUND_HALF_UP)
    return f"{format_decimal(scaled)} {asset.symbol}"
def format_decimal(value: Decimal) -> str:
    """Render *value* in fixed-point notation without trailing fractional zeros."""
    text = f"{value:f}"
    whole, point, fraction = text.partition(".")
    if point:
        fraction = fraction.rstrip("0")
        # Drop the decimal point as well when no fractional digits remain.
        text = f"{whole}.{fraction}" if fraction else whole
    return text or "0"
def now_string() -> str:
    """Return the current local time as ``YYYY-MM-DDTHH:MM:SS`` followed by the ``%z`` offset."""
    stamp_format = "%Y-%m-%dT%H:%M:%S%z"
    return time.strftime(stamp_format, time.localtime())
# Column names for rows returned by QUOTE_QUERY, in the same order as its
# select list. query_rows() zips these with the tab-separated psql fields.
QUOTE_COLUMNS = [
"event_id",
"ingested_at",
"quote_id",
"pair",
"request_kind",
"asset_in",
"amount_in",
"asset_out",
"amount_out",
"min_deadline_ms",
]
# Column names for rows returned by DECISION_QUERY, in select-list order.
# render_decision_row() falls back to "eure_notional" when "notional" is empty.
DECISION_COLUMNS = [
"event_id",
"ingested_at",
"quote_id",
"direction",
"decision",
"decision_reason",
"gross_edge_pct",
"notional",
"notional_symbol",
"eure_notional",
"inventory_asset",
"inventory_available",
"inventory_required",
]
# Column names for rows returned by ASSET_QUERY, in select-list order.
ASSET_COLUMNS = [
"asset_id",
"symbol",
"decimals",
]
# Selects assets that are enabled for inventory or are referenced as
# asset_in/asset_out in trade_decisions payloads or as source/destination
# asset ids in intent_request_preflights payloads.
# load_asset_registry() passes this text to psql as-is (no str.format step).
ASSET_QUERY = """
select
asset_id,
coalesce(symbol, ''),
coalesce(decimals::text, '')
from trading_assets
where enabled_for_inventory = true
or asset_id in (
select payload->>'asset_in' from trade_decisions where payload ? 'asset_in'
union
select payload->>'asset_out' from trade_decisions where payload ? 'asset_out'
union
select payload->>'source_asset_id' from intent_request_preflights where payload ? 'source_asset_id'
union
select payload->>'destination_asset_id' from intent_request_preflights where payload ? 'destination_asset_id'
)
order by symbol, asset_id;
"""
# Column names for rows returned by COMMAND_QUERY, in select-list order.
COMMAND_COLUMNS = [
"event_id",
"ingested_at",
"command_id",
"quote_id",
"request_kind",
"asset_in",
"asset_out",
"proposed_amount_in",
"proposed_amount_out",
]
# Column names for rows returned by RESULT_QUERY, in select-list order.
# "error_message" is filled from payload->'error'->>'message'.
RESULT_COLUMNS = [
"event_id",
"ingested_at",
"quote_id",
"status",
"result_code",
"venue_response",
"error_message",
]
# The four stream queries below share one shape:
# - every payload field is coalesced to '' so each output line has a fixed
#   number of tab-separated fields;
# - rows are ordered newest first by ingested_at;
# - {limit} is filled in with str.format() by fetch_event_rows().

# Latest rows from swap_demand_events; columns match QUOTE_COLUMNS.
QUOTE_QUERY = """
select
event_id,
coalesce(ingested_at::text, ''),
coalesce(payload->>'quote_id', ''),
coalesce(payload->>'pair', ''),
coalesce(payload->>'request_kind', ''),
coalesce(payload->>'asset_in', ''),
coalesce(payload->>'amount_in', ''),
coalesce(payload->>'asset_out', ''),
coalesce(payload->>'amount_out', ''),
coalesce(payload->>'min_deadline_ms', '')
from swap_demand_events
order by ingested_at desc
limit {limit};
"""
# Latest rows from trade_decisions; columns match DECISION_COLUMNS.
DECISION_QUERY = """
select
event_id,
coalesce(ingested_at::text, ''),
coalesce(payload->>'quote_id', ''),
coalesce(payload->>'direction', ''),
coalesce(payload->>'decision', ''),
coalesce(payload->>'decision_reason', ''),
coalesce(payload->>'gross_edge_pct', ''),
coalesce(payload->>'notional', ''),
coalesce(payload->>'notional_symbol', ''),
coalesce(payload->>'eure_notional', ''),
coalesce(payload->>'inventory_asset', ''),
coalesce(payload->>'inventory_available', ''),
coalesce(payload->>'inventory_required', '')
from trade_decisions
order by ingested_at desc
limit {limit};
"""
# Latest rows from execute_trade_commands; columns match COMMAND_COLUMNS.
COMMAND_QUERY = """
select
event_id,
coalesce(ingested_at::text, ''),
coalesce(payload->>'command_id', ''),
coalesce(payload->>'quote_id', ''),
coalesce(payload->>'request_kind', ''),
coalesce(payload->>'asset_in', ''),
coalesce(payload->>'asset_out', ''),
coalesce(payload->>'proposed_amount_in', ''),
coalesce(payload->>'proposed_amount_out', '')
from execute_trade_commands
order by ingested_at desc
limit {limit};
"""
# Latest rows from trade_execution_results; columns match RESULT_COLUMNS.
RESULT_QUERY = """
select
event_id,
coalesce(ingested_at::text, ''),
coalesce(payload->>'quote_id', ''),
coalesce(payload->>'status', ''),
coalesce(payload->>'result_code', ''),
coalesce(payload->>'venue_response', ''),
coalesce(payload->'error'->>'message', '')
from trade_execution_results
order by ingested_at desc
limit {limit};
"""
# Maps each stream name (the keys returned by fetch_event_rows()) to the
# function that renders one of its rows. emit_backfill() and emit_new_rows()
# iterate this dict, so its insertion order is the order streams are printed in.
STREAM_FORMATTERS = {
"quote": render_quote_row,
"decision": render_decision_row,
"command": render_command_row,
"result": render_result_row,
}
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())