All checks were successful
deploy / deploy (push) Successful in 22s
Proof: Persist portfolio value and PnL snapshots from the live inventory and reference-price path so operators can inspect trading performance from repo-controlled data. Assumptions: The last credited inventory snapshot before the first live command is the correct baseline for trade-driven PnL, and EURe remains explicit 1:1 with EUR. Still fake: The new portfolio metrics and watch output are implemented and tested locally but are not live until the updated app image is deployed to k3s.
556 lines
17 KiB
Python
556 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from decimal import Decimal, ROUND_HALF_UP
|
|
from pathlib import Path
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
|
|
|
from common import DEFAULT_NAMESPACE, config_value, kubectl
|
|
|
|
|
|
# Port of the local HTTP ``/state`` endpoint for each deployment. The keys are
# the deployment names passed to ``kubectl exec deploy/<name>``.
CONTROL_PORTS = {
    "near-intents-ingest": 8081,
    "inventory-sync": 8083,
    "history-writer": 8085,
    "strategy-engine": 8086,
    "trade-executor": 8087,
}
|
|
|
|
|
|
@dataclass(frozen=True)
class AssetMeta:
    """Immutable display metadata for one asset handled by the watcher."""

    # Identifier used as the registry key and matched against event rows.
    asset_id: str
    # Ticker appended to formatted amounts and used in pair labels.
    symbol: str
    # Power of ten that separates raw integer amounts from whole units.
    decimals: int
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the watcher's command-line options.

    Returns:
        The parsed namespace with ``namespace``, ``interval``, ``backfill``,
        ``event_limit``, ``once`` and ``heartbeat_every`` attributes.

    Exits with a usage error (via ``parser.error``) when ``--interval`` or
    ``--event-limit`` is negative, instead of failing later inside
    ``time.sleep`` or the SQL ``limit`` clause.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Live-watch the active market-maker loop: matching quotes, trade decisions, "
            "execution commands, execution results, arming state, and credited inventory."
        )
    )
    parser.add_argument(
        "--namespace",
        default=DEFAULT_NAMESPACE,
        help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})",
    )
    parser.add_argument(
        "--interval",
        type=float,
        default=5.0,
        help="Polling interval in seconds (default: 5.0)",
    )
    parser.add_argument(
        "--backfill",
        type=int,
        default=0,
        help="Print the most recent N events from each stream before live tailing.",
    )
    parser.add_argument(
        "--event-limit",
        type=int,
        default=12,
        help="How many recent rows to fetch per stream on each poll (default: 12).",
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Print one snapshot and exit without live polling.",
    )
    parser.add_argument(
        "--heartbeat-every",
        type=float,
        default=30.0,
        help="Print an idle heartbeat after this many quiet seconds (default: 30.0).",
    )
    args = parser.parse_args()

    # ``not (x >= 0)`` also rejects NaN, which time.sleep() would refuse.
    if not args.interval >= 0:
        parser.error("--interval must be a non-negative number of seconds")
    # The value is substituted into a SQL ``limit`` clause.
    if args.event_limit < 0:
        parser.error("--event-limit must not be negative")
    return args
|
|
|
|
|
|
def _without_timestamp(state: dict[str, str]) -> dict[str, str]:
    """Return *state* minus its wall-clock ``ts`` entry, for change detection."""
    return {key: value for key, value in state.items() if key != "ts"}


