Ship dashboard postgres loaders
All checks were successful
deploy / deploy (push) Successful in 23s

Proof: Include the postgres loader exports required by the deployed operator-dashboard backend bootstrap path.

Assumptions: The current postgres loader additions in the worktree are part of the operator-dashboard runtime surface and intended to ship with the dashboard backend.

Still fake: External alert delivery remains unconfigured; this commit only resolves the live operator-dashboard startup blocker.
This commit is contained in:
philipp 2026-04-08 19:38:19 +02:00
parent b8d731408e
commit 3fd0d8fe46

View file

@ -14,6 +14,8 @@ const TABLES = [
];
const PORTFOLIO_METRICS_TABLE = 'portfolio_metrics_snapshots';
const CREDITED_LIQUIDITY_STATUSES = ['CREDITED', 'COMPLETED', 'FINALIZED', 'SETTLED'];
const COMPLETED_WITHDRAWAL_STATUSES = ['COMPLETED', 'FINALIZED', 'SETTLED'];
export function createPostgresPool({ connectionString }) {
return new Pool({
@ -147,7 +149,10 @@ export async function insertHistoryEvent(pool, { table, topic, event, record })
);
}
export async function loadPortfolioMetricInputs(pool) {
export async function loadPortfolioMetricInputs(pool, {
btcAsset = null,
eureAsset = null,
} = {}) {
const [currentInventory, currentPrice, commandAggregate, resultAggregate] = await Promise.all([
loadLatestEventPayload(pool, 'intent_inventory_snapshots'),
loadLatestEventPayload(pool, 'market_price_events'),
@ -184,6 +189,18 @@ export async function loadPortfolioMetricInputs(pool) {
[firstCommandAt],
);
const baselinePrice = await loadNearestPricePayload(pool, baselineInventory?.ingested_at || firstCommandAt);
const externalFlows = (
baselineInventory
&& baselinePrice
&& btcAsset?.assetId
&& eureAsset?.assetId
)
? await loadExternalAssetFlowsSince(pool, {
since: firstCommandAt,
btcAsset,
eureAsset,
})
: [];
return {
currentInventory,
@ -194,6 +211,7 @@ export async function loadPortfolioMetricInputs(pool) {
inventory: baselineInventory.payload,
price: baselinePrice.payload,
} : null,
externalFlows,
commandCount,
resultCount,
};
@ -242,6 +260,196 @@ export async function loadLatestPortfolioMetric(pool) {
return normalizePortfolioMetricRow(result.rows[0]);
}
export async function loadLatestInventorySnapshot(pool) {
const latest = await loadLatestEventPayload(pool, 'intent_inventory_snapshots');
if (!latest) return null;
return {
ingested_at: latest.ingested_at,
payload: latest.payload,
};
}
export async function loadLatestMarketPrice(pool) {
const latest = await loadLatestEventPayload(pool, 'market_price_events');
if (!latest) return null;
return {
ingested_at: latest.ingested_at,
payload: latest.payload,
};
}
export async function loadRecentQuotes(pool, { limit = 10 } = {}) {
const result = await pool.query(
`
SELECT observed_at, ingested_at, payload
FROM swap_demand_events
ORDER BY ingested_at DESC
LIMIT $1
`,
[limit],
);
return result.rows.map(normalizeRecentQuoteRow);
}
export async function loadSuccessfulTradeSummary(pool) {
const result = await pool.query(`
SELECT
COUNT(*)::INT AS total,
MAX(COALESCE(observed_at, ingested_at)) AS last_successful_trade_at
FROM trade_execution_results
WHERE payload->>'status' = 'submitted'
`);
return {
total: Number(result.rows[0]?.total || 0),
last_successful_trade_at: toIsoTimestamp(result.rows[0]?.last_successful_trade_at),
};
}
export async function loadSuccessfulTradesPage(pool, { page = 1, pageSize = 20 } = {}) {
const normalizedPage = Math.max(1, Number(page) || 1);
const normalizedPageSize = Math.max(1, Number(pageSize) || 20);
const offset = (normalizedPage - 1) * normalizedPageSize;
const [countResult, rowsResult] = await Promise.all([
pool.query(`
SELECT COUNT(*)::INT AS total
FROM trade_execution_results
WHERE payload->>'status' = 'submitted'
`),
pool.query(
`
SELECT
r.observed_at AS result_observed_at,
r.ingested_at AS result_ingested_at,
r.payload AS result_payload,
c.payload AS command_payload,
d.payload AS decision_payload
FROM trade_execution_results r
LEFT JOIN execute_trade_commands c
ON c.decision_key = r.decision_key
LEFT JOIN trade_decisions d
ON d.decision_key = COALESCE(c.payload->>'decision_id', r.payload->>'decision_id')
WHERE r.payload->>'status' = 'submitted'
ORDER BY COALESCE(r.observed_at, r.ingested_at) DESC
LIMIT $1
OFFSET $2
`,
[normalizedPageSize, offset],
),
]);
const total = Number(countResult.rows[0]?.total || 0);
return {
page: normalizedPage,
page_size: normalizedPageSize,
total,
total_pages: total > 0 ? Math.ceil(total / normalizedPageSize) : 1,
items: rowsResult.rows.map(normalizeSuccessfulTradeRow),
};
}
export async function loadCurrentFundingObservations(pool) {
const result = await pool.query(`
SELECT DISTINCT ON (decision_key)
observed_at,
ingested_at,
payload
FROM funding_observations
WHERE decision_key IS NOT NULL
ORDER BY decision_key, ingested_at DESC
`);
return result.rows
.map((row) => ({
observed_at: toIsoTimestamp(row.observed_at),
ingested_at: toIsoTimestamp(row.ingested_at),
payload: row.payload,
}))
.sort((left, right) => (
Date.parse(
right.payload?.last_seen_at
|| right.observed_at
|| right.ingested_at
|| '',
) - Date.parse(
left.payload?.last_seen_at
|| left.observed_at
|| left.ingested_at
|| '',
)
));
}
export async function loadRecentDepositStatuses(pool, { limit = 20 } = {}) {
const normalizedLimit = Math.max(1, Number(limit) || 20);
const fetchLimit = Math.max(normalizedLimit * 4, normalizedLimit);
const result = await pool.query(
`
SELECT observed_at, ingested_at, payload
FROM liquidity_actions
WHERE payload->>'action_type' = 'deposit_status_observed'
ORDER BY COALESCE(observed_at, ingested_at) DESC
LIMIT $1
`,
[fetchLimit],
);
const seen = new Set();
const uniqueRows = [];
for (const row of result.rows) {
const key = buildDepositStatusRowKey(row.payload);
if (seen.has(key)) continue;
seen.add(key);
uniqueRows.push(row);
if (uniqueRows.length >= normalizedLimit) break;
}
return uniqueRows.map((row) => ({
observed_at: toIsoTimestamp(row.observed_at),
ingested_at: toIsoTimestamp(row.ingested_at),
payload: row.payload,
}));
}
export async function loadRecentAlertTransitions(pool, { limit = 20 } = {}) {
const result = await pool.query(
`
SELECT observed_at, ingested_at, payload
FROM ops_alerts
ORDER BY ingested_at DESC
LIMIT $1
`,
[limit],
);
return result.rows.map((row) => ({
observed_at: toIsoTimestamp(row.observed_at),
ingested_at: toIsoTimestamp(row.ingested_at),
payload: row.payload,
}));
}
export async function loadRecentTradeDecisions(pool, { limit = 20 } = {}) {
const result = await pool.query(
`
SELECT observed_at, ingested_at, payload
FROM trade_decisions
ORDER BY COALESCE(observed_at, ingested_at) DESC
LIMIT $1
`,
[limit],
);
return result.rows.map((row) => ({
observed_at: toIsoTimestamp(row.observed_at),
ingested_at: toIsoTimestamp(row.ingested_at),
payload: row.payload,
}));
}
async function loadLatestEventPayload(pool, table, clause = 'ORDER BY ingested_at DESC LIMIT 1', params = []) {
const result = await pool.query(
`
@ -274,6 +482,138 @@ async function loadNearestPricePayload(pool, anchorAt) {
);
}
async function loadExternalAssetFlowsSince(pool, {
since,
btcAsset,
eureAsset,
} = {}) {
const [depositRows, withdrawalRows] = await Promise.all([
loadCreditedDepositRowsSince(pool, since),
loadCompletedWithdrawalRowsSince(pool, since),
]);
const flows = [];
for (const row of depositRows) {
flows.push(await normalizeExternalFlowRow(pool, {
row,
kind: 'deposit',
sign: 1n,
btcAsset,
eureAsset,
}));
}
for (const row of withdrawalRows) {
flows.push(await normalizeExternalFlowRow(pool, {
row,
kind: 'withdrawal',
sign: -1n,
btcAsset,
eureAsset,
}));
}
return flows
.filter(Boolean)
.sort((left, right) => timestampValue(left.effective_at) - timestampValue(right.effective_at));
}
async function loadCreditedDepositRowsSince(pool, since) {
const result = await pool.query(
`
SELECT DISTINCT ON (
payload->'details'->>'tx_hash',
payload->>'chain',
payload->>'asset_id',
payload->'details'->>'amount'
)
observed_at,
ingested_at,
payload
FROM liquidity_actions
WHERE ingested_at > $1
AND payload->>'action_type' = 'deposit_status_observed'
AND UPPER(payload->>'status') = ANY($2)
AND COALESCE(payload->'details'->>'tx_hash', '') <> ''
ORDER BY
payload->'details'->>'tx_hash',
payload->>'chain',
payload->>'asset_id',
payload->'details'->>'amount',
ingested_at DESC
`,
[since, CREDITED_LIQUIDITY_STATUSES],
);
return result.rows;
}
async function loadCompletedWithdrawalRowsSince(pool, since) {
const result = await pool.query(
`
SELECT DISTINCT ON (
payload->'details'->>'withdrawal_hash',
payload->>'chain',
payload->>'asset_id'
)
observed_at,
ingested_at,
payload
FROM liquidity_actions
WHERE ingested_at > $1
AND payload->>'action_type' = 'withdrawal_status_changed'
AND UPPER(payload->>'status') = ANY($2)
AND COALESCE(payload->'details'->>'withdrawal_hash', '') <> ''
ORDER BY
payload->'details'->>'withdrawal_hash',
payload->>'chain',
payload->>'asset_id',
ingested_at DESC
`,
[since, COMPLETED_WITHDRAWAL_STATUSES],
);
return result.rows;
}
async function normalizeExternalFlowRow(pool, {
row,
kind,
sign,
btcAsset,
eureAsset,
} = {}) {
const payload = row?.payload || {};
const details = payload.details || {};
const assetId = payload.asset_id || details.asset_id || null;
if (!assetId) return null;
const amount = String(details.amount || '0');
const effectiveAt = toIsoTimestamp(row.observed_at || row.ingested_at);
const signedUnits = (sign * BigInt(amount)).toString();
let referencePriceAtFlowTime = null;
if (assetId === btcAsset?.assetId) {
const nearestPrice = await loadNearestPricePayload(pool, effectiveAt);
referencePriceAtFlowTime = nearestPrice?.payload?.eure_per_btc || null;
} else if (assetId !== eureAsset?.assetId) {
return null;
}
return {
flow_id:
kind === 'deposit'
? `deposit:${details.tx_hash || effectiveAt || Math.random().toString(16).slice(2)}`
: `withdrawal:${details.withdrawal_hash || effectiveAt || Math.random().toString(16).slice(2)}`,
kind,
asset_id: assetId,
effective_at: effectiveAt,
signed_units: signedUnits,
tx_hash: details.tx_hash || null,
withdrawal_hash: details.withdrawal_hash || null,
reference_price_eure_per_btc_at_flow_time: referencePriceAtFlowTime,
};
}
function normalizePortfolioMetricRow(row) {
return {
metric_id: row.metric_id,
@ -284,6 +624,87 @@ function normalizePortfolioMetricRow(row) {
};
}
function normalizeRecentQuoteRow(row) {
const payload = row.payload || {};
return {
quote_id: payload.quote_id || null,
pair: payload.pair || buildPair(payload.asset_in, payload.asset_out),
asset_in: payload.asset_in || null,
asset_out: payload.asset_out || null,
request_kind: payload.request_kind || null,
amount_in: payload.amount_in ?? null,
amount_out: payload.amount_out ?? null,
min_deadline_ms: payload.min_deadline_ms ?? null,
observed_at: toIsoTimestamp(row.observed_at),
ingested_at: toIsoTimestamp(row.ingested_at),
};
}
function normalizeSuccessfulTradeRow(row) {
const resultPayload = row.result_payload || {};
const commandPayload = row.command_payload || {};
const decisionPayload = row.decision_payload || {};
return {
command_id: resultPayload.command_id || commandPayload.command_id || null,
decision_id:
commandPayload.decision_id
|| resultPayload.decision_id
|| decisionPayload.decision_id
|| null,
execution_key: resultPayload.execution_key || commandPayload.execution_key || null,
quote_id: resultPayload.quote_id || commandPayload.quote_id || decisionPayload.quote_id || null,
pair: resultPayload.pair || commandPayload.pair || decisionPayload.pair || null,
observed_at: toIsoTimestamp(row.result_observed_at || row.result_ingested_at),
ingested_at: toIsoTimestamp(row.result_ingested_at),
status: resultPayload.status || null,
result_code: resultPayload.result_code || null,
request_kind: commandPayload.request_kind || decisionPayload.request_kind || null,
asset_in: commandPayload.asset_in || null,
asset_out: commandPayload.asset_out || null,
amount_in: resolveTradeAmount(commandPayload, 'amount_in'),
amount_out: resolveTradeAmount(commandPayload, 'amount_out'),
quoted_amount_in: commandPayload.amount_in || null,
quoted_amount_out: commandPayload.amount_out || null,
gross_edge_pct: decisionPayload.gross_edge_pct || null,
decision_reason: decisionPayload.decision_reason || null,
direction: decisionPayload.direction || null,
};
}
function resolveTradeAmount(commandPayload, field) {
const quoteOutputField = commandPayload?.quote_output?.[field];
const proposedField = commandPayload?.[`proposed_${field}`];
return quoteOutputField || proposedField || commandPayload?.[field] || null;
}
function buildPair(assetIn, assetOut) {
if (!assetIn || !assetOut) return null;
return `${assetIn}->${assetOut}`;
}
function toIsoTimestamp(value) {
if (!value) return null;
const date = new Date(value);
return Number.isNaN(date.getTime()) ? null : date.toISOString();
}
function timestampValue(value) {
const parsed = Date.parse(value || '');
return Number.isFinite(parsed) ? parsed : -Infinity;
}
function buildDepositStatusRowKey(payload) {
const details = payload?.details || {};
return [
details.tx_hash || 'no-tx',
payload?.chain || details.chain || 'no-chain',
payload?.asset_id || details.asset_id || 'no-asset',
details.address || 'no-address',
details.amount || 'no-amount',
].join('|');
}
async function ensureExpressionIndex(pool, { name, table, expression }) {
await pool.query(`
CREATE INDEX IF NOT EXISTS ${name}