From 16e7b79978e0f32a9921209578c4bb5337b2e2ce Mon Sep 17 00:00:00 2001 From: philipp Date: Fri, 3 Apr 2026 01:02:27 +0200 Subject: [PATCH] Add durable portfolio metrics Proof: Persist portfolio value and PnL snapshots from the live inventory and reference-price path so operators can inspect trading performance from repo-controlled data. Assumptions: The last credited inventory snapshot before the first live command is the correct baseline for trade-driven PnL, and EURe remains explicit 1:1 with EUR. Still fake: The new portfolio metrics and watch output are implemented and tested locally but are not live until the updated app image is deployed to k3s. --- docs/operator-runbook.md | 29 ++ scripts/ops/test_watch_live_mm.py | 138 ++++++++ scripts/ops/watch_live_mm.py | 556 ++++++++++++++++++++++++++++++ src/apps/history-writer.mjs | 117 ++++++- src/core/portfolio-metrics.mjs | 162 +++++++++ src/lib/postgres.mjs | 153 ++++++++ test/portfolio-metrics.test.mjs | 106 ++++++ 7 files changed, 1255 insertions(+), 6 deletions(-) create mode 100644 scripts/ops/test_watch_live_mm.py create mode 100644 scripts/ops/watch_live_mm.py create mode 100644 src/core/portfolio-metrics.mjs create mode 100644 test/portfolio-metrics.test.mjs diff --git a/docs/operator-runbook.md b/docs/operator-runbook.md index c1ec1b2..868bd96 100644 --- a/docs/operator-runbook.md +++ b/docs/operator-runbook.md @@ -71,10 +71,38 @@ Common inspection: ```bash curl -s http://127.0.0.1:8081/healthz curl -s http://127.0.0.1:8081/state +curl -s http://127.0.0.1:8085/portfolio-metrics curl -s http://127.0.0.1:8086/state curl -s http://127.0.0.1:8087/state ``` +Live watch: + +```bash +python3 scripts/ops/watch_live_mm.py --once +python3 scripts/ops/watch_live_mm.py --backfill 3 +``` + +The watch command tails the live market-maker loop through PostgreSQL history plus +service `/state`: + +- matching quotes from `swap_demand_events` +- decisions from `trade_decisions` +- emitted commands from `execute_trade_commands` +- execution results from `trade_execution_results` +- current credited BTC and EURe inventory +- current strategy and executor arming state + +Use `--heartbeat-every 30` or a larger value if you want less idle output. + +Portfolio metrics: + +- `history-writer` now derives durable portfolio metrics from the latest inventory snapshot plus the latest reference price. +- The PnL baseline is the last credited inventory snapshot before the first live `cmd.execute_trade`. +- `trade_pnl_eure` keeps the baseline BTC price fixed to isolate execution PnL from later BTC moves. +- `mark_to_market_pnl_eure` values both the baseline and current books at the latest reference price. +- `price_move_pnl_eure` is the difference between those two, so operators can separate execution edge from subsequent BTC repricing. + Useful controls: ```bash @@ -139,6 +167,7 @@ Notes: - `swap_demand_events` - `market_price_events` - `intent_inventory_snapshots` + - `portfolio_metrics_snapshots` - `liquidity_actions` - `trade_decisions` - `execute_trade_commands` diff --git a/scripts/ops/test_watch_live_mm.py b/scripts/ops/test_watch_live_mm.py new file mode 100644 index 0000000..63c0cb0 --- /dev/null +++ b/scripts/ops/test_watch_live_mm.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +import sys +import unittest +from decimal import Decimal +from pathlib import Path +from unittest import mock + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +import watch_live_mm + + +ASSETS = { + "nep141:btc.omft.near": watch_live_mm.AssetMeta( + asset_id="nep141:btc.omft.near", + symbol="BTC", + decimals=8, + ), + "nep141:eure": watch_live_mm.AssetMeta( + asset_id="nep141:eure", + symbol="EURe", + decimals=18, + ), +} + + +class WatchLiveMmTests(unittest.TestCase): + def test_format_amount_humanizes_btc(self) -> None: + self.assertEqual( + watch_live_mm.format_amount("100000", ASSETS["nep141:btc.omft.near"]), + "0.001 BTC", + ) + + def test_format_amount_humanizes_eure(self) -> None: + self.assertEqual( + watch_live_mm.format_amount("122753877246000000000", ASSETS["nep141:eure"]), + "122.75387725 EURe", + ) + + def test_format_decimal_trims_trailing_zeroes(self) -> None: + self.assertEqual(watch_live_mm.format_decimal(Decimal("60.00000000")), "60") + + def test_render_quote_row_formats_assets(self) -> None: + row = { + "ingested_at": "2026-04-02 14:00:00+00", + "quote_id": "quote-1", + "asset_in": "nep141:eure", + "asset_out": "nep141:btc.omft.near", + "request_kind": "exact_in", + "amount_in": "122753877246000000000", + "amount_out": "", + "min_deadline_ms": "15000", + } + rendered = watch_live_mm.render_quote_row(row, ASSETS) + self.assertIn("QUOTE", rendered) + self.assertIn("EURe->BTC", rendered) + self.assertIn("122.75387725 EURe", rendered) + + def test_render_decision_row_formats_inventory(self) -> None: + row = { + "ingested_at": "2026-04-02 14:00:01+00", + "quote_id": "quote-2", + "direction": "eure_to_btc", + "decision": "rejected", + "decision_reason": "insufficient_inventory", + "gross_edge_pct": "2.000000", + "eure_notional": "16.246379", + "inventory_asset": "nep141:btc.omft.near", + "inventory_available": "100000", + "inventory_required": "1592145", + } + rendered = watch_live_mm.render_decision_row(row, ASSETS) + self.assertIn("DECISION", rendered) + self.assertIn("insufficient_inventory", rendered) + self.assertIn("0.001 BTC", rendered) + + def test_select_new_rows_returns_oldest_first_and_marks_seen(self) -> None: + seen = {"existing"} + rows = [ + {"event_id": "newer"}, + {"event_id": "middle"}, + {"event_id": "existing"}, + ] + fresh = watch_live_mm.select_new_rows(rows, seen) + self.assertEqual([row["event_id"] for row in fresh], ["middle", "newer"]) + self.assertEqual(seen, {"existing", "middle", "newer"}) + + def test_current_state_fingerprint_includes_portfolio_metrics(self) -> None: + responses = { + "near-intents-ingest": { + "ingest": { + "last_matching_quote_at": "2026-04-02T20:17:44.623Z", + "published_count": 7, + } + }, + "inventory-sync": { + "last_snapshot": { + "spendable": { + "nep141:btc.omft.near": "137014", + "nep141:eure": "38999978799978799978", + } + } + }, + "history-writer": { + "latest_portfolio_metrics": { + "trade_pnl_eure": "0.391183707978799978", + "mark_to_market_pnl_eure": "0.497413887978799978", + } + }, + "strategy-engine": { + "armed": False, + "latest_decision": { + "decision_reason": "actionable", + "quote_id": "quote-1", + }, + }, + "trade-executor": { + "armed": False, + "signer_registered": True, + }, + } + + with mock.patch.object( + watch_live_mm, + "fetch_service_state", + side_effect=lambda *, namespace, deployment: responses[deployment], + ): + state = watch_live_mm.current_state_fingerprint(namespace="unrip", assets=ASSETS) + + self.assertEqual(state["trade_pnl_eure"], "0.391183707978799978") + self.assertEqual(state["mark_to_market_pnl_eure"], "0.497413887978799978") + self.assertEqual(state["btc_spendable"], "0.00137014 BTC") + self.assertEqual(state["eure_spendable"], "38.9999788 EURe") + + +if __name__ == "__main__": + unittest.main() diff --git a/scripts/ops/watch_live_mm.py b/scripts/ops/watch_live_mm.py new file mode 100644 index 0000000..b4bef10 --- /dev/null +++ b/scripts/ops/watch_live_mm.py @@ -0,0 +1,556 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import sys +import time +from dataclasses import dataclass +from decimal import Decimal, ROUND_HALF_UP +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +from common import DEFAULT_NAMESPACE, config_value, kubectl + + +CONTROL_PORTS = { + "near-intents-ingest": 8081, + "inventory-sync": 8083, + "history-writer": 8085, + "strategy-engine": 8086, + "trade-executor": 8087, +} + + +@dataclass(frozen=True) +class AssetMeta: + asset_id: str + symbol: str + decimals: int + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Live-watch the active market-maker loop: matching quotes, trade decisions, " + "execution commands, execution results, arming state, and credited inventory." + ) + ) + parser.add_argument( + "--namespace", + default=DEFAULT_NAMESPACE, + help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})", + ) + parser.add_argument( + "--interval", + type=float, + default=5.0, + help="Polling interval in seconds (default: 5.0)", + ) + parser.add_argument( + "--backfill", + type=int, + default=0, + help="Print the most recent N events from each stream before live tailing.", + ) + parser.add_argument( + "--event-limit", + type=int, + default=12, + help="How many recent rows to fetch per stream on each poll (default: 12).", + ) + parser.add_argument( + "--once", + action="store_true", + help="Print one snapshot and exit without live polling.", + ) + parser.add_argument( + "--heartbeat-every", + type=float, + default=30.0, + help="Print an idle heartbeat after this many quiet seconds (default: 30.0).", + ) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + assets = load_asset_registry(namespace=args.namespace) + seen = { + "quote": set(), + "decision": set(), + "command": set(), + "result": set(), + } + + bootstrap_rows = fetch_event_rows(namespace=args.namespace, limit=args.event_limit) + if args.backfill > 0: + print_snapshot(namespace=args.namespace, assets=assets) + emit_backfill(bootstrap_rows, seen, assets=assets, backfill=args.backfill) + else: + mark_seen(bootstrap_rows, seen) + print_snapshot(namespace=args.namespace, assets=assets) + + if args.once: + return 0 + + previous_state = current_state_fingerprint(namespace=args.namespace, assets=assets) + last_heartbeat_at = time.monotonic() + try: + while True: + rows = fetch_event_rows(namespace=args.namespace, limit=args.event_limit) + emitted = emit_new_rows(rows, seen, assets=assets) + + current_state = current_state_fingerprint(namespace=args.namespace, assets=assets) + if current_state != previous_state: + print_state_line(current_state) + previous_state = current_state + emitted = True + + if emitted: + last_heartbeat_at = time.monotonic() + elif time.monotonic() - last_heartbeat_at >= args.heartbeat_every: + print_heartbeat(current_state) + last_heartbeat_at = time.monotonic() + + time.sleep(args.interval) + except KeyboardInterrupt: + return 130 + + +def load_asset_registry(*, namespace: str) -> dict[str, AssetMeta]: + btc = AssetMeta( + asset_id=config_value("TRADING_BTC_ASSET_ID", namespace=namespace), + symbol=config_value("TRADING_BTC_SYMBOL", namespace=namespace) or "BTC", + decimals=int(config_value("TRADING_BTC_DECIMALS", namespace=namespace) or "8"), + ) + eure = AssetMeta( + asset_id=config_value("TRADING_EURE_ASSET_ID", namespace=namespace), + symbol=config_value("TRADING_EURE_SYMBOL", namespace=namespace) or "EURe", + decimals=int(config_value("TRADING_EURE_DECIMALS", namespace=namespace) or "18"), + ) + return { + btc.asset_id: btc, + eure.asset_id: eure, + } + + +def fetch_event_rows(*, namespace: str, limit: int) -> dict[str, list[dict[str, str]]]: + return { + "quote": query_rows(QUOTE_QUERY.format(limit=limit), namespace=namespace, columns=QUOTE_COLUMNS), + "decision": query_rows( + DECISION_QUERY.format(limit=limit), + namespace=namespace, + columns=DECISION_COLUMNS, + ), + "command": query_rows( + COMMAND_QUERY.format(limit=limit), + namespace=namespace, + columns=COMMAND_COLUMNS, + ), + "result": query_rows( + RESULT_QUERY.format(limit=limit), + namespace=namespace, + columns=RESULT_COLUMNS, + ), + } + + +def query_rows(query: str, *, namespace: str, columns: list[str]) -> list[dict[str, str]]: + result = kubectl( + "exec", + "-n", + namespace, + "deploy/postgres", + "--", + "psql", + "-U", + "unrip", + "-d", + "unrip", + "-At", + "-F", + "\t", + "-c", + query, + ) + rows: list[dict[str, str]] = [] + for line in result.stdout.splitlines(): + if not line.strip(): + continue + values = line.split("\t") + if len(values) < len(columns): + values.extend([""] * (len(columns) - len(values))) + rows.append(dict(zip(columns, values))) + return rows + + +def fetch_service_state(*, namespace: str, deployment: str) -> dict: + port = CONTROL_PORTS[deployment] + expression = ( + f"fetch('http://127.0.0.1:{port}/state')" + ".then(async (response) => {" + " if (!response.ok) throw new Error(`state fetch failed: ${response.status}`);" + " process.stdout.write(await response.text());" + "})" + ".catch((error) => {" + " console.error(error.message);" + " process.exit(1);" + "});" + ) + result = kubectl( + "exec", + "-n", + namespace, + f"deploy/{deployment}", + "--", + "node", + "-e", + expression, + ) + return json.loads(result.stdout) + + +def current_state_fingerprint(*, namespace: str, assets: dict[str, AssetMeta]) -> dict[str, str]: + ingest = fetch_service_state(namespace=namespace, deployment="near-intents-ingest") + inventory = fetch_service_state(namespace=namespace, deployment="inventory-sync") + history = fetch_service_state(namespace=namespace, deployment="history-writer") + strategy = fetch_service_state(namespace=namespace, deployment="strategy-engine") + executor = fetch_service_state(namespace=namespace, deployment="trade-executor") + + snapshot = inventory.get("last_snapshot") or {} + spendable = snapshot.get("spendable") or {} + latest_decision = strategy.get("latest_decision") or {} + latest_metrics = history.get("latest_portfolio_metrics") or {} + + return { + "ts": now_string(), + "last_matching_quote_at": str(ingest.get("ingest", {}).get("last_matching_quote_at") or "none"), + "published_quotes": str(ingest.get("ingest", {}).get("published_count") or 0), + "strategy_armed": str(bool(strategy.get("armed"))).lower(), + "executor_armed": str(bool(executor.get("armed"))).lower(), + "signer_registered": str(executor.get("signer_registered")), + "btc_spendable": format_amount(spendable.get(find_symbol_asset_id(assets, "BTC"), "0"), assets.get(find_symbol_asset_id(assets, "BTC"))), + "eure_spendable": format_amount(spendable.get(find_symbol_asset_id(assets, "EURe"), "0"), assets.get(find_symbol_asset_id(assets, "EURe"))), + "latest_reason": str(latest_decision.get("decision_reason") or "none"), + "latest_quote_id": str(latest_decision.get("quote_id") or "none"), + "trade_pnl_eure": str(latest_metrics.get("trade_pnl_eure") or "-"), + "mark_to_market_pnl_eure": str(latest_metrics.get("mark_to_market_pnl_eure") or "-"), + } + + +def emit_backfill( + rows: dict[str, list[dict[str, str]]], + seen: dict[str, set[str]], + *, + assets: dict[str, AssetMeta], + backfill: int, +) -> None: + for stream, formatter in STREAM_FORMATTERS.items(): + stream_rows = list(reversed(rows[stream][:backfill])) + for row in stream_rows: + if row["event_id"] in seen[stream]: + continue + print(formatter(row, assets)) + seen[stream].add(row["event_id"]) + mark_seen(rows, seen) + + +def emit_new_rows( + rows: dict[str, list[dict[str, str]]], + seen: dict[str, set[str]], + *, + assets: dict[str, AssetMeta], +) -> bool: + emitted = False + for stream, formatter in STREAM_FORMATTERS.items(): + fresh = select_new_rows(rows[stream], seen[stream]) + for row in fresh: + print(formatter(row, assets)) + emitted = True + return emitted + + +def mark_seen(rows: dict[str, list[dict[str, str]]], seen: dict[str, set[str]]) -> None: + for stream, stream_rows in rows.items(): + for row in stream_rows: + seen[stream].add(row["event_id"]) + + +def select_new_rows(rows: list[dict[str, str]], seen: set[str]) -> list[dict[str, str]]: + fresh = [row for row in reversed(rows) if row["event_id"] not in seen] + for row in fresh: + seen.add(row["event_id"]) + return fresh + + +def print_snapshot(*, namespace: str, assets: dict[str, AssetMeta]) -> None: + state = current_state_fingerprint(namespace=namespace, assets=assets) + print( + "WATCH" + f" namespace={namespace}" + f" last_matching_quote_at={state['last_matching_quote_at']}" + f" published_quotes={state['published_quotes']}" + f" strategy_armed={state['strategy_armed']}" + f" executor_armed={state['executor_armed']}" + f" signer_registered={state['signer_registered']}" + f" btc={state['btc_spendable']}" + f" eure={state['eure_spendable']}" + f" trade_pnl_eure={state['trade_pnl_eure']}" + f" mtm_pnl_eure={state['mark_to_market_pnl_eure']}" + f" latest_reason={state['latest_reason']}" + f" latest_quote={state['latest_quote_id']}" + ) + + +def print_state_line(state: dict[str, str]) -> None: + print( + f"{state['ts']} STATE" + f" strategy_armed={state['strategy_armed']}" + f" executor_armed={state['executor_armed']}" + f" signer_registered={state['signer_registered']}" + f" btc={state['btc_spendable']}" + f" eure={state['eure_spendable']}" + f" trade_pnl_eure={state['trade_pnl_eure']}" + f" mtm_pnl_eure={state['mark_to_market_pnl_eure']}" + f" last_matching_quote_at={state['last_matching_quote_at']}" + f" latest_reason={state['latest_reason']}" + f" latest_quote={state['latest_quote_id']}" + ) + + +def print_heartbeat(state: dict[str, str]) -> None: + print( + f"{state['ts']} HEARTBEAT" + f" strategy_armed={state['strategy_armed']}" + f" executor_armed={state['executor_armed']}" + f" trade_pnl_eure={state['trade_pnl_eure']}" + f" mtm_pnl_eure={state['mark_to_market_pnl_eure']}" + f" latest_reason={state['latest_reason']}" + f" latest_quote={state['latest_quote_id']}" + ) + + +def render_quote_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str: + asset_in = assets.get(row["asset_in"]) + asset_out = assets.get(row["asset_out"]) + amount_in = format_amount(row["amount_in"], asset_in) if row["amount_in"] else "-" + amount_out = format_amount(row["amount_out"], asset_out) if row["amount_out"] else "-" + return ( + f"{row['ingested_at']} QUOTE" + f" quote={row['quote_id']}" + f" pair={pair_label(row['asset_in'], row['asset_out'], assets)}" + f" kind={row['request_kind']}" + f" amount_in={amount_in}" + f" amount_out={amount_out}" + f" deadline_ms={row['min_deadline_ms'] or '-'}" + ) + + +def render_decision_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str: + inventory_asset = assets.get(row["inventory_asset"]) + need = format_amount(row["inventory_required"], inventory_asset) if row["inventory_required"] else "-" + have = format_amount(row["inventory_available"], inventory_asset) if row["inventory_available"] else "-" + return ( + f"{row['ingested_at']} DECISION" + f" quote={row['quote_id']}" + f" direction={row['direction']}" + f" decision={row['decision']}" + f" reason={row['decision_reason']}" + f" notional_eure={row['eure_notional'] or '-'}" + f" gross_edge_pct={row['gross_edge_pct'] or '-'}" + f" need={need}" + f" have={have}" + ) + + +def render_command_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str: + asset_in = assets.get(row["asset_in"]) + asset_out = assets.get(row["asset_out"]) + proposed_in = format_amount(row["proposed_amount_in"], asset_in) if row["proposed_amount_in"] else "-" + proposed_out = format_amount(row["proposed_amount_out"], asset_out) if row["proposed_amount_out"] else "-" + return ( + f"{row['ingested_at']} COMMAND" + f" quote={row['quote_id']}" + f" command={row['command_id']}" + f" kind={row['request_kind']}" + f" proposed_in={proposed_in}" + f" proposed_out={proposed_out}" + ) + + +def render_result_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str: + extra = row["venue_response"] or row["error_message"] or "-" + return ( + f"{row['ingested_at']} RESULT" + f" quote={row['quote_id']}" + f" status={row['status']}" + f" code={row['result_code']}" + f" detail={extra}" + ) + + +def pair_label(asset_in: str, asset_out: str, assets: dict[str, AssetMeta]) -> str: + return f"{asset_symbol(asset_in, assets)}->{asset_symbol(asset_out, assets)}" + + +def asset_symbol(asset_id: str, assets: dict[str, AssetMeta]) -> str: + asset = assets.get(asset_id) + return asset.symbol if asset else asset_id + + +def find_symbol_asset_id(assets: dict[str, AssetMeta], symbol: str) -> str: + for asset_id, asset in assets.items(): + if asset.symbol == symbol: + return asset_id + raise KeyError(f"missing asset for symbol {symbol}") + + +def format_amount(raw_amount: str | None, asset: AssetMeta | None, *, max_places: int = 8) -> str: + if raw_amount in (None, ""): + return "-" + if asset is None: + return str(raw_amount) + value = int(raw_amount) + scale = Decimal(10) ** asset.decimals + quant = Decimal(1) / (Decimal(10) ** min(asset.decimals, max_places)) + decimal_value = (Decimal(value) / scale).quantize(quant, rounding=ROUND_HALF_UP) + return f"{format_decimal(decimal_value)} {asset.symbol}" + + +def format_decimal(value: Decimal) -> str: + normalized = format(value, "f") + if "." in normalized: + normalized = normalized.rstrip("0").rstrip(".") + return normalized or "0" + + +def now_string() -> str: + return time.strftime("%Y-%m-%dT%H:%M:%S%z") + + +QUOTE_COLUMNS = [ + "event_id", + "ingested_at", + "quote_id", + "pair", + "request_kind", + "asset_in", + "amount_in", + "asset_out", + "amount_out", + "min_deadline_ms", +] + +DECISION_COLUMNS = [ + "event_id", + "ingested_at", + "quote_id", + "direction", + "decision", + "decision_reason", + "gross_edge_pct", + "eure_notional", + "inventory_asset", + "inventory_available", + "inventory_required", +] + +COMMAND_COLUMNS = [ + "event_id", + "ingested_at", + "command_id", + "quote_id", + "request_kind", + "asset_in", + "asset_out", + "proposed_amount_in", + "proposed_amount_out", +] + +RESULT_COLUMNS = [ + "event_id", + "ingested_at", + "quote_id", + "status", + "result_code", + "venue_response", + "error_message", +] + +QUOTE_QUERY = """ +select + event_id, + coalesce(ingested_at::text, ''), + coalesce(payload->>'quote_id', ''), + coalesce(payload->>'pair', ''), + coalesce(payload->>'request_kind', ''), + coalesce(payload->>'asset_in', ''), + coalesce(payload->>'amount_in', ''), + coalesce(payload->>'asset_out', ''), + coalesce(payload->>'amount_out', ''), + coalesce(payload->>'min_deadline_ms', '') +from swap_demand_events +order by ingested_at desc +limit {limit}; +""" + +DECISION_QUERY = """ +select + event_id, + coalesce(ingested_at::text, ''), + coalesce(payload->>'quote_id', ''), + coalesce(payload->>'direction', ''), + coalesce(payload->>'decision', ''), + coalesce(payload->>'decision_reason', ''), + coalesce(payload->>'gross_edge_pct', ''), + coalesce(payload->>'eure_notional', ''), + coalesce(payload->>'inventory_asset', ''), + coalesce(payload->>'inventory_available', ''), + coalesce(payload->>'inventory_required', '') +from trade_decisions +order by ingested_at desc +limit {limit}; +""" + +COMMAND_QUERY = """ +select + event_id, + coalesce(ingested_at::text, ''), + coalesce(payload->>'command_id', ''), + coalesce(payload->>'quote_id', ''), + coalesce(payload->>'request_kind', ''), + coalesce(payload->>'asset_in', ''), + coalesce(payload->>'asset_out', ''), + coalesce(payload->>'proposed_amount_in', ''), + coalesce(payload->>'proposed_amount_out', '') +from execute_trade_commands +order by ingested_at desc +limit {limit}; +""" + +RESULT_QUERY = """ +select + event_id, + coalesce(ingested_at::text, ''), + coalesce(payload->>'quote_id', ''), + coalesce(payload->>'status', ''), + coalesce(payload->>'result_code', ''), + coalesce(payload->>'venue_response', ''), + coalesce(payload->'error'->>'message', '') +from trade_execution_results +order by ingested_at desc +limit {limit}; +""" + +STREAM_FORMATTERS = { + "quote": render_quote_row, + "decision": render_decision_row, + "command": render_command_row, + "result": render_result_row, +} + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index e886efc..5c7ad53 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -4,9 +4,17 @@ import { createConsumer } from '../bus/kafka/consumer.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { routeHistoryRecord } from '../core/history-records.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; +import { buildPortfolioMetricId, computePortfolioMetric } from '../core/portfolio-metrics.mjs'; import { parseEventMessage } from '../core/event-envelope.mjs'; import { loadConfig } from '../lib/config.mjs'; -import { createPostgresPool, ensureHistorySchema, insertHistoryEvent } from '../lib/postgres.mjs'; +import { + createPostgresPool, + ensureHistorySchema, + insertHistoryEvent, + loadLatestPortfolioMetric, + loadPortfolioMetricInputs, + upsertPortfolioMetric, +} from '../lib/postgres.mjs'; const config = loadConfig(); const logger = createLogger({ @@ -37,6 +45,12 @@ const topics = [ config.kafkaTopicCmdExecuteTrade, config.kafkaTopicExecTradeResult, ]; +const portfolioMetricTopics = new Set([ + config.kafkaTopicRefMarketPrice, + config.kafkaTopicStateIntentInventory, + config.kafkaTopicCmdExecuteTrade, + config.kafkaTopicExecTradeResult, +]); for (const topic of topics) { await consumer.subscribe({ topic, fromBeginning: false }); @@ -46,11 +60,18 @@ const state = { paused: false, draining: false, last_write_at: null, + last_metrics_at: null, last_error: null, error_count: 0, offsets: {}, + latest_portfolio_metrics: null, + metrics_error: null, }; +await refreshPortfolioMetrics().catch((error) => { + state.metrics_error = serializeError(error); +}); + await consumer.run({ eachMessage: async ({ topic, partition, message }) => { if (!message.value || state.paused) return; @@ -71,6 +92,19 @@ await consumer.run({ partition, offset: message.offset, }; + if (portfolioMetricTopics.has(topic)) { + try { + await refreshPortfolioMetrics(); + } catch (error) { + state.metrics_error = serializeError(error); + logger.error('portfolio_metrics_refresh_failed', { + topic, + details: { + error: serializeError(error), + }, + }); + } + } } catch (error) { state.last_error = serializeError(error); state.error_count += 1; @@ -97,13 +131,30 @@ const controlApi = startControlApi({ stateProvider: { async getState() { const connectivity = await pool.query('SELECT 1').then(() => true).catch(() => false); - return { - ...state, - database_connectivity: connectivity, - }; - }, + return { + ...state, + database_connectivity: connectivity, + }; + }, }, routes: [ + { + method: 'GET', + path: '/portfolio-metrics', + readBody: false, + handler: async () => { + const latest = await loadLatestPortfolioMetric(pool); + if (!latest) { + return { + statusCode: 404, + payload: { + error: 'portfolio_metrics_unavailable', + }, + }; + } + return latest; + }, + }, { method: 'POST', path: '/pause', @@ -136,6 +187,60 @@ const controlApi = startControlApi({ ], }); +async function refreshPortfolioMetrics() { + const inputs = await loadPortfolioMetricInputs(pool); + const payload = computePortfolioMetric({ + baseline: inputs.baseline, + currentInventory: inputs.currentInventory?.payload, + currentPrice: inputs.currentPrice?.payload, + btcAsset: config.tradingBtc, + eureAsset: config.tradingEure, + commandCount: inputs.commandCount, + resultCount: inputs.resultCount, + }); + if (!payload) return null; + + const computedAt = new Date().toISOString(); + const metricId = buildPortfolioMetricId({ + baselineInventoryId: inputs.baseline?.inventory?.inventory_id || null, + currentInventoryId: inputs.currentInventory?.payload?.inventory_id || null, + currentPriceId: inputs.currentPrice?.payload?.price_id || null, + }); + await upsertPortfolioMetric(pool, { + metricId, + computedAt, + baselineAnchorAt: inputs.baseline?.command_at || null, + baselineStatus: payload.baseline_status, + payload, + }); + state.last_metrics_at = computedAt; + state.metrics_error = null; + state.latest_portfolio_metrics = summarizePortfolioMetric({ + metric_id: metricId, + computed_at: computedAt, + baseline_anchor_at: inputs.baseline?.command_at || null, + baseline_status: payload.baseline_status, + payload, + }); + return state.latest_portfolio_metrics; +} + +function summarizePortfolioMetric(metric) { + if (!metric) return null; + return { + metric_id: metric.metric_id, + computed_at: metric.computed_at, + baseline_anchor_at: metric.baseline_anchor_at, + baseline_status: metric.baseline_status, + current_portfolio_value_eure: metric.payload?.current_portfolio_value_eure ?? null, + trade_pnl_eure: metric.payload?.trade_pnl_eure ?? null, + mark_to_market_pnl_eure: metric.payload?.mark_to_market_pnl_eure ?? null, + price_move_pnl_eure: metric.payload?.price_move_pnl_eure ?? null, + command_count: metric.payload?.command_count ?? 0, + result_count: metric.payload?.result_count ?? 0, + }; +} + async function shutdown() { await controlApi.close().catch(() => {}); await consumer.disconnect(); diff --git a/src/core/portfolio-metrics.mjs b/src/core/portfolio-metrics.mjs new file mode 100644 index 0000000..be3ad4c --- /dev/null +++ b/src/core/portfolio-metrics.mjs @@ -0,0 +1,162 @@ +const VALUE_SCALE = 18; +const VALUE_FACTOR = 10n ** BigInt(VALUE_SCALE); + +export function computePortfolioMetric({ + baseline = null, + currentInventory, + currentPrice, + btcAsset, + eureAsset, + commandCount = 0, + resultCount = 0, +} = {}) { + if (!currentInventory || !currentPrice || !btcAsset?.assetId || !eureAsset?.assetId) { + return null; + } + + const currentBtcUnits = String(currentInventory.spendable?.[btcAsset.assetId] || '0'); + const currentEureUnits = String(currentInventory.spendable?.[eureAsset.assetId] || '0'); + const currentBtc = unitsToScaledDecimal(currentBtcUnits, btcAsset.decimals); + const currentEure = unitsToScaledDecimal(currentEureUnits, eureAsset.decimals); + const currentPriceScaled = parseScaledDecimal(currentPrice.eure_per_btc); + const currentBtcMarkValue = multiplyScaled(currentBtc, currentPriceScaled); + const currentPortfolioValue = currentEure + currentBtcMarkValue; + + const payload = { + metric_version: 1, + baseline_status: baseline ? 'active' : 'awaiting_first_execution', + command_count: commandCount, + result_count: resultCount, + current_price: { + price_id: currentPrice.price_id || null, + observed_at: currentPrice.observed_at || null, + eure_per_btc: String(currentPrice.eure_per_btc), + }, + current_inventory: buildInventoryView({ + inventory: currentInventory, + btcAsset, + eureAsset, + }), + current_portfolio_value_eure: formatScaledDecimal(currentPortfolioValue), + current_btc_mark_value_eure: formatScaledDecimal(currentBtcMarkValue), + current_eure_cash_value_eure: formatScaledDecimal(currentEure), + trade_pnl_eure: null, + mark_to_market_pnl_eure: null, + price_move_pnl_eure: null, + baseline_portfolio_value_eure_at_baseline_price: null, + baseline_portfolio_value_eure_at_current_price: null, + current_portfolio_value_eure_at_baseline_price: null, + inventory_delta: null, + baseline: null, + }; + + if (!baseline?.inventory || !baseline?.price) { + return payload; + } + + const baselineBtcUnits = String(baseline.inventory.spendable?.[btcAsset.assetId] || '0'); + const baselineEureUnits = String(baseline.inventory.spendable?.[eureAsset.assetId] || '0'); + const baselineBtc = unitsToScaledDecimal(baselineBtcUnits, btcAsset.decimals); + const baselineEure = unitsToScaledDecimal(baselineEureUnits, eureAsset.decimals); + const baselinePriceScaled = parseScaledDecimal(baseline.price.eure_per_btc); + const baselinePortfolioAtBaselinePrice = baselineEure + multiplyScaled(baselineBtc, baselinePriceScaled); + const baselinePortfolioAtCurrentPrice = baselineEure + multiplyScaled(baselineBtc, currentPriceScaled); + const currentPortfolioAtBaselinePrice = currentEure + multiplyScaled(currentBtc, baselinePriceScaled); + const tradePnl = currentPortfolioAtBaselinePrice - baselinePortfolioAtBaselinePrice; + const markToMarketPnl = currentPortfolioValue - baselinePortfolioAtCurrentPrice; + const priceMovePnl = markToMarketPnl - tradePnl; + + payload.trade_pnl_eure = formatScaledDecimal(tradePnl); + payload.mark_to_market_pnl_eure = formatScaledDecimal(markToMarketPnl); + payload.price_move_pnl_eure = formatScaledDecimal(priceMovePnl); + payload.baseline_portfolio_value_eure_at_baseline_price = formatScaledDecimal( + baselinePortfolioAtBaselinePrice, + ); + payload.baseline_portfolio_value_eure_at_current_price = formatScaledDecimal( + baselinePortfolioAtCurrentPrice, + ); + payload.current_portfolio_value_eure_at_baseline_price = formatScaledDecimal( + currentPortfolioAtBaselinePrice, + ); + payload.inventory_delta = { + btc_units: (BigInt(currentBtcUnits) - BigInt(baselineBtcUnits)).toString(), + btc: formatScaledDecimal(currentBtc - baselineBtc), + eure_units: (BigInt(currentEureUnits) - BigInt(baselineEureUnits)).toString(), + eure: formatScaledDecimal(currentEure - baselineEure), + }; + payload.baseline = { + anchor: baseline.anchor || 'latest_inventory_before_first_command', + command_at: baseline.command_at || null, + price: { + price_id: baseline.price.price_id || null, + observed_at: baseline.price.observed_at || null, + eure_per_btc: String(baseline.price.eure_per_btc), + }, + inventory: buildInventoryView({ + inventory: baseline.inventory, + btcAsset, + eureAsset, + }), + }; + + return payload; +} + +export function buildPortfolioMetricId({ baselineInventoryId, currentInventoryId, currentPriceId }) { + return [ + 'portfolio-metric', + baselineInventoryId || 'no-baseline', + currentInventoryId || 'no-current-inventory', + currentPriceId || 'no-current-price', + ].join(':'); +} + +function buildInventoryView({ inventory, btcAsset, eureAsset }) { + const spendable = inventory?.spendable || {}; + const btcUnits = String(spendable[btcAsset.assetId] || '0'); + const eureUnits = String(spendable[eureAsset.assetId] || '0'); + + return { + inventory_id: inventory?.inventory_id || null, + synced_at: inventory?.synced_at || null, + btc_units: btcUnits, + btc: formatAssetUnits(btcUnits, btcAsset.decimals), + eure_units: eureUnits, + eure: formatAssetUnits(eureUnits, eureAsset.decimals), + }; +} + +function unitsToScaledDecimal(units, decimals) { + return BigInt(units || '0') * 10n ** BigInt(VALUE_SCALE - decimals); +} + +function formatAssetUnits(units, decimals) { + return formatScaledDecimal(BigInt(units || '0') * 10n ** BigInt(VALUE_SCALE - decimals)); +} + +function parseScaledDecimal(value) { + const normalized = String(value ?? '0').trim(); + const negative = normalized.startsWith('-'); + const unsigned = normalized.replace(/^[+-]/, ''); + const [wholePart, fractionalPart = ''] = unsigned.split('.'); + const whole = BigInt(wholePart || '0'); + const fractional = BigInt((fractionalPart.padEnd(VALUE_SCALE, '0')).slice(0, VALUE_SCALE) || '0'); + const scaled = (whole * VALUE_FACTOR) + fractional; + return negative ? -scaled : scaled; +} + +function multiplyScaled(left, right) { + return (left * right) / VALUE_FACTOR; +} + +function formatScaledDecimal(value) { + const negative = value < 0n; + const absolute = negative ? -value : value; + const whole = absolute / VALUE_FACTOR; + const fractional = absolute % VALUE_FACTOR; + if (fractional === 0n) { + return `${negative ? '-' : ''}${whole}`; + } + const fractionalText = fractional.toString().padStart(VALUE_SCALE, '0').replace(/0+$/, ''); + return `${negative ? '-' : ''}${whole}.${fractionalText}`; +} diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index c4e34d7..caec4d1 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -11,6 +11,8 @@ const TABLES = [ 'trade_execution_results', ]; +const PORTFOLIO_METRICS_TABLE = 'portfolio_metrics_snapshots'; + export function createPostgresPool({ connectionString }) { return new Pool({ connectionString, @@ -48,6 +50,20 @@ export async function ensureHistorySchema(pool) { ON ${table} (ingested_at DESC) `); } + + await pool.query(` + CREATE TABLE IF NOT EXISTS ${PORTFOLIO_METRICS_TABLE} ( + metric_id TEXT PRIMARY KEY, + computed_at TIMESTAMPTZ NOT NULL, + baseline_anchor_at TIMESTAMPTZ, + baseline_status TEXT NOT NULL, + payload JSONB NOT NULL + ) + `); + await pool.query(` + CREATE INDEX IF NOT EXISTS ${PORTFOLIO_METRICS_TABLE}_computed_at_idx + ON ${PORTFOLIO_METRICS_TABLE} (computed_at DESC) + `); } export async function insertHistoryEvent(pool, { table, topic, event, record }) { @@ -87,3 +103,140 @@ export async function insertHistoryEvent(pool, { table, topic, event, record }) ], ); } + +export async function loadPortfolioMetricInputs(pool) { + const [currentInventory, currentPrice, commandAggregate, resultAggregate] = await Promise.all([ + loadLatestEventPayload(pool, 'intent_inventory_snapshots'), + loadLatestEventPayload(pool, 'market_price_events'), + pool.query(` + SELECT + MIN(ingested_at) AS first_command_at, + COUNT(*)::INT AS command_count + FROM execute_trade_commands + `), + pool.query(` + SELECT COUNT(*)::INT AS result_count + FROM trade_execution_results + `), + ]); + + const firstCommandAt = commandAggregate.rows[0]?.first_command_at || null; + const commandCount = Number(commandAggregate.rows[0]?.command_count || 0); + const resultCount = Number(resultAggregate.rows[0]?.result_count || 0); + + if (!firstCommandAt) { + return { + currentInventory, + currentPrice, + baseline: null, + commandCount, + resultCount, + }; + } + + const baselineInventory = await loadLatestEventPayload( + pool, + 'intent_inventory_snapshots', + 'WHERE ingested_at <= $1 ORDER BY ingested_at DESC LIMIT 1', + [firstCommandAt], + ); + const baselinePrice = await loadNearestPricePayload(pool, baselineInventory?.ingested_at || firstCommandAt); + + return { + currentInventory, + currentPrice, + baseline: baselineInventory && baselinePrice ? { + anchor: 'latest_inventory_before_first_command', + command_at: new Date(firstCommandAt).toISOString(), + inventory: baselineInventory.payload, + price: baselinePrice.payload, + } : null, + commandCount, + resultCount, + }; +} + +export async function upsertPortfolioMetric(pool, { + metricId, + computedAt, + baselineAnchorAt = null, + baselineStatus, + payload, +}) { + await pool.query( + ` + INSERT INTO ${PORTFOLIO_METRICS_TABLE} ( + metric_id, + computed_at, + baseline_anchor_at, + baseline_status, + payload + ) VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (metric_id) DO UPDATE SET + computed_at = EXCLUDED.computed_at, + baseline_anchor_at = EXCLUDED.baseline_anchor_at, + baseline_status = EXCLUDED.baseline_status, + payload = EXCLUDED.payload + `, + [ + metricId, + computedAt, + baselineAnchorAt, + baselineStatus, + JSON.stringify(payload), + ], + ); +} + +export async function loadLatestPortfolioMetric(pool) { + const result = await pool.query(` + SELECT metric_id, computed_at, baseline_anchor_at, baseline_status, payload + FROM ${PORTFOLIO_METRICS_TABLE} + ORDER BY computed_at DESC + LIMIT 1 + `); + if (!result.rows[0]) return null; + return normalizePortfolioMetricRow(result.rows[0]); +} + +async function loadLatestEventPayload(pool, table, clause = 'ORDER BY ingested_at DESC LIMIT 1', params = []) { + const result = await pool.query( + ` + SELECT ingested_at, payload + FROM ${table} + ${clause} + `, + params, + ); + if (!result.rows[0]) return null; + return { + ingested_at: result.rows[0].ingested_at ? new Date(result.rows[0].ingested_at).toISOString() : null, + payload: result.rows[0].payload, + }; +} + +async function loadNearestPricePayload(pool, anchorAt) { + const before = await loadLatestEventPayload( + pool, + 'market_price_events', + 'WHERE ingested_at <= $1 ORDER BY ingested_at DESC LIMIT 1', + [anchorAt], + ); + if (before) return before; + return loadLatestEventPayload( + pool, + 'market_price_events', + 'WHERE ingested_at >= $1 ORDER BY ingested_at ASC LIMIT 1', + [anchorAt], + ); +} + +function normalizePortfolioMetricRow(row) { + return { + metric_id: row.metric_id, + computed_at: row.computed_at ? new Date(row.computed_at).toISOString() : null, + baseline_anchor_at: row.baseline_anchor_at ? new Date(row.baseline_anchor_at).toISOString() : null, + baseline_status: row.baseline_status, + payload: row.payload, + }; +} diff --git a/test/portfolio-metrics.test.mjs b/test/portfolio-metrics.test.mjs new file mode 100644 index 0000000..140572a --- /dev/null +++ b/test/portfolio-metrics.test.mjs @@ -0,0 +1,106 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { buildPortfolioMetricId, computePortfolioMetric } from '../src/core/portfolio-metrics.mjs'; + +const btcAsset = { + assetId: 'nep141:btc.omft.near', + symbol: 'BTC', + decimals: 8, +}; + +const eureAsset = { + assetId: 'nep141:eure.omft.near', + symbol: 'EURe', + decimals: 18, +}; + +test('portfolio metrics compute trade pnl and mark-to-market pnl from baseline funding inventory', () => { + const metric = computePortfolioMetric({ + baseline: { + anchor: 'latest_inventory_before_first_command', + command_at: '2026-04-02T18:10:43.569Z', + inventory: { + inventory_id: 'baseline-1', + synced_at: '2026-04-02T18:10:33.381Z', + spendable: { + 'nep141:btc.omft.near': '100000', + 'nep141:eure.omft.near': '60000000000000000000', + }, + }, + price: { + price_id: 'price-baseline-1', + observed_at: '2026-04-02T18:10:30.109Z', + eure_per_btc: '57792.20000000', + }, + }, + currentInventory: { + inventory_id: 'current-1', + synced_at: '2026-04-02T20:46:48.492Z', + spendable: { + 'nep141:btc.omft.near': '137014', + 'nep141:eure.omft.near': '38999978799978799978', + }, + }, + currentPrice: { + price_id: 'price-current-1', + observed_at: '2026-04-02T20:46:55.305Z', + eure_per_btc: '58079.20000000', + }, + btcAsset, + eureAsset, + commandCount: 7, + resultCount: 7, + }); + + assert.equal(metric.baseline_status, 'active'); + assert.equal(metric.current_portfolio_value_eure, '118.576613887978799978'); + assert.equal(metric.trade_pnl_eure, '0.391183707978799978'); + assert.equal(metric.mark_to_market_pnl_eure, '0.497413887978799978'); + assert.equal(metric.price_move_pnl_eure, '0.10623018'); + assert.deepEqual(metric.inventory_delta, { + btc_units: '37014', + btc: '0.00037014', + eure_units: '-21000021200021200022', + eure: '-21.000021200021200022', + }); +}); + +test('portfolio metrics stay available before the first live execution', () => { + const metric = computePortfolioMetric({ + baseline: null, + currentInventory: { + inventory_id: 'current-2', + synced_at: '2026-04-02T20:46:48.492Z', + spendable: { + 'nep141:btc.omft.near': '100000', + 'nep141:eure.omft.near': '60000000000000000000', + }, + }, + currentPrice: { + price_id: 'price-current-2', + observed_at: '2026-04-02T20:46:55.305Z', + eure_per_btc: '58079.20000000', + }, + btcAsset, + eureAsset, + commandCount: 0, + resultCount: 0, + }); + + assert.equal(metric.baseline_status, 'awaiting_first_execution'); + assert.equal(metric.current_portfolio_value_eure, '118.0792'); + assert.equal(metric.trade_pnl_eure, null); + assert.equal(metric.mark_to_market_pnl_eure, null); + assert.equal(metric.price_move_pnl_eure, null); +}); + +test('portfolio metric id keys off the baseline and current snapshots', () => { + const metricId = buildPortfolioMetricId({ + baselineInventoryId: 'baseline-1', + currentInventoryId: 'current-1', + currentPriceId: 'price-current-1', + }); + + assert.equal(metricId, 'portfolio-metric:baseline-1:current-1:price-current-1'); +});