def main() -> int:
    """Print a snapshot, optionally backfill, then tail events and state changes.

    Returns:
        0 after a ``--once`` run, or 130 when the live loop is interrupted
        with Ctrl-C.
    """
    args = parse_args()
    assets = load_asset_registry(namespace=args.namespace)
    # Event ids already printed (or deliberately skipped), per stream.
    seen: dict[str, set[str]] = {
        "quote": set(),
        "decision": set(),
        "command": set(),
        "result": set(),
    }

    # The backfill is sliced out of this first fetch, so it has to cover the
    # requested backfill size even when that exceeds the per-poll limit.
    bootstrap_limit = max(args.event_limit, args.backfill)
    bootstrap_rows = fetch_event_rows(namespace=args.namespace, limit=bootstrap_limit)
    if args.backfill > 0:
        print_snapshot(namespace=args.namespace, assets=assets)
        emit_backfill(bootstrap_rows, seen, assets=assets, backfill=args.backfill)
    else:
        # Without a backfill, pre-existing rows are history: mark them as seen
        # so only rows arriving after startup are printed.
        mark_seen(bootstrap_rows, seen)
        print_snapshot(namespace=args.namespace, assets=assets)

    if args.once:
        return 0

    previous_state = current_state_fingerprint(namespace=args.namespace, assets=assets)
    last_heartbeat_at = time.monotonic()
    try:
        while True:
            rows = fetch_event_rows(namespace=args.namespace, limit=args.event_limit)
            emitted = emit_new_rows(rows, seen, assets=assets)

            current_state = current_state_fingerprint(namespace=args.namespace, assets=assets)
            # The fingerprint carries the time it was taken; comparing that
            # field would report a change on every poll and starve the
            # heartbeat, so it is excluded from the comparison.
            if _without_timestamp(current_state) != _without_timestamp(previous_state):
                print_state_line(current_state)
                previous_state = current_state
                emitted = True

            if emitted:
                last_heartbeat_at = time.monotonic()
            elif time.monotonic() - last_heartbeat_at >= args.heartbeat_every:
                print_heartbeat(current_state)
                last_heartbeat_at = time.monotonic()

            time.sleep(args.interval)
    except KeyboardInterrupt:
        # 130 is the conventional shell exit status for termination by SIGINT.
        return 130
|
|
|
|
|
|
def load_asset_registry(*, namespace: str) -> dict[str, AssetMeta]:
    """Build the asset-id -> AssetMeta map from TRADING_* config values.

    The symbol and decimals fall back to built-in defaults when the config
    lookup returns a falsy value; the asset id is used exactly as returned.
    """
    # (asset-id key, symbol key, decimals key, default symbol, default decimals)
    specs = (
        ("TRADING_BTC_ASSET_ID", "TRADING_BTC_SYMBOL", "TRADING_BTC_DECIMALS", "BTC", "8"),
        ("TRADING_EURE_ASSET_ID", "TRADING_EURE_SYMBOL", "TRADING_EURE_DECIMALS", "EURe", "18"),
    )
    registry: dict[str, AssetMeta] = {}
    for id_key, symbol_key, decimals_key, default_symbol, default_decimals in specs:
        asset_id = config_value(id_key, namespace=namespace)
        symbol = config_value(symbol_key, namespace=namespace) or default_symbol
        decimals = int(config_value(decimals_key, namespace=namespace) or default_decimals)
        registry[asset_id] = AssetMeta(asset_id=asset_id, symbol=symbol, decimals=decimals)
    return registry
|
|
|
|
|
|
def fetch_event_rows(*, namespace: str, limit: int) -> dict[str, list[dict[str, str]]]:
    """Fetch up to *limit* recent rows of every event stream, keyed by stream name."""
    streams = (
        ("quote", QUOTE_QUERY, QUOTE_COLUMNS),
        ("decision", DECISION_QUERY, DECISION_COLUMNS),
        ("command", COMMAND_QUERY, COMMAND_COLUMNS),
        ("result", RESULT_QUERY, RESULT_COLUMNS),
    )
    return {
        stream: query_rows(template.format(limit=limit), namespace=namespace, columns=columns)
        for stream, template, columns in streams
    }
