diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index 6c8d800..3f460ac 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -14,6 +14,8 @@ const TABLES = [ ]; const PORTFOLIO_METRICS_TABLE = 'portfolio_metrics_snapshots'; +const CREDITED_LIQUIDITY_STATUSES = ['CREDITED', 'COMPLETED', 'FINALIZED', 'SETTLED']; +const COMPLETED_WITHDRAWAL_STATUSES = ['COMPLETED', 'FINALIZED', 'SETTLED']; export function createPostgresPool({ connectionString }) { return new Pool({ @@ -147,7 +149,10 @@ export async function insertHistoryEvent(pool, { table, topic, event, record }) ); } -export async function loadPortfolioMetricInputs(pool) { +export async function loadPortfolioMetricInputs(pool, { + btcAsset = null, + eureAsset = null, +} = {}) { const [currentInventory, currentPrice, commandAggregate, resultAggregate] = await Promise.all([ loadLatestEventPayload(pool, 'intent_inventory_snapshots'), loadLatestEventPayload(pool, 'market_price_events'), @@ -184,6 +189,18 @@ export async function loadPortfolioMetricInputs(pool) { [firstCommandAt], ); const baselinePrice = await loadNearestPricePayload(pool, baselineInventory?.ingested_at || firstCommandAt); + const externalFlows = ( + baselineInventory + && baselinePrice + && btcAsset?.assetId + && eureAsset?.assetId + ) + ? await loadExternalAssetFlowsSince(pool, { + since: firstCommandAt, + btcAsset, + eureAsset, + }) + : []; return { currentInventory, @@ -194,6 +211,7 @@ export async function loadPortfolioMetricInputs(pool) { inventory: baselineInventory.payload, price: baselinePrice.payload, } : null, + externalFlows, commandCount, resultCount, }; @@ -242,6 +260,196 @@ export async function loadLatestPortfolioMetric(pool) { return normalizePortfolioMetricRow(result.rows[0]); } +export async function loadLatestInventorySnapshot(pool) { + const latest = await loadLatestEventPayload(pool, 'intent_inventory_snapshots'); + if (!latest) return null; + return { + ingested_at: latest.ingested_at, + payload: latest.payload, + }; +} + +export async function loadLatestMarketPrice(pool) { + const latest = await loadLatestEventPayload(pool, 'market_price_events'); + if (!latest) return null; + return { + ingested_at: latest.ingested_at, + payload: latest.payload, + }; +} + +export async function loadRecentQuotes(pool, { limit = 10 } = {}) { + const result = await pool.query( + ` + SELECT observed_at, ingested_at, payload + FROM swap_demand_events + ORDER BY ingested_at DESC + LIMIT $1 + `, + [limit], + ); + + return result.rows.map(normalizeRecentQuoteRow); +} + +export async function loadSuccessfulTradeSummary(pool) { + const result = await pool.query(` + SELECT + COUNT(*)::INT AS total, + MAX(COALESCE(observed_at, ingested_at)) AS last_successful_trade_at + FROM trade_execution_results + WHERE payload->>'status' = 'submitted' + `); + + return { + total: Number(result.rows[0]?.total || 0), + last_successful_trade_at: toIsoTimestamp(result.rows[0]?.last_successful_trade_at), + }; +} + +export async function loadSuccessfulTradesPage(pool, { page = 1, pageSize = 20 } = {}) { + const normalizedPage = Math.max(1, Number(page) || 1); + const normalizedPageSize = Math.max(1, Number(pageSize) || 20); + const offset = (normalizedPage - 1) * normalizedPageSize; + + const [countResult, rowsResult] = await Promise.all([ + pool.query(` + SELECT COUNT(*)::INT AS total + FROM trade_execution_results + WHERE payload->>'status' = 'submitted' + `), + pool.query( + ` + SELECT + r.observed_at AS result_observed_at, + r.ingested_at AS result_ingested_at, + r.payload AS result_payload, + c.payload AS command_payload, + d.payload AS decision_payload + FROM trade_execution_results r + LEFT JOIN execute_trade_commands c + ON c.decision_key = r.decision_key + LEFT JOIN trade_decisions d + ON d.decision_key = COALESCE(c.payload->>'decision_id', r.payload->>'decision_id') + WHERE r.payload->>'status' = 'submitted' + ORDER BY COALESCE(r.observed_at, r.ingested_at) DESC + LIMIT $1 + OFFSET $2 + `, + [normalizedPageSize, offset], + ), + ]); + + const total = Number(countResult.rows[0]?.total || 0); + + return { + page: normalizedPage, + page_size: normalizedPageSize, + total, + total_pages: total > 0 ? Math.ceil(total / normalizedPageSize) : 1, + items: rowsResult.rows.map(normalizeSuccessfulTradeRow), + }; +} + +export async function loadCurrentFundingObservations(pool) { + const result = await pool.query(` + SELECT DISTINCT ON (decision_key) + observed_at, + ingested_at, + payload + FROM funding_observations + WHERE decision_key IS NOT NULL + ORDER BY decision_key, ingested_at DESC + `); + + return result.rows + .map((row) => ({ + observed_at: toIsoTimestamp(row.observed_at), + ingested_at: toIsoTimestamp(row.ingested_at), + payload: row.payload, + })) + .sort((left, right) => ( + Date.parse( + right.payload?.last_seen_at + || right.observed_at + || right.ingested_at + || '', + ) - Date.parse( + left.payload?.last_seen_at + || left.observed_at + || left.ingested_at + || '', + ) + )); +} + +export async function loadRecentDepositStatuses(pool, { limit = 20 } = {}) { + const normalizedLimit = Math.max(1, Number(limit) || 20); + const fetchLimit = Math.max(normalizedLimit * 4, normalizedLimit); + const result = await pool.query( + ` + SELECT observed_at, ingested_at, payload + FROM liquidity_actions + WHERE payload->>'action_type' = 'deposit_status_observed' + ORDER BY COALESCE(observed_at, ingested_at) DESC + LIMIT $1 + `, + [fetchLimit], + ); + + const seen = new Set(); + const uniqueRows = []; + for (const row of result.rows) { + const key = buildDepositStatusRowKey(row.payload); + if (seen.has(key)) continue; + seen.add(key); + uniqueRows.push(row); + if (uniqueRows.length >= normalizedLimit) break; + } + + return uniqueRows.map((row) => ({ + observed_at: toIsoTimestamp(row.observed_at), + ingested_at: toIsoTimestamp(row.ingested_at), + payload: row.payload, + })); +} + +export async function loadRecentAlertTransitions(pool, { limit = 20 } = {}) { + const result = await pool.query( + ` + SELECT observed_at, ingested_at, payload + FROM ops_alerts + ORDER BY ingested_at DESC + LIMIT $1 + `, + [limit], + ); + + return result.rows.map((row) => ({ + observed_at: toIsoTimestamp(row.observed_at), + ingested_at: toIsoTimestamp(row.ingested_at), + payload: row.payload, + })); +} + +export async function loadRecentTradeDecisions(pool, { limit = 20 } = {}) { + const result = await pool.query( + ` + SELECT observed_at, ingested_at, payload + FROM trade_decisions + ORDER BY COALESCE(observed_at, ingested_at) DESC + LIMIT $1 + `, + [limit], + ); + + return result.rows.map((row) => ({ + observed_at: toIsoTimestamp(row.observed_at), + ingested_at: toIsoTimestamp(row.ingested_at), + payload: row.payload, + })); +} + async function loadLatestEventPayload(pool, table, clause = 'ORDER BY ingested_at DESC LIMIT 1', params = []) { const result = await pool.query( ` @@ -274,6 +482,138 @@ async function loadNearestPricePayload(pool, anchorAt) { ); } +async function loadExternalAssetFlowsSince(pool, { + since, + btcAsset, + eureAsset, +} = {}) { + const [depositRows, withdrawalRows] = await Promise.all([ + loadCreditedDepositRowsSince(pool, since), + loadCompletedWithdrawalRowsSince(pool, since), + ]); + + const flows = []; + + for (const row of depositRows) { + flows.push(await normalizeExternalFlowRow(pool, { + row, + kind: 'deposit', + sign: 1n, + btcAsset, + eureAsset, + })); + } + + for (const row of withdrawalRows) { + flows.push(await normalizeExternalFlowRow(pool, { + row, + kind: 'withdrawal', + sign: -1n, + btcAsset, + eureAsset, + })); + } + + return flows + .filter(Boolean) + .sort((left, right) => timestampValue(left.effective_at) - timestampValue(right.effective_at)); +} + +async function loadCreditedDepositRowsSince(pool, since) { + const result = await pool.query( + ` + SELECT DISTINCT ON ( + payload->'details'->>'tx_hash', + payload->>'chain', + payload->>'asset_id', + payload->'details'->>'amount' + ) + observed_at, + ingested_at, + payload + FROM liquidity_actions + WHERE ingested_at > $1 + AND payload->>'action_type' = 'deposit_status_observed' + AND UPPER(payload->>'status') = ANY($2) + AND COALESCE(payload->'details'->>'tx_hash', '') <> '' + ORDER BY + payload->'details'->>'tx_hash', + payload->>'chain', + payload->>'asset_id', + payload->'details'->>'amount', + ingested_at DESC + `, + [since, CREDITED_LIQUIDITY_STATUSES], + ); + return result.rows; +} + +async function loadCompletedWithdrawalRowsSince(pool, since) { + const result = await pool.query( + ` + SELECT DISTINCT ON ( + payload->'details'->>'withdrawal_hash', + payload->>'chain', + payload->>'asset_id' + ) + observed_at, + ingested_at, + payload + FROM liquidity_actions + WHERE ingested_at > $1 + AND payload->>'action_type' = 'withdrawal_status_changed' + AND UPPER(payload->>'status') = ANY($2) + AND COALESCE(payload->'details'->>'withdrawal_hash', '') <> '' + ORDER BY + payload->'details'->>'withdrawal_hash', + payload->>'chain', + payload->>'asset_id', + ingested_at DESC + `, + [since, COMPLETED_WITHDRAWAL_STATUSES], + ); + return result.rows; +} + +async function normalizeExternalFlowRow(pool, { + row, + kind, + sign, + btcAsset, + eureAsset, +} = {}) { + const payload = row?.payload || {}; + const details = payload.details || {}; + const assetId = payload.asset_id || details.asset_id || null; + if (!assetId) return null; + + const amount = String(details.amount || '0'); + const effectiveAt = toIsoTimestamp(row.observed_at || row.ingested_at); + const signedUnits = (sign * BigInt(amount)).toString(); + let referencePriceAtFlowTime = null; + + if (assetId === btcAsset?.assetId) { + const nearestPrice = await loadNearestPricePayload(pool, effectiveAt); + referencePriceAtFlowTime = nearestPrice?.payload?.eure_per_btc || null; + } else if (assetId !== eureAsset?.assetId) { + return null; + } + + return { + flow_id: + kind === 'deposit' + ? `deposit:${details.tx_hash || effectiveAt || Math.random().toString(16).slice(2)}` + : `withdrawal:${details.withdrawal_hash || effectiveAt || Math.random().toString(16).slice(2)}`, + kind, + asset_id: assetId, + effective_at: effectiveAt, + signed_units: signedUnits, + tx_hash: details.tx_hash || null, + withdrawal_hash: details.withdrawal_hash || null, + reference_price_eure_per_btc_at_flow_time: referencePriceAtFlowTime, + }; +} + function normalizePortfolioMetricRow(row) { return { metric_id: row.metric_id, @@ -284,6 +624,87 @@ function normalizePortfolioMetricRow(row) { }; } +function normalizeRecentQuoteRow(row) { + const payload = row.payload || {}; + return { + quote_id: payload.quote_id || null, + pair: payload.pair || buildPair(payload.asset_in, payload.asset_out), + asset_in: payload.asset_in || null, + asset_out: payload.asset_out || null, + request_kind: payload.request_kind || null, + amount_in: payload.amount_in ?? null, + amount_out: payload.amount_out ?? null, + min_deadline_ms: payload.min_deadline_ms ?? null, + observed_at: toIsoTimestamp(row.observed_at), + ingested_at: toIsoTimestamp(row.ingested_at), + }; +} + +function normalizeSuccessfulTradeRow(row) { + const resultPayload = row.result_payload || {}; + const commandPayload = row.command_payload || {}; + const decisionPayload = row.decision_payload || {}; + + return { + command_id: resultPayload.command_id || commandPayload.command_id || null, + decision_id: + commandPayload.decision_id + || resultPayload.decision_id + || decisionPayload.decision_id + || null, + execution_key: resultPayload.execution_key || commandPayload.execution_key || null, + quote_id: resultPayload.quote_id || commandPayload.quote_id || decisionPayload.quote_id || null, + pair: resultPayload.pair || commandPayload.pair || decisionPayload.pair || null, + observed_at: toIsoTimestamp(row.result_observed_at || row.result_ingested_at), + ingested_at: toIsoTimestamp(row.result_ingested_at), + status: resultPayload.status || null, + result_code: resultPayload.result_code || null, + request_kind: commandPayload.request_kind || decisionPayload.request_kind || null, + asset_in: commandPayload.asset_in || null, + asset_out: commandPayload.asset_out || null, + amount_in: resolveTradeAmount(commandPayload, 'amount_in'), + amount_out: resolveTradeAmount(commandPayload, 'amount_out'), + quoted_amount_in: commandPayload.amount_in || null, + quoted_amount_out: commandPayload.amount_out || null, + gross_edge_pct: decisionPayload.gross_edge_pct || null, + decision_reason: decisionPayload.decision_reason || null, + direction: decisionPayload.direction || null, + }; +} + +function resolveTradeAmount(commandPayload, field) { + const quoteOutputField = commandPayload?.quote_output?.[field]; + const proposedField = commandPayload?.[`proposed_${field}`]; + return quoteOutputField || proposedField || commandPayload?.[field] || null; +} + +function buildPair(assetIn, assetOut) { + if (!assetIn || !assetOut) return null; + return `${assetIn}->${assetOut}`; +} + +function toIsoTimestamp(value) { + if (!value) return null; + const date = new Date(value); + return Number.isNaN(date.getTime()) ? null : date.toISOString(); +} + +function timestampValue(value) { + const parsed = Date.parse(value || ''); + return Number.isFinite(parsed) ? parsed : -Infinity; +} + +function buildDepositStatusRowKey(payload) { + const details = payload?.details || {}; + return [ + details.tx_hash || 'no-tx', + payload?.chain || details.chain || 'no-chain', + payload?.asset_id || details.asset_id || 'no-asset', + details.address || 'no-address', + details.amount || 'no-amount', + ].join('|'); +} + async function ensureExpressionIndex(pool, { name, table, expression }) { await pool.query(` CREATE INDEX IF NOT EXISTS ${name}