Add durable portfolio metrics
All checks were successful
deploy / deploy (push) Successful in 22s

Proof: Persist portfolio value and PnL snapshots from the live inventory and reference-price path so operators can inspect trading performance from repo-controlled data.
Assumptions: The last credited inventory snapshot before the first live command is the correct baseline for trade-driven PnL, and EURe remains explicit 1:1 with EUR.
Still fake: The new portfolio metrics and watch output are implemented and tested locally but are not live until the updated app image is deployed to k3s.
This commit is contained in:
philipp 2026-04-03 01:02:27 +02:00
parent b4186d9715
commit 16e7b79978
7 changed files with 1255 additions and 6 deletions

View file

@ -71,10 +71,38 @@ Common inspection:
```bash ```bash
curl -s http://127.0.0.1:8081/healthz curl -s http://127.0.0.1:8081/healthz
curl -s http://127.0.0.1:8081/state curl -s http://127.0.0.1:8081/state
curl -s http://127.0.0.1:8085/portfolio-metrics
curl -s http://127.0.0.1:8086/state curl -s http://127.0.0.1:8086/state
curl -s http://127.0.0.1:8087/state curl -s http://127.0.0.1:8087/state
``` ```
Live watch:
```bash
python3 scripts/ops/watch_live_mm.py --once
python3 scripts/ops/watch_live_mm.py --backfill 3
```
The watch command tails the live market-maker loop through PostgreSQL history plus
service `/state`:
- matching quotes from `swap_demand_events`
- decisions from `trade_decisions`
- emitted commands from `execute_trade_commands`
- execution results from `trade_execution_results`
- current credited BTC and EURe inventory
- current strategy and executor arming state
Use `--heartbeat-every 30` or a larger value if you want less idle output.
Portfolio metrics:
- `history-writer` now derives durable portfolio metrics from the latest inventory snapshot plus the latest reference price.
- The PnL baseline is the last credited inventory snapshot before the first live `cmd.execute_trade`.
- `trade_pnl_eure` keeps the baseline BTC price fixed to isolate execution PnL from later BTC moves.
- `mark_to_market_pnl_eure` values both the baseline and current books at the latest reference price.
- `price_move_pnl_eure` is the difference between those two, so operators can separate execution edge from subsequent BTC repricing.
Useful controls: Useful controls:
```bash ```bash
@ -139,6 +167,7 @@ Notes:
- `swap_demand_events` - `swap_demand_events`
- `market_price_events` - `market_price_events`
- `intent_inventory_snapshots` - `intent_inventory_snapshots`
- `portfolio_metrics_snapshots`
- `liquidity_actions` - `liquidity_actions`
- `trade_decisions` - `trade_decisions`
- `execute_trade_commands` - `execute_trade_commands`

View file