|
|
|
|
|
|
def query_rows(query: str, *, namespace: str, columns: list[str]) -> list[dict[str, str]]:
    """Run *query* through psql inside the postgres deployment and parse the rows.

    psql is asked for unaligned, tuples-only output with a tab separator; each
    non-blank output line becomes a dict keyed by *columns*. Short lines are
    padded with empty strings; extra fields are dropped by ``zip``.
    """
    psql_command = ("psql", "-U", "unrip", "-d", "unrip", "-At", "-F", "\t", "-c", query)
    completed = kubectl("exec", "-n", namespace, "deploy/postgres", "--", *psql_command)

    width = len(columns)
    parsed: list[dict[str, str]] = []
    for raw_line in completed.stdout.splitlines():
        if not raw_line.strip():
            continue
        cells = raw_line.split("\t")
        # A non-positive repeat count yields an empty list, so full-width
        # lines are left untouched.
        cells.extend([""] * (width - len(cells)))
        parsed.append(dict(zip(columns, cells)))
    return parsed
|
|
|
|
|
|
def fetch_service_state(*, namespace: str, deployment: str) -> dict:
    """Return the JSON served at a deployment's local ``/state`` endpoint.

    A small Node.js one-liner is executed inside the deployment via
    ``kubectl exec``; it fetches the endpoint on the port listed in
    CONTROL_PORTS and writes the response body to stdout, which is then
    decoded as JSON.

    Raises:
        KeyError: if *deployment* has no entry in CONTROL_PORTS.
    """
    state_port = CONTROL_PORTS[deployment]
    script = "".join(
        [
            f"fetch('http://127.0.0.1:{state_port}/state')",
            ".then(async (response) => {",
            " if (!response.ok) throw new Error(`state fetch failed: ${response.status}`);",
            " process.stdout.write(await response.text());",
            "})",
            ".catch((error) => {",
            " console.error(error.message);",
            " process.exit(1);",
            "});",
        ]
    )
    completed = kubectl(
        "exec", "-n", namespace, f"deploy/{deployment}", "--", "node", "-e", script
    )
    return json.loads(completed.stdout)
|
|
|
|
|
|
def _metric_text(value: object) -> str:
    """Render a portfolio metric, using "-" only when it is absent.

    A plain ``value or "-"`` would also hide a legitimate numeric zero.
    """
    if value is None or value == "":
        return "-"
    return str(value)


def current_state_fingerprint(*, namespace: str, assets: dict[str, AssetMeta]) -> dict[str, str]:
    """Collect a flat, all-string summary of the five services' ``/state`` data.

    Args:
        namespace: Kubernetes namespace the deployments run in.
        assets: Asset registry; it must contain assets whose symbols are
            exactly "BTC" and "EURe".

    Returns:
        A dict of display strings. ``ts`` is the local time the summary was
        taken, so two summaries taken at different times never compare equal
        as whole dicts.

    Raises:
        KeyError: if the registry has no asset with symbol "BTC" or "EURe".
    """
    ingest = fetch_service_state(namespace=namespace, deployment="near-intents-ingest")
    inventory = fetch_service_state(namespace=namespace, deployment="inventory-sync")
    history = fetch_service_state(namespace=namespace, deployment="history-writer")
    strategy = fetch_service_state(namespace=namespace, deployment="strategy-engine")
    executor = fetch_service_state(namespace=namespace, deployment="trade-executor")

    # Every nested section may be missing or null; ``or {}`` covers both cases.
    ingest_stats = ingest.get("ingest") or {}
    snapshot = inventory.get("last_snapshot") or {}
    spendable = snapshot.get("spendable") or {}
    latest_decision = strategy.get("latest_decision") or {}
    latest_metrics = history.get("latest_portfolio_metrics") or {}

    btc_id = find_symbol_asset_id(assets, "BTC")
    eure_id = find_symbol_asset_id(assets, "EURe")

    return {
        "ts": now_string(),
        "last_matching_quote_at": str(ingest_stats.get("last_matching_quote_at") or "none"),
        "published_quotes": str(ingest_stats.get("published_count") or 0),
        "strategy_armed": str(bool(strategy.get("armed"))).lower(),
        "executor_armed": str(bool(executor.get("armed"))).lower(),
        "signer_registered": str(executor.get("signer_registered")),
        "btc_spendable": format_amount(spendable.get(btc_id, "0"), assets.get(btc_id)),
        "eure_spendable": format_amount(spendable.get(eure_id, "0"), assets.get(eure_id)),
        "latest_reason": str(latest_decision.get("decision_reason") or "none"),
        "latest_quote_id": str(latest_decision.get("quote_id") or "none"),
        "trade_pnl_eure": _metric_text(latest_metrics.get("trade_pnl_eure")),
        "mark_to_market_pnl_eure": _metric_text(latest_metrics.get("mark_to_market_pnl_eure")),
    }
|
|
|
|
|
|
def emit_backfill(
    rows: dict[str, list[dict[str, str]]],
    seen: dict[str, set[str]],
    *,
    assets: dict[str, AssetMeta],
    backfill: int,
) -> None:
    """Print the first *backfill* rows of each stream, then mark every row seen.

    The leading slice of each stream is printed in reverse order (rows are
    fetched newest-first, so this prints oldest first). Rows beyond the slice
    are not printed but are still recorded in *seen*.
    """
    for stream, formatter in STREAM_FORMATTERS.items():
        already_printed = seen[stream]
        for row in reversed(rows[stream][:backfill]):
            event_id = row["event_id"]
            if event_id not in already_printed:
                print(formatter(row, assets))
                already_printed.add(event_id)
    mark_seen(rows, seen)
|
|
|
|
|
|
def emit_new_rows(
    rows: dict[str, list[dict[str, str]]],
    seen: dict[str, set[str]],
    *,
    assets: dict[str, AssetMeta],
) -> bool:
    """Print every not-yet-seen row of each stream; return True if any was printed."""
    printed_any = False
    for stream, formatter in STREAM_FORMATTERS.items():
        for row in select_new_rows(rows[stream], seen[stream]):
            print(formatter(row, assets))
            printed_any = True
    return printed_any
|
|
|
|
|
|
def mark_seen(rows: dict[str, list[dict[str, str]]], seen: dict[str, set[str]]) -> None:
    """Record the event id of every row in *rows* in the matching *seen* set."""
    for stream, stream_rows in rows.items():
        # Streams without rows are skipped, so they need no entry in *seen*.
        if stream_rows:
            seen[stream].update(row["event_id"] for row in stream_rows)
|
|
|
|
|
|
def select_new_rows(rows: list[dict[str, str]], seen: set[str]) -> list[dict[str, str]]:
    """Return the unseen rows of *rows* in reverse order and mark them seen.

    *seen* is only updated after the whole selection has been made.
    """
    fresh: list[dict[str, str]] = []
    for row in reversed(rows):
        if row["event_id"] not in seen:
            fresh.append(row)
    seen.update(row["event_id"] for row in fresh)
    return fresh
|
|
|
|
|
|
def print_snapshot(*, namespace: str, assets: dict[str, AssetMeta]) -> None:
    """Fetch the current state and print it as a single ``WATCH`` line."""
    state = current_state_fingerprint(namespace=namespace, assets=assets)
    fields = [
        "WATCH",
        f"namespace={namespace}",
        f"last_matching_quote_at={state['last_matching_quote_at']}",
        f"published_quotes={state['published_quotes']}",
        f"strategy_armed={state['strategy_armed']}",
        f"executor_armed={state['executor_armed']}",
        f"signer_registered={state['signer_registered']}",
        f"btc={state['btc_spendable']}",
        f"eure={state['eure_spendable']}",
        f"trade_pnl_eure={state['trade_pnl_eure']}",
        f"mtm_pnl_eure={state['mark_to_market_pnl_eure']}",
        f"latest_reason={state['latest_reason']}",
        f"latest_quote={state['latest_quote_id']}",
    ]
    print(" ".join(fields))
|
|
|
|
|
|
def print_state_line(state: dict[str, str]) -> None:
    """Print a timestamped ``STATE`` line with the full state summary."""
    fields = [
        f"{state['ts']} STATE",
        f"strategy_armed={state['strategy_armed']}",
        f"executor_armed={state['executor_armed']}",
        f"signer_registered={state['signer_registered']}",
        f"btc={state['btc_spendable']}",
        f"eure={state['eure_spendable']}",
        f"trade_pnl_eure={state['trade_pnl_eure']}",
        f"mtm_pnl_eure={state['mark_to_market_pnl_eure']}",
        f"last_matching_quote_at={state['last_matching_quote_at']}",
        f"latest_reason={state['latest_reason']}",
        f"latest_quote={state['latest_quote_id']}",
    ]
    print(" ".join(fields))
|
|
|
|
|
|
def print_heartbeat(state: dict[str, str]) -> None:
    """Print a timestamped ``HEARTBEAT`` line with a reduced state summary."""
    fields = [
        f"{state['ts']} HEARTBEAT",
        f"strategy_armed={state['strategy_armed']}",
        f"executor_armed={state['executor_armed']}",
        f"trade_pnl_eure={state['trade_pnl_eure']}",
        f"mtm_pnl_eure={state['mark_to_market_pnl_eure']}",
        f"latest_reason={state['latest_reason']}",
        f"latest_quote={state['latest_quote_id']}",
    ]
    print(" ".join(fields))
|
|
|
|
|
|
def render_quote_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
    """Format one quote row as a ``QUOTE`` line; empty fields are shown as "-"."""
    meta_in = assets.get(row["asset_in"])
    meta_out = assets.get(row["asset_out"])
    shown_in = "-" if not row["amount_in"] else format_amount(row["amount_in"], meta_in)
    shown_out = "-" if not row["amount_out"] else format_amount(row["amount_out"], meta_out)
    fields = [
        f"{row['ingested_at']} QUOTE",
        f"quote={row['quote_id']}",
        f"pair={pair_label(row['asset_in'], row['asset_out'], assets)}",
        f"kind={row['request_kind']}",
        f"amount_in={shown_in}",
        f"amount_out={shown_out}",
        f"deadline_ms={row['min_deadline_ms'] or '-'}",
    ]
    return " ".join(fields)
|
|
|
|
|
|
def render_decision_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
    """Format one trade-decision row as a ``DECISION`` line.

    Required and available inventory are both formatted with the row's
    inventory asset; empty fields are shown as "-".
    """
    inventory_meta = assets.get(row["inventory_asset"])
    required = row["inventory_required"]
    available = row["inventory_available"]
    need = "-" if not required else format_amount(required, inventory_meta)
    have = "-" if not available else format_amount(available, inventory_meta)
    fields = [
        f"{row['ingested_at']} DECISION",
        f"quote={row['quote_id']}",
        f"direction={row['direction']}",
        f"decision={row['decision']}",
        f"reason={row['decision_reason']}",
        f"notional_eure={row['eure_notional'] or '-'}",
        f"gross_edge_pct={row['gross_edge_pct'] or '-'}",
        f"need={need}",
        f"have={have}",
    ]
    return " ".join(fields)
|
|
|
|
|
|
def render_command_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
    """Format one execute-trade command row as a ``COMMAND`` line."""
    meta_in = assets.get(row["asset_in"])
    meta_out = assets.get(row["asset_out"])
    raw_in = row["proposed_amount_in"]
    raw_out = row["proposed_amount_out"]
    proposed_in = "-" if not raw_in else format_amount(raw_in, meta_in)
    proposed_out = "-" if not raw_out else format_amount(raw_out, meta_out)
    fields = [
        f"{row['ingested_at']} COMMAND",
        f"quote={row['quote_id']}",
        f"command={row['command_id']}",
        f"kind={row['request_kind']}",
        f"proposed_in={proposed_in}",
        f"proposed_out={proposed_out}",
    ]
    return " ".join(fields)