@ -0,0 +1,138 @@
from __future__ import annotations
import sys
import unittest
from decimal import Decimal
from pathlib import Path
from unittest import mock
sys.path.insert(0, str(Path(__file__).resolve().parent))
import watch_live_mm
ASSETS = {
"nep141:btc.omft.near": watch_live_mm.AssetMeta(
asset_id="nep141:btc.omft.near",
symbol="BTC",
decimals=8,
),
"nep141:eure": watch_live_mm.AssetMeta(
asset_id="nep141:eure",
symbol="EURe",
decimals=18,
),
}
class WatchLiveMmTests(unittest.TestCase):
def test_format_amount_humanizes_btc(self) -> None:
self.assertEqual(
watch_live_mm.format_amount("100000", ASSETS["nep141:btc.omft.near"]),
"0.001 BTC",
)
def test_format_amount_humanizes_eure(self) -> None:
self.assertEqual(
watch_live_mm.format_amount("122753877246000000000", ASSETS["nep141:eure"]),
"122.75387725 EURe",
)
def test_format_decimal_trims_trailing_zeroes(self) -> None:
self.assertEqual(watch_live_mm.format_decimal(Decimal("60.00000000")), "60")
def test_render_quote_row_formats_assets(self) -> None:
row = {
"ingested_at": "2026-04-02 14:00:00+00",
"quote_id": "quote-1",
"asset_in": "nep141:eure",
"asset_out": "nep141:btc.omft.near",
"request_kind": "exact_in",
"amount_in": "122753877246000000000",
"amount_out": "",
"min_deadline_ms": "15000",
}
rendered = watch_live_mm.render_quote_row(row, ASSETS)
self.assertIn("QUOTE", rendered)
self.assertIn("EURe->BTC", rendered)
self.assertIn("122.75387725 EURe", rendered)
def test_render_decision_row_formats_inventory(self) -> None:
row = {
"ingested_at": "2026-04-02 14:00:01+00",
"quote_id": "quote-2",
"direction": "eure_to_btc",
"decision": "rejected",
"decision_reason": "insufficient_inventory",
"gross_edge_pct": "2.000000",
"eure_notional": "16.246379",
"inventory_asset": "nep141:btc.omft.near",
"inventory_available": "100000",
"inventory_required": "1592145",
}
rendered = watch_live_mm.render_decision_row(row, ASSETS)
self.assertIn("DECISION", rendered)
self.assertIn("insufficient_inventory", rendered)
self.assertIn("0.001 BTC", rendered)
def test_select_new_rows_returns_oldest_first_and_marks_seen(self) -> None:
seen = {"existing"}
rows = [
{"event_id": "newer"},
{"event_id": "middle"},
{"event_id": "existing"},
]
fresh = watch_live_mm.select_new_rows(rows, seen)
self.assertEqual([row["event_id"] for row in fresh], ["middle", "newer"])
self.assertEqual(seen, {"existing", "middle", "newer"})
def test_current_state_fingerprint_includes_portfolio_metrics(self) -> None:
responses = {
"near-intents-ingest": {
"ingest": {
"last_matching_quote_at": "2026-04-02T20:17:44.623Z",
"published_count": 7,
}
},
"inventory-sync": {
"last_snapshot": {
"spendable": {
"nep141:btc.omft.near": "137014",
"nep141:eure": "38999978799978799978",
}
}
},
"history-writer": {
"latest_portfolio_metrics": {
"trade_pnl_eure": "0.391183707978799978",
"mark_to_market_pnl_eure": "0.497413887978799978",
}
},
"strategy-engine": {
"armed": False,
"latest_decision": {
"decision_reason": "actionable",
"quote_id": "quote-1",
},
},
"trade-executor": {
"armed": False,
"signer_registered": True,
},
}
with mock.patch.object(
watch_live_mm,
"fetch_service_state",
side_effect=lambda *, namespace, deployment: responses[deployment],
):
state = watch_live_mm.current_state_fingerprint(namespace="unrip", assets=ASSETS)
self.assertEqual(state["trade_pnl_eure"], "0.391183707978799978")
self.assertEqual(state["mark_to_market_pnl_eure"], "0.497413887978799978")
self.assertEqual(state["btc_spendable"], "0.00137014 BTC")
self.assertEqual(state["eure_spendable"], "38.9999788 EURe")
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,556 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
import time
from dataclasses import dataclass
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from common import DEFAULT_NAMESPACE, config_value, kubectl
CONTROL_PORTS = {
"near-intents-ingest": 8081,
"inventory-sync": 8083,
"history-writer": 8085,
"strategy-engine": 8086,
"trade-executor": 8087,
}
@dataclass(frozen=True)
class AssetMeta:
asset_id: str
symbol: str
decimals: int
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Live-watch the active market-maker loop: matching quotes, trade decisions, "
"execution commands, execution results, arming state, and credited inventory."
)
)
parser.add_argument(
"--namespace",
default=DEFAULT_NAMESPACE,
help=f"Kubernetes namespace to inspect (default: {DEFAULT_NAMESPACE})",
)
parser.add_argument(
"--interval",
type=float,
default=5.0,
help="Polling interval in seconds (default: 5.0)",
)
parser.add_argument(
"--backfill",
type=int,
default=0,
help="Print the most recent N events from each stream before live tailing.",
)
parser.add_argument(
"--event-limit",
type=int,
default=12,
help="How many recent rows to fetch per stream on each poll (default: 12).",
)
parser.add_argument(
"--once",
action="store_true",
help="Print one snapshot and exit without live polling.",
)
parser.add_argument(
"--heartbeat-every",
type=float,
default=30.0,
help="Print an idle heartbeat after this many quiet seconds (default: 30.0).",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
assets = load_asset_registry(namespace=args.namespace)
seen = {
"quote": set(),
"decision": set(),
"command": set(),
"result": set(),
}
bootstrap_rows = fetch_event_rows(namespace=args.namespace, limit=args.event_limit)
if args.backfill > 0:
print_snapshot(namespace=args.namespace, assets=assets)
emit_backfill(bootstrap_rows, seen, assets=assets, backfill=args.backfill)
else:
mark_seen(bootstrap_rows, seen)
print_snapshot(namespace=args.namespace, assets=assets)
if args.once:
return 0
previous_state = current_state_fingerprint(namespace=args.namespace, assets=assets)
last_heartbeat_at = time.monotonic()
try:
while True:
rows = fetch_event_rows(namespace=args.namespace, limit=args.event_limit)
emitted = emit_new_rows(rows, seen, assets=assets)
current_state = current_state_fingerprint(namespace=args.namespace, assets=assets)
if current_state != previous_state:
print_state_line(current_state)
previous_state = current_state
emitted = True
if emitted:
last_heartbeat_at = time.monotonic()
elif time.monotonic() - last_heartbeat_at >= args.heartbeat_every:
print_heartbeat(current_state)
last_heartbeat_at = time.monotonic()
time.sleep(args.interval)
except KeyboardInterrupt:
return 130
def load_asset_registry(*, namespace: str) -> dict[str, AssetMeta]:
btc = AssetMeta(
asset_id=config_value("TRADING_BTC_ASSET_ID", namespace=namespace),
symbol=config_value("TRADING_BTC_SYMBOL", namespace=namespace) or "BTC",
decimals=int(config_value("TRADING_BTC_DECIMALS", namespace=namespace) or "8"),
)
eure = AssetMeta(
asset_id=config_value("TRADING_EURE_ASSET_ID", namespace=namespace),
symbol=config_value("TRADING_EURE_SYMBOL", namespace=namespace) or "EURe",
decimals=int(config_value("TRADING_EURE_DECIMALS", namespace=namespace) or "18"),
)
return {
btc.asset_id: btc,
eure.asset_id: eure,
}
def fetch_event_rows(*, namespace: str, limit: int) -> dict[str, list[dict[str, str]]]:
return {
"quote": query_rows(QUOTE_QUERY.format(limit=limit), namespace=namespace, columns=QUOTE_COLUMNS),
"decision": query_rows(
DECISION_QUERY.format(limit=limit),
namespace=namespace,
columns=DECISION_COLUMNS,
),
"command": query_rows(
COMMAND_QUERY.format(limit=limit),
namespace=namespace,
columns=COMMAND_COLUMNS,
),
"result": query_rows(
RESULT_QUERY.format(limit=limit),
namespace=namespace,
columns=RESULT_COLUMNS,
),
}
def query_rows(query: str, *, namespace: str, columns: list[str]) -> list[dict[str, str]]:
result = kubectl(
"exec",
"-n",
namespace,
"deploy/postgres",
"--",
"psql",
"-U",
"unrip",
"-d",
"unrip",
"-At",
"-F",
"\t",
"-c",
query,
)
rows: list[dict[str, str]] = []
for line in result.stdout.splitlines():
if not line.strip():
continue
values = line.split("\t")
if len(values) < len(columns):
values.extend([""] * (len(columns) - len(values)))
rows.append(dict(zip(columns, values)))
return rows
def fetch_service_state(*, namespace: str, deployment: str) -> dict:
port = CONTROL_PORTS[deployment]
expression = (
f"fetch('http://127.0.0.1:{port}/state')"
".then(async (response) => {"
" if (!response.ok) throw new Error(`state fetch failed: ${response.status}`);"
" process.stdout.write(await response.text());"
"})"
".catch((error) => {"
" console.error(error.message);"
" process.exit(1);"
"});"
)
result = kubectl(
"exec",
"-n",
namespace,
f"deploy/{deployment}",
"--",
"node",
"-e",
expression,
)
return json.loads(result.stdout)
def current_state_fingerprint(*, namespace: str, assets: dict[str, AssetMeta]) -> dict[str, str]:
ingest = fetch_service_state(namespace=namespace, deployment="near-intents-ingest")
inventory = fetch_service_state(namespace=namespace, deployment="inventory-sync")
history = fetch_service_state(namespace=namespace, deployment="history-writer")
strategy = fetch_service_state(namespace=namespace, deployment="strategy-engine")
executor = fetch_service_state(namespace=namespace, deployment="trade-executor")
snapshot = inventory.get("last_snapshot") or {}
spendable = snapshot.get("spendable") or {}
latest_decision = strategy.get("latest_decision") or {}
latest_metrics = history.get("latest_portfolio_metrics") or {}
return {
"ts": now_string(),
"last_matching_quote_at": str(ingest.get("ingest", {}).get("last_matching_quote_at") or "none"),
"published_quotes": str(ingest.get("ingest", {}).get("published_count") or 0),
"strategy_armed": str(bool(strategy.get("armed"))).lower(),
"executor_armed": str(bool(executor.get("armed"))).lower(),
"signer_registered": str(executor.get("signer_registered")),
"btc_spendable": format_amount(spendable.get(find_symbol_asset_id(assets, "BTC"), "0"), assets.get(find_symbol_asset_id(assets, "BTC"))),
"eure_spendable": format_amount(spendable.get(find_symbol_asset_id(assets, "EURe"), "0"), assets.get(find_symbol_asset_id(assets, "EURe"))),
"latest_reason": str(latest_decision.get("decision_reason") or "none"),
"latest_quote_id": str(latest_decision.get("quote_id") or "none"),
"trade_pnl_eure": str(latest_metrics.get("trade_pnl_eure") or "-"),
"mark_to_market_pnl_eure": str(latest_metrics.get("mark_to_market_pnl_eure") or "-"),
}
def emit_backfill(
rows: dict[str, list[dict[str, str]]],
seen: dict[str, set[str]],
*,
assets: dict[str, AssetMeta],
backfill: int,
) -> None:
for stream, formatter in STREAM_FORMATTERS.items():
stream_rows = list(reversed(rows[stream][:backfill]))
for row in stream_rows:
if row["event_id"] in seen[stream]:
continue
print(formatter(row, assets))
seen[stream].add(row["event_id"])
mark_seen(rows, seen)
def emit_new_rows(
rows: dict[str, list[dict[str, str]]],
seen: dict[str, set[str]],
*,
assets: dict[str, AssetMeta],
) -> bool:
emitted = False
for stream, formatter in STREAM_FORMATTERS.items():
fresh = select_new_rows(rows[stream], seen[stream])
for row in fresh:
print(formatter(row, assets))
emitted = True
return emitted
def mark_seen(rows: dict[str, list[dict[str, str]]], seen: dict[str, set[str]]) -> None:
for stream, stream_rows in rows.items():
for row in stream_rows:
seen[stream].add(row["event_id"])
def select_new_rows(rows: list[dict[str, str]], seen: set[str]) -> list[dict[str, str]]:
fresh = [row for row in reversed(rows) if row["event_id"] not in seen]
for row in fresh:
seen.add(row["event_id"])
return fresh
def print_snapshot(*, namespace: str, assets: dict[str, AssetMeta]) -> None:
state = current_state_fingerprint(namespace=namespace, assets=assets)
print(
"WATCH"
f" namespace={namespace}"
f" last_matching_quote_at={state['last_matching_quote_at']}"
f" published_quotes={state['published_quotes']}"
f" strategy_armed={state['strategy_armed']}"
f" executor_armed={state['executor_armed']}"
f" signer_registered={state['signer_registered']}"
f" btc={state['btc_spendable']}"
f" eure={state['eure_spendable']}"
f" trade_pnl_eure={state['trade_pnl_eure']}"
f" mtm_pnl_eure={state['mark_to_market_pnl_eure']}"
f" latest_reason={state['latest_reason']}"
f" latest_quote={state['latest_quote_id']}"
)
def print_state_line(state: dict[str, str]) -> None:
print(
f"{state['ts']} STATE"
f" strategy_armed={state['strategy_armed']}"
f" executor_armed={state['executor_armed']}"
f" signer_registered={state['signer_registered']}"
f" btc={state['btc_spendable']}"
f" eure={state['eure_spendable']}"
f" trade_pnl_eure={state['trade_pnl_eure']}"
f" mtm_pnl_eure={state['mark_to_market_pnl_eure']}"
f" last_matching_quote_at={state['last_matching_quote_at']}"
f" latest_reason={state['latest_reason']}"
f" latest_quote={state['latest_quote_id']}"
)
def print_heartbeat(state: dict[str, str]) -> None:
print(
f"{state['ts']} HEARTBEAT"
f" strategy_armed={state['strategy_armed']}"
f" executor_armed={state['executor_armed']}"
f" trade_pnl_eure={state['trade_pnl_eure']}"
f" mtm_pnl_eure={state['mark_to_market_pnl_eure']}"
f" latest_reason={state['latest_reason']}"
f" latest_quote={state['latest_quote_id']}"
)
def render_quote_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
asset_in = assets.get(row["asset_in"])
asset_out = assets.get(row["asset_out"])
amount_in = format_amount(row["amount_in"], asset_in) if row["amount_in"] else "-"
amount_out = format_amount(row["amount_out"], asset_out) if row["amount_out"] else "-"
return (
f"{row['ingested_at']} QUOTE"
f" quote={row['quote_id']}"
f" pair={pair_label(row['asset_in'], row['asset_out'], assets)}"
f" kind={row['request_kind']}"
f" amount_in={amount_in}"
f" amount_out={amount_out}"
f" deadline_ms={row['min_deadline_ms'] or '-'}"
)
def render_decision_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
inventory_asset = assets.get(row["inventory_asset"])
need = format_amount(row["inventory_required"], inventory_asset) if row["inventory_required"] else "-"
have = format_amount(row["inventory_available"], inventory_asset) if row["inventory_available"] else "-"
return (
f"{row['ingested_at']} DECISION"
f" quote={row['quote_id']}"
f" direction={row['direction']}"
f" decision={row['decision']}"
f" reason={row['decision_reason']}"
f" notional_eure={row['eure_notional'] or '-'}"
f" gross_edge_pct={row['gross_edge_pct'] or '-'}"
f" need={need}"
f" have={have}"
)
def render_command_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
asset_in = assets.get(row["asset_in"])
asset_out = assets.get(row["asset_out"])
proposed_in = format_amount(row["proposed_amount_in"], asset_in) if row["proposed_amount_in"] else "-"
proposed_out = format_amount(row["proposed_amount_out"], asset_out) if row["proposed_amount_out"] else "-"
return (
f"{row['ingested_at']} COMMAND"
f" quote={row['quote_id']}"
f" command={row['command_id']}"
f" kind={row['request_kind']}"
f" proposed_in={proposed_in}"
f" proposed_out={proposed_out}"
)
def render_result_row(row: dict[str, str], assets: dict[str, AssetMeta]) -> str:
extra = row["venue_response"] or row["error_message"] or "-"
return (
f"{row['ingested_at']} RESULT"
f" quote={row['quote_id']}"
f" status={row['status']}"
f" code={row['result_code']}"
f" detail={extra}"
)
def pair_label(asset_in: str, asset_out: str, assets: dict[str, AssetMeta]) -> str:
return f"{asset_symbol(asset_in, assets)}->{asset_symbol(asset_out, assets)}"
def asset_symbol(asset_id: str, assets: dict[str, AssetMeta]) -> str:
asset = assets.get(asset_id)
return asset.symbol if asset else asset_id
def find_symbol_asset_id(assets: dict[str, AssetMeta], symbol: str) -> str:
for asset_id, asset in assets.items():
if asset.symbol == symbol:
return asset_id
raise KeyError(f"missing asset for symbol {symbol}")
def format_amount(raw_amount: str | None, asset: AssetMeta | None, *, max_places: int = 8) -> str:
if raw_amount in (None, ""):
return "-"
if asset is None:
return str(raw_amount)
value = int(raw_amount)
scale = Decimal(10) ** asset.decimals
quant = Decimal(1) / (Decimal(10) ** min(asset.decimals, max_places))
decimal_value = (Decimal(value) / scale).quantize(quant, rounding=ROUND_HALF_UP)
return f"{format_decimal(decimal_value)} {asset.symbol}"
def format_decimal(value: Decimal) -> str:
normalized = format(value, "f")
if "." in normalized:
normalized = normalized.rstrip("0").rstrip(".")
return normalized or "0"
def now_string() -> str:
return time.strftime("%Y-%m-%dT%H:%M:%S%z")
QUOTE_COLUMNS = [
"event_id",
"ingested_at",
"quote_id",
"pair",
"request_kind",
"asset_in",
"amount_in",
"asset_out",
"amount_out",
"min_deadline_ms",
]
DECISION_COLUMNS = [
"event_id",
"ingested_at",
"quote_id",
"direction",
"decision",
"decision_reason",
"gross_edge_pct",
"eure_notional",
"inventory_asset",
"inventory_available",
"inventory_required",
]
COMMAND_COLUMNS = [
"event_id",
"ingested_at",
"command_id",
"quote_id",
"request_kind",
"asset_in",
"asset_out",
"proposed_amount_in",
"proposed_amount_out",
]
RESULT_COLUMNS = [
"event_id",
"ingested_at",
"quote_id",
"status",
"result_code",
"venue_response",
"error_message",
]
QUOTE_QUERY = """
select
event_id,
coalesce(ingested_at::text, ''),
coalesce(payload->>'quote_id', ''),
coalesce(payload->>'pair', ''),
coalesce(payload->>'request_kind', ''),
coalesce(payload->>'asset_in', ''),
coalesce(payload->>'amount_in', ''),
coalesce(payload->>'asset_out', ''),
coalesce(payload->>'amount_out', ''),
coalesce(payload->>'min_deadline_ms', '')
from swap_demand_events
order by ingested_at desc
limit {limit};
"""
DECISION_QUERY = """
select
event_id,
coalesce(ingested_at::text, ''),
coalesce(payload->>'quote_id', ''),
coalesce(payload->>'direction', ''),
coalesce(payload->>'decision', ''),
coalesce(payload->>'decision_reason', ''),
coalesce(payload->>'gross_edge_pct', ''),
coalesce(payload->>'eure_notional', ''),
coalesce(payload->>'inventory_asset', ''),
coalesce(payload->>'inventory_available', ''),
coalesce(payload->>'inventory_required', '')
from trade_decisions
order by ingested_at desc
limit {limit};
"""
COMMAND_QUERY = """
select
event_id,
coalesce(ingested_at::text, ''),
coalesce(payload->>'command_id', ''),
coalesce(payload->>'quote_id', ''),
coalesce(payload->>'request_kind', ''),
coalesce(payload->>'asset_in', ''),
coalesce(payload->>'asset_out', ''),
coalesce(payload->>'proposed_amount_in', ''),
coalesce(payload->>'proposed_amount_out', '')
from execute_trade_commands
order by ingested_at desc
limit {limit};
"""
RESULT_QUERY = """
select
event_id,
coalesce(ingested_at::text, ''),
coalesce(payload->>'quote_id', ''),
coalesce(payload->>'status', ''),
coalesce(payload->>'result_code', ''),
coalesce(payload->>'venue_response', ''),
coalesce(payload->'error'->>'message', '')
from trade_execution_results
order by ingested_at desc
limit {limit};
"""
STREAM_FORMATTERS = {
"quote": render_quote_row,
"decision": render_decision_row,
"command": render_command_row,
"result": render_result_row,
}
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -4,9 +4,17 @@ import { createConsumer } from '../bus/kafka/consumer.mjs';
import { startControlApi } from '../core/control-api.mjs'; import { startControlApi } from '../core/control-api.mjs';
import { routeHistoryRecord } from '../core/history-records.mjs'; import { routeHistoryRecord } from '../core/history-records.mjs';
import { createLogger, serializeError } from '../core/log.mjs'; import { createLogger, serializeError } from '../core/log.mjs';
import { buildPortfolioMetricId, computePortfolioMetric } from '../core/portfolio-metrics.mjs';
import { parseEventMessage } from '../core/event-envelope.mjs'; import { parseEventMessage } from '../core/event-envelope.mjs';
import { loadConfig } from '../lib/config.mjs'; import { loadConfig } from '../lib/config.mjs';
import { createPostgresPool, ensureHistorySchema, insertHistoryEvent } from '../lib/postgres.mjs'; import {
createPostgresPool,
ensureHistorySchema,
insertHistoryEvent,
loadLatestPortfolioMetric,
loadPortfolioMetricInputs,
upsertPortfolioMetric,
} from '../lib/postgres.mjs';
const config = loadConfig(); const config = loadConfig();
const logger = createLogger({ const logger = createLogger({
@ -37,6 +45,12 @@ const topics = [
config.kafkaTopicCmdExecuteTrade, config.kafkaTopicCmdExecuteTrade,
config.kafkaTopicExecTradeResult, config.kafkaTopicExecTradeResult,
]; ];
const portfolioMetricTopics = new Set([
config.kafkaTopicRefMarketPrice,
config.kafkaTopicStateIntentInventory,
config.kafkaTopicCmdExecuteTrade,
config.kafkaTopicExecTradeResult,
]);
for (const topic of topics) { for (const topic of topics) {
await consumer.subscribe({ topic, fromBeginning: false }); await consumer.subscribe({ topic, fromBeginning: false });
@ -46,11 +60,18 @@ const state = {
paused: false, paused: false,
draining: false, draining: false,
last_write_at: null, last_write_at: null,
last_metrics_at: null,
last_error: null, last_error: null,
error_count: 0, error_count: 0,
offsets: {}, offsets: {},
latest_portfolio_metrics: null,
metrics_error: null,
}; };
await refreshPortfolioMetrics().catch((error) => {
state.metrics_error = serializeError(error);
});
await consumer.run({ await consumer.run({
eachMessage: async ({ topic, partition, message }) => { eachMessage: async ({ topic, partition, message }) => {
if (!message.value || state.paused) return; if (!message.value || state.paused) return;
@ -71,6 +92,19 @@ await consumer.run({
partition, partition,
offset: message.offset, offset: message.offset,
}; };
if (portfolioMetricTopics.has(topic)) {
try {
await refreshPortfolioMetrics();
} catch (error) {
state.metrics_error = serializeError(error);
logger.error('portfolio_metrics_refresh_failed', {
topic,
details: {
error: serializeError(error),
},
});
}
}
} catch (error) { } catch (error) {
state.last_error = serializeError(error); state.last_error = serializeError(error);
state.error_count += 1; state.error_count += 1;
@ -97,13 +131,30 @@ const controlApi = startControlApi({
stateProvider: { stateProvider: {
async getState() { async getState() {
const connectivity = await pool.query('SELECT 1').then(() => true).catch(() => false); const connectivity = await pool.query('SELECT 1').then(() => true).catch(() => false);
return { return {
...state, ...state,
database_connectivity: connectivity, database_connectivity: connectivity,
}; };
}, },
}, },
routes: [ routes: [
{
method: 'GET',
path: '/portfolio-metrics',
readBody: false,
handler: async () => {
const latest = await loadLatestPortfolioMetric(pool);
if (!latest) {
return {
statusCode: 404,
payload: {
error: 'portfolio_metrics_unavailable',
},
};
}
return latest;
},
},
{ {
method: 'POST', method: 'POST',
path: '/pause', path: '/pause',
@ -136,6 +187,60 @@ const controlApi = startControlApi({
], ],
}); });
async function refreshPortfolioMetrics() {
const inputs = await loadPortfolioMetricInputs(pool);
const payload = computePortfolioMetric({
baseline: inputs.baseline,
currentInventory: inputs.currentInventory?.payload,
currentPrice: inputs.currentPrice?.payload,
btcAsset: config.tradingBtc,
eureAsset: config.tradingEure,
commandCount: inputs.commandCount,
resultCount: inputs.resultCount,
});
if (!payload) return null;
const computedAt = new Date().toISOString();
const metricId = buildPortfolioMetricId({
baselineInventoryId: inputs.baseline?.inventory?.inventory_id || null,
currentInventoryId: inputs.currentInventory?.payload?.inventory_id || null,
currentPriceId: inputs.currentPrice?.payload?.price_id || null,
});
await upsertPortfolioMetric(pool, {
metricId,
computedAt,
baselineAnchorAt: inputs.baseline?.command_at || null,
baselineStatus: payload.baseline_status,
payload,
});
state.last_metrics_at = computedAt;
state.metrics_error = null;
state.latest_portfolio_metrics = summarizePortfolioMetric({
metric_id: metricId,
computed_at: computedAt,
baseline_anchor_at: inputs.baseline?.command_at || null,
baseline_status: payload.baseline_status,
payload,
});
return state.latest_portfolio_metrics;
}
function summarizePortfolioMetric(metric) {
if (!metric) return null;
return {
metric_id: metric.metric_id,
computed_at: metric.computed_at,
baseline_anchor_at: metric.baseline_anchor_at,
baseline_status: metric.baseline_status,
current_portfolio_value_eure: metric.payload?.current_portfolio_value_eure ?? null,
trade_pnl_eure: metric.payload?.trade_pnl_eure ?? null,
mark_to_market_pnl_eure: metric.payload?.mark_to_market_pnl_eure ?? null,
price_move_pnl_eure: metric.payload?.price_move_pnl_eure ?? null,
command_count: metric.payload?.command_count ?? 0,
result_count: metric.payload?.result_count ?? 0,
};
}
async function shutdown() { async function shutdown() {
await controlApi.close().catch(() => {}); await controlApi.close().catch(() => {});
await consumer.disconnect(); await consumer.disconnect();

View file

@ -0,0 +1,162 @@
const VALUE_SCALE = 18;
const VALUE_FACTOR = 10n ** BigInt(VALUE_SCALE);
export function computePortfolioMetric({
baseline = null,
currentInventory,
currentPrice,
btcAsset,
eureAsset,
commandCount = 0,
resultCount = 0,
} = {}) {
if (!currentInventory || !currentPrice || !btcAsset?.assetId || !eureAsset?.assetId) {
return null;
}
const currentBtcUnits = String(currentInventory.spendable?.[btcAsset.assetId] || '0');
const currentEureUnits = String(currentInventory.spendable?.[eureAsset.assetId] || '0');
const currentBtc = unitsToScaledDecimal(currentBtcUnits, btcAsset.decimals);
const currentEure = unitsToScaledDecimal(currentEureUnits, eureAsset.decimals);
const currentPriceScaled = parseScaledDecimal(currentPrice.eure_per_btc);
const currentBtcMarkValue = multiplyScaled(currentBtc, currentPriceScaled);
const currentPortfolioValue = currentEure + currentBtcMarkValue;
const payload = {
metric_version: 1,
baseline_status: baseline ? 'active' : 'awaiting_first_execution',
command_count: commandCount,
result_count: resultCount,
current_price: {
price_id: currentPrice.price_id || null,
observed_at: currentPrice.observed_at || null,
eure_per_btc: String(currentPrice.eure_per_btc),
},
current_inventory: buildInventoryView({
inventory: currentInventory,
btcAsset,
eureAsset,
}),
current_portfolio_value_eure: formatScaledDecimal(currentPortfolioValue),
current_btc_mark_value_eure: formatScaledDecimal(currentBtcMarkValue),
current_eure_cash_value_eure: formatScaledDecimal(currentEure),
trade_pnl_eure: null,
mark_to_market_pnl_eure: null,
price_move_pnl_eure: null,
baseline_portfolio_value_eure_at_baseline_price: null,
baseline_portfolio_value_eure_at_current_price: null,
current_portfolio_value_eure_at_baseline_price: null,
inventory_delta: null,
baseline: null,
};
if (!baseline?.inventory || !baseline?.price) {
return payload;
}
const baselineBtcUnits = String(baseline.inventory.spendable?.[btcAsset.assetId] || '0');
const baselineEureUnits = String(baseline.inventory.spendable?.[eureAsset.assetId] || '0');
const baselineBtc = unitsToScaledDecimal(baselineBtcUnits, btcAsset.decimals);
const baselineEure = unitsToScaledDecimal(baselineEureUnits, eureAsset.decimals);
const baselinePriceScaled = parseScaledDecimal(baseline.price.eure_per_btc);
const baselinePortfolioAtBaselinePrice = baselineEure + multiplyScaled(baselineBtc, baselinePriceScaled);
const baselinePortfolioAtCurrentPrice = baselineEure + multiplyScaled(baselineBtc, currentPriceScaled);
const currentPortfolioAtBaselinePrice = currentEure + multiplyScaled(currentBtc, baselinePriceScaled);
const tradePnl = currentPortfolioAtBaselinePrice - baselinePortfolioAtBaselinePrice;
const markToMarketPnl = currentPortfolioValue - baselinePortfolioAtCurrentPrice;
const priceMovePnl = markToMarketPnl - tradePnl;
payload.trade_pnl_eure = formatScaledDecimal(tradePnl);
payload.mark_to_market_pnl_eure = formatScaledDecimal(markToMarketPnl);
payload.price_move_pnl_eure = formatScaledDecimal(priceMovePnl);
payload.baseline_portfolio_value_eure_at_baseline_price = formatScaledDecimal(
baselinePortfolioAtBaselinePrice,
);
payload.baseline_portfolio_value_eure_at_current_price = formatScaledDecimal(
baselinePortfolioAtCurrentPrice,
);
payload.current_portfolio_value_eure_at_baseline_price = formatScaledDecimal(
currentPortfolioAtBaselinePrice,
);
payload.inventory_delta = {
btc_units: (BigInt(currentBtcUnits) - BigInt(baselineBtcUnits)).toString(),
btc: formatScaledDecimal(currentBtc - baselineBtc),
eure_units: (BigInt(currentEureUnits) - BigInt(baselineEureUnits)).toString(),
eure: formatScaledDecimal(currentEure - baselineEure),
};
payload.baseline = {
anchor: baseline.anchor || 'latest_inventory_before_first_command',
command_at: baseline.command_at || null,
price: {
price_id: baseline.price.price_id || null,
observed_at: baseline.price.observed_at || null,
eure_per_btc: String(baseline.price.eure_per_btc),
},
inventory: buildInventoryView({
inventory: baseline.inventory,
btcAsset,
eureAsset,
}),
};
return payload;
}
export function buildPortfolioMetricId({ baselineInventoryId, currentInventoryId, currentPriceId }) {
return [
'portfolio-metric',
baselineInventoryId || 'no-baseline',
currentInventoryId || 'no-current-inventory',
currentPriceId || 'no-current-price',
].join(':');
}
function buildInventoryView({ inventory, btcAsset, eureAsset }) {
const spendable = inventory?.spendable || {};
const btcUnits = String(spendable[btcAsset.assetId] || '0');
const eureUnits = String(spendable[eureAsset.assetId] || '0');
return {
inventory_id: inventory?.inventory_id || null,
synced_at: inventory?.synced_at || null,
btc_units: btcUnits,
btc: formatAssetUnits(btcUnits, btcAsset.decimals),
eure_units: eureUnits,
eure: formatAssetUnits(eureUnits, eureAsset.decimals),
};
}
function unitsToScaledDecimal(units, decimals) {
return BigInt(units || '0') * 10n ** BigInt(VALUE_SCALE - decimals);
}
function formatAssetUnits(units, decimals) {
return formatScaledDecimal(BigInt(units || '0') * 10n ** BigInt(VALUE_SCALE - decimals));
}
function parseScaledDecimal(value) {
const normalized = String(value ?? '0').trim();
const negative = normalized.startsWith('-');
const unsigned = normalized.replace(/^[+-]/, '');
const [wholePart, fractionalPart = ''] = unsigned.split('.');
const whole = BigInt(wholePart || '0');
const fractional = BigInt((fractionalPart.padEnd(VALUE_SCALE, '0')).slice(0, VALUE_SCALE) || '0');
const scaled = (whole * VALUE_FACTOR) + fractional;
return negative ? -scaled : scaled;
}
function multiplyScaled(left, right) {
return (left * right) / VALUE_FACTOR;
}
function formatScaledDecimal(value) {
const negative = value < 0n;
const absolute = negative ? -value : value;
const whole = absolute / VALUE_FACTOR;
const fractional = absolute % VALUE_FACTOR;
if (fractional === 0n) {
return `${negative ? '-' : ''}${whole}`;
}
const fractionalText = fractional.toString().padStart(VALUE_SCALE, '0').replace(/0+$/, '');
return `${negative ? '-' : ''}${whole}.${fractionalText}`;
}

View file

@ -11,6 +11,8 @@ const TABLES = [
'trade_execution_results', 'trade_execution_results',
]; ];
const PORTFOLIO_METRICS_TABLE = 'portfolio_metrics_snapshots';
export function createPostgresPool({ connectionString }) { export function createPostgresPool({ connectionString }) {
return new Pool({ return new Pool({
connectionString, connectionString,
@ -48,6 +50,20 @@ export async function ensureHistorySchema(pool) {
ON ${table} (ingested_at DESC) ON ${table} (ingested_at DESC)
`); `);
} }
await pool.query(`
CREATE TABLE IF NOT EXISTS ${PORTFOLIO_METRICS_TABLE} (
metric_id TEXT PRIMARY KEY,
computed_at TIMESTAMPTZ NOT NULL,
baseline_anchor_at TIMESTAMPTZ,
baseline_status TEXT NOT NULL,
payload JSONB NOT NULL
)
`);
await pool.query(`
CREATE INDEX IF NOT EXISTS ${PORTFOLIO_METRICS_TABLE}_computed_at_idx
ON ${PORTFOLIO_METRICS_TABLE} (computed_at DESC)
`);
} }
export async function insertHistoryEvent(pool, { table, topic, event, record }) { export async function insertHistoryEvent(pool, { table, topic, event, record }) {
@ -87,3 +103,140 @@ export async function insertHistoryEvent(pool, { table, topic, event, record })
], ],
); );
} }
export async function loadPortfolioMetricInputs(pool) {
const [currentInventory, currentPrice, commandAggregate, resultAggregate] = await Promise.all([
loadLatestEventPayload(pool, 'intent_inventory_snapshots'),
loadLatestEventPayload(pool, 'market_price_events'),
pool.query(`
SELECT
MIN(ingested_at) AS first_command_at,
COUNT(*)::INT AS command_count
FROM execute_trade_commands
`),
pool.query(`
SELECT COUNT(*)::INT AS result_count
FROM trade_execution_results
`),
]);
const firstCommandAt = commandAggregate.rows[0]?.first_command_at || null;
const commandCount = Number(commandAggregate.rows[0]?.command_count || 0);
const resultCount = Number(resultAggregate.rows[0]?.result_count || 0);
if (!firstCommandAt) {
return {
currentInventory,
currentPrice,
baseline: null,
commandCount,
resultCount,
};
}
const baselineInventory = await loadLatestEventPayload(
pool,
'intent_inventory_snapshots',
'WHERE ingested_at <= $1 ORDER BY ingested_at DESC LIMIT 1',
[firstCommandAt],
);
const baselinePrice = await loadNearestPricePayload(pool, baselineInventory?.ingested_at || firstCommandAt);
return {
currentInventory,
currentPrice,
baseline: baselineInventory && baselinePrice ? {
anchor: 'latest_inventory_before_first_command',
command_at: new Date(firstCommandAt).toISOString(),
inventory: baselineInventory.payload,
price: baselinePrice.payload,
} : null,
commandCount,
resultCount,
};
}
export async function upsertPortfolioMetric(pool, {
metricId,
computedAt,
baselineAnchorAt = null,
baselineStatus,
payload,
}) {
await pool.query(
`
INSERT INTO ${PORTFOLIO_METRICS_TABLE} (
metric_id,
computed_at,
baseline_anchor_at,
baseline_status,
payload
) VALUES ($1, $2, $3, $4, $5::jsonb)
ON CONFLICT (metric_id) DO UPDATE SET
computed_at = EXCLUDED.computed_at,
baseline_anchor_at = EXCLUDED.baseline_anchor_at,
baseline_status = EXCLUDED.baseline_status,
payload = EXCLUDED.payload
`,
[
metricId,
computedAt,
baselineAnchorAt,
baselineStatus,
JSON.stringify(payload),
],
);
}
export async function loadLatestPortfolioMetric(pool) {
const result = await pool.query(`
SELECT metric_id, computed_at, baseline_anchor_at, baseline_status, payload
FROM ${PORTFOLIO_METRICS_TABLE}
ORDER BY computed_at DESC
LIMIT 1
`);
if (!result.rows[0]) return null;
return normalizePortfolioMetricRow(result.rows[0]);
}
async function loadLatestEventPayload(pool, table, clause = 'ORDER BY ingested_at DESC LIMIT 1', params = []) {
const result = await pool.query(
`
SELECT ingested_at, payload
FROM ${table}
${clause}
`,
params,
);
if (!result.rows[0]) return null;
return {
ingested_at: result.rows[0].ingested_at ? new Date(result.rows[0].ingested_at).toISOString() : null,
payload: result.rows[0].payload,
};
}
async function loadNearestPricePayload(pool, anchorAt) {
const before = await loadLatestEventPayload(
pool,
'market_price_events',
'WHERE ingested_at <= $1 ORDER BY ingested_at DESC LIMIT 1',
[anchorAt],
);
if (before) return before;
return loadLatestEventPayload(
pool,
'market_price_events',
'WHERE ingested_at >= $1 ORDER BY ingested_at ASC LIMIT 1',
[anchorAt],
);
}
function normalizePortfolioMetricRow(row) {
return {
metric_id: row.metric_id,
computed_at: row.computed_at ? new Date(row.computed_at).toISOString() : null,
baseline_anchor_at: row.baseline_anchor_at ? new Date(row.baseline_anchor_at).toISOString() : null,
baseline_status: row.baseline_status,
payload: row.payload,
};
}

View file

@ -0,0 +1,106 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { buildPortfolioMetricId, computePortfolioMetric } from '../src/core/portfolio-metrics.mjs';
const btcAsset = {
assetId: 'nep141:btc.omft.near',
symbol: 'BTC',
decimals: 8,
};
const eureAsset = {
assetId: 'nep141:eure.omft.near',
symbol: 'EURe',
decimals: 18,
};
test('portfolio metrics compute trade pnl and mark-to-market pnl from baseline funding inventory', () => {
const metric = computePortfolioMetric({
baseline: {
anchor: 'latest_inventory_before_first_command',
command_at: '2026-04-02T18:10:43.569Z',
inventory: {
inventory_id: 'baseline-1',
synced_at: '2026-04-02T18:10:33.381Z',
spendable: {
'nep141:btc.omft.near': '100000',
'nep141:eure.omft.near': '60000000000000000000',
},
},
price: {
price_id: 'price-baseline-1',
observed_at: '2026-04-02T18:10:30.109Z',
eure_per_btc: '57792.20000000',
},
},
currentInventory: {
inventory_id: 'current-1',
synced_at: '2026-04-02T20:46:48.492Z',
spendable: {
'nep141:btc.omft.near': '137014',
'nep141:eure.omft.near': '38999978799978799978',
},
},
currentPrice: {
price_id: 'price-current-1',
observed_at: '2026-04-02T20:46:55.305Z',
eure_per_btc: '58079.20000000',
},
btcAsset,
eureAsset,
commandCount: 7,
resultCount: 7,
});
assert.equal(metric.baseline_status, 'active');
assert.equal(metric.current_portfolio_value_eure, '118.576613887978799978');
assert.equal(metric.trade_pnl_eure, '0.391183707978799978');
assert.equal(metric.mark_to_market_pnl_eure, '0.497413887978799978');
assert.equal(metric.price_move_pnl_eure, '0.10623018');
assert.deepEqual(metric.inventory_delta, {
btc_units: '37014',
btc: '0.00037014',
eure_units: '-21000021200021200022',
eure: '-21.000021200021200022',
});
});
test('portfolio metrics stay available before the first live execution', () => {
const metric = computePortfolioMetric({
baseline: null,
currentInventory: {
inventory_id: 'current-2',
synced_at: '2026-04-02T20:46:48.492Z',
spendable: {
'nep141:btc.omft.near': '100000',
'nep141:eure.omft.near': '60000000000000000000',
},
},
currentPrice: {
price_id: 'price-current-2',
observed_at: '2026-04-02T20:46:55.305Z',
eure_per_btc: '58079.20000000',
},
btcAsset,
eureAsset,
commandCount: 0,
resultCount: 0,
});
assert.equal(metric.baseline_status, 'awaiting_first_execution');
assert.equal(metric.current_portfolio_value_eure, '118.0792');
assert.equal(metric.trade_pnl_eure, null);
assert.equal(metric.mark_to_market_pnl_eure, null);
assert.equal(metric.price_move_pnl_eure, null);
});
test('portfolio metric id keys off the baseline and current snapshots', () => {
const metricId = buildPortfolioMetricId({
baselineInventoryId: 'baseline-1',
currentInventoryId: 'current-1',
currentPriceId: 'price-current-1',
});
assert.equal(metricId, 'portfolio-metric:baseline-1:current-1:price-current-1');
});