|
|
|
|
|
|
def render_result_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
    """Format one execution-result row as a ``RESULT`` line.

    The detail field is the venue response if present, otherwise the error
    message, otherwise "-". *assets* is accepted only so all stream
    formatters share one signature; it is not used here.
    """
    detail = row["venue_response"] or row["error_message"] or "-"
    fields = [
        f"{row['ingested_at']} RESULT",
        f"quote={row['quote_id']}",
        f"status={row['status']}",
        f"code={row['result_code']}",
        f"detail={detail}",
    ]
    return " ".join(fields)
|
|
|
|
|
|
def pair_label(asset_in: str, asset_out: str, assets: dict[str, AssetMeta]) -> str:
    """Return an ``IN->OUT`` label built from the two assets' display symbols."""
    source = asset_symbol(asset_in, assets)
    target = asset_symbol(asset_out, assets)
    return f"{source}->{target}"
|
|
|
|
|
|
def asset_symbol(asset_id: str, assets: dict[str, AssetMeta]) -> str:
    """Return the registry symbol for *asset_id*, or the id itself if unknown."""
    meta = assets.get(asset_id)
    if meta:
        return meta.symbol
    return asset_id
|
|
|
|
|
|
def find_symbol_asset_id(assets: dict[str, AssetMeta], symbol: str) -> str:
    """Return the id of the first registered asset whose symbol equals *symbol*.

    Raises:
        KeyError: if no asset in *assets* carries that symbol.
    """
    # A private sentinel is used because an asset id could itself be falsy.
    not_found = object()
    match = next(
        (asset_id for asset_id, meta in assets.items() if meta.symbol == symbol),
        not_found,
    )
    if match is not_found:
        raise KeyError(f"missing asset for symbol {symbol}")
    return match
|
|
|
|
|
|
def format_amount(raw_amount: str | None, asset: AssetMeta | None, *, max_places: int = 8) -> str:
    """Convert a raw integer amount into ``"<units> <SYMBOL>"`` display text.

    Args:
        raw_amount: Amount in the asset's smallest unit, as integer text.
        asset: Metadata supplying ``decimals`` and ``symbol``; when None the
            raw amount is returned unchanged.
        max_places: Upper bound on the decimal places shown.

    Returns:
        "-" for a missing amount, the raw text when the asset is unknown or
        the amount is not an integer, otherwise the scaled value rounded
        half-up with trailing zeros removed, followed by the symbol.
    """
    if raw_amount in (None, ""):
        return "-"
    if asset is None:
        return str(raw_amount)
    try:
        base_units = int(raw_amount)
    except (TypeError, ValueError):
        # One malformed field should not take the whole watcher down; show
        # the value as received instead.
        return str(raw_amount)
    scale = Decimal(10) ** asset.decimals
    quantum = Decimal(1) / (Decimal(10) ** min(asset.decimals, max_places))
    scaled = (Decimal(base_units) / scale).quantize(quantum, rounding=ROUND_HALF_UP)
    return f"{format_decimal(scaled)} {asset.symbol}"


def format_decimal(value: Decimal) -> str:
    """Render *value* in fixed-point notation without trailing zeros.

    Zero is always rendered as "0": Decimal keeps the sign of a zero, so a
    tiny negative amount rounded to zero would otherwise print as "-0".
    """
    if value.is_zero():
        return "0"
    text = format(value, "f")
    if "." in text:
        text = text.rstrip("0").rstrip(".")
    return text or "0"
|
|
|
|
|
|
def now_string() -> str:
    """Return the current local time as ``YYYY-MM-DDTHH:MM:SS`` plus UTC offset."""
    return time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime())
|
|
|
|
|
|
# Field names for QUOTE_QUERY rows; the order must match its select list
# because rows are zipped positionally against this list.
QUOTE_COLUMNS = [
    "event_id", "ingested_at", "quote_id", "pair", "request_kind",
    "asset_in", "amount_in",
    "asset_out", "amount_out",
    "min_deadline_ms",
]
|
|
|
|
# Field names for DECISION_QUERY rows; the order must match its select list
# because rows are zipped positionally against this list.
DECISION_COLUMNS = [
    "event_id", "ingested_at", "quote_id",
    "direction", "decision", "decision_reason",
    "gross_edge_pct", "eure_notional",
    "inventory_asset", "inventory_available", "inventory_required",
]
|
|
|
|
# Field names for COMMAND_QUERY rows; the order must match its select list
# because rows are zipped positionally against this list.
COMMAND_COLUMNS = [
    "event_id", "ingested_at", "command_id", "quote_id", "request_kind",
    "asset_in", "asset_out",
    "proposed_amount_in", "proposed_amount_out",
]
|
|
|
|
# Field names for RESULT_QUERY rows; the order must match its select list
# because rows are zipped positionally against this list.
RESULT_COLUMNS = [
    "event_id", "ingested_at", "quote_id",
    "status", "result_code",
    "venue_response", "error_message",
]
|
|
|
|
# Newest-first quote events. ``{limit}`` is filled in with str.format; the
# select list order must stay aligned with QUOTE_COLUMNS.
QUOTE_QUERY = """
select
    event_id,
    coalesce(ingested_at::text, ''),
    coalesce(payload->>'quote_id', ''),
    coalesce(payload->>'pair', ''),
    coalesce(payload->>'request_kind', ''),
    coalesce(payload->>'asset_in', ''),
    coalesce(payload->>'amount_in', ''),
    coalesce(payload->>'asset_out', ''),
    coalesce(payload->>'amount_out', ''),
    coalesce(payload->>'min_deadline_ms', '')
from swap_demand_events
order by ingested_at desc
limit {limit};
"""
|
|
|
|
# Newest-first trade decisions. ``{limit}`` is filled in with str.format; the
# select list order must stay aligned with DECISION_COLUMNS.
DECISION_QUERY = """
select
    event_id,
    coalesce(ingested_at::text, ''),
    coalesce(payload->>'quote_id', ''),
    coalesce(payload->>'direction', ''),
    coalesce(payload->>'decision', ''),
    coalesce(payload->>'decision_reason', ''),
    coalesce(payload->>'gross_edge_pct', ''),
    coalesce(payload->>'eure_notional', ''),
    coalesce(payload->>'inventory_asset', ''),
    coalesce(payload->>'inventory_available', ''),
    coalesce(payload->>'inventory_required', '')
from trade_decisions
order by ingested_at desc
limit {limit};
"""
|
|
|
|
# Newest-first execute-trade commands. ``{limit}`` is filled in with
# str.format; the select list order must stay aligned with COMMAND_COLUMNS.
COMMAND_QUERY = """
select
    event_id,
    coalesce(ingested_at::text, ''),
    coalesce(payload->>'command_id', ''),
    coalesce(payload->>'quote_id', ''),
    coalesce(payload->>'request_kind', ''),
    coalesce(payload->>'asset_in', ''),
    coalesce(payload->>'asset_out', ''),
    coalesce(payload->>'proposed_amount_in', ''),
    coalesce(payload->>'proposed_amount_out', '')
from execute_trade_commands
order by ingested_at desc
limit {limit};
"""
|
|
|
|
# Newest-first trade execution results. ``{limit}`` is filled in with
# str.format; the select list order must stay aligned with RESULT_COLUMNS.
RESULT_QUERY = """
select
    event_id,
    coalesce(ingested_at::text, ''),
    coalesce(payload->>'quote_id', ''),
    coalesce(payload->>'status', ''),
    coalesce(payload->>'result_code', ''),
    coalesce(payload->>'venue_response', ''),
    coalesce(payload->'error'->>'message', '')
from trade_execution_results
order by ingested_at desc
limit {limit};
"""
|
|
|
|
# Stream name -> row formatter. The insertion order here is the order in which
# streams are printed by emit_backfill and emit_new_rows.
STREAM_FORMATTERS = {
    "quote": render_quote_row,
    "decision": render_decision_row,
    "command": render_command_row,
    "result": render_result_row,
}
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|