From a73ea1c83f96cedf77e2ba4b0d2adad37c245037 Mon Sep 17 00:00:00 2001 From: philipp Date: Mon, 18 May 2026 13:52:44 +0200 Subject: [PATCH] Bound dashboard intent outcome refresh Proof: npm test (215/215), npm run operator-dashboard:build, and focused intent/dashboard regressions cover bounded outcome refresh and dashboard bootstrap skipping synchronous refresh. Assumptions: history-writer and executor remain responsible for durable intent outcome refresh; dashboard bootstrap should read persisted outcomes instead of recomputing them inline. Still fake: dashboard quote outcomes still depend on inventory-delta attribution instead of venue-native terminal fill events. --- src/apps/operator-dashboard.mjs | 1 + src/lib/postgres.mjs | 188 +++++++++++++++++--- test/operator-dashboard-app-static.test.mjs | 4 + test/postgres-intent-requests.test.mjs | 146 +++++++++++++++ 4 files changed, 317 insertions(+), 22 deletions(-) diff --git a/src/apps/operator-dashboard.mjs b/src/apps/operator-dashboard.mjs index 4f1ac8c..26047b9 100644 --- a/src/apps/operator-dashboard.mjs +++ b/src/apps/operator-dashboard.mjs @@ -507,6 +507,7 @@ async function loadBootstrapPayload({ auth, page, pageSize }) { limit: 20, btcAsset: runtimeConfig.tradingBtc, eureAsset: runtimeConfig.tradingEure, + refreshOutcomes: false, }), [], sourceErrors, diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index a13b068..3a41ff3 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -46,6 +46,12 @@ const PAIR_PRICE_ROUTES_TABLE = 'pair_price_routes'; const PAIR_CONFIG_AUDIT_LOG_TABLE = 'pair_config_audit_log'; const CREDITED_LIQUIDITY_STATUSES = ['CREDITED', 'COMPLETED', 'FINALIZED', 'SETTLED']; const COMPLETED_WITHDRAWAL_STATUSES = ['COMPLETED', 'FINALIZED', 'SETTLED']; +const REFRESHABLE_INTENT_REQUEST_OUTCOME_STATUSES = [ + 'draft', + 'submitted', + 'accepted_by_relay', + 'awaiting_settlement', +]; export function createPostgresPool({ connectionString }) { return new Pool({ @@ -2454,33 +2460,40 @@ export async function refreshIntentRequestOutcomes(pool, { btcAsset = null, eureAsset = null, now = Date.now(), + preflightLimit = 100, + submissionLimit = 500, + inventoryLimit = 5000, } = {}) { if (!btcAsset?.assetId || !eureAsset?.assetId) return []; - const [preflightResult, submissionResult, inventoryResult] = await Promise.all([ - pool.query(` - SELECT DISTINCT ON (payload->>'request_id') - event_id, observed_at, ingested_at, payload - FROM intent_request_preflights - WHERE COALESCE(payload->>'request_id', '') <> '' - ORDER BY payload->>'request_id', COALESCE(observed_at, ingested_at) DESC - `), - pool.query(` - SELECT event_id, observed_at, ingested_at, payload - FROM intent_request_submission_results - ORDER BY COALESCE(observed_at, ingested_at) ASC - `), - pool.query(` - SELECT event_id, observed_at, ingested_at, payload - FROM intent_inventory_snapshots - ORDER BY COALESCE(observed_at, ingested_at) ASC - `), - ]); + const safePreflightLimit = Math.max(1, Number(preflightLimit) || 100); + const safeSubmissionLimit = Math.max(1, Number(submissionLimit) || 500); + const safeInventoryLimit = Math.max(1, Number(inventoryLimit) || 5000); + const preflightResult = await loadIntentRequestPreflightRefreshCandidates(pool, { + limit: safePreflightLimit, + }); + if (!preflightResult.rows.length) return []; + + const requestIds = [ + ...new Set(preflightResult.rows + .map((row) => row.payload?.request_id) + .filter(Boolean)), + ]; + if (!requestIds.length) return []; + + const submissions = await loadIntentRequestSubmissionsForRefresh(pool, { + requestIds, + limit: safeSubmissionLimit, + }); + const inventorySnapshots = await loadIntentInventorySnapshotsForRequestRefresh(pool, { + submissions: submissions.rows, + limit: safeInventoryLimit, + }); const records = deriveIntentRequestOutcomeRecords({ preflights: preflightResult.rows, - submissions: submissionResult.rows, - inventorySnapshots: inventoryResult.rows, + submissions: submissions.rows, + inventorySnapshots: inventorySnapshots.rows, btcAsset, eureAsset, now, @@ -2500,6 +2513,136 @@ export async function refreshIntentRequestOutcomes(pool, { return records; } +async function loadIntentRequestPreflightRefreshCandidates(pool, { limit }) { + return pool.query( + ` + WITH latest_preflights AS ( + SELECT DISTINCT ON (payload->>'request_id') + event_id, + observed_at, + ingested_at, + payload + FROM intent_request_preflights + WHERE COALESCE(payload->>'request_id', '') <> '' + ORDER BY payload->>'request_id', COALESCE(observed_at, ingested_at) DESC + ), latest_submissions AS ( + SELECT DISTINCT ON (payload->>'request_id') + payload->>'request_id' AS request_id, + observed_at, + ingested_at, + payload + FROM intent_request_submission_results + WHERE COALESCE(payload->>'request_id', '') <> '' + ORDER BY + payload->>'request_id', + COALESCE(observed_at, ingested_at) DESC, + CASE payload->>'status' + WHEN 'accepted_by_relay' THEN 0 + WHEN 'failed' THEN 1 + WHEN 'blocked' THEN 2 + WHEN 'submit_requested' THEN 3 + ELSE 4 + END + ), refresh_candidates AS ( + SELECT + p.event_id, + p.observed_at, + p.ingested_at, + p.payload, + COALESCE( + s.observed_at, + s.ingested_at, + p.observed_at, + p.ingested_at + ) AS refresh_sort_at + FROM latest_preflights p + LEFT JOIN latest_submissions s + ON s.request_id = p.payload->>'request_id' + LEFT JOIN ${INTENT_REQUEST_OUTCOMES_TABLE} o + ON o.request_id = p.payload->>'request_id' + WHERE o.request_id IS NULL + OR o.outcome_status = ANY($1::text[]) + OR COALESCE( + s.observed_at, + s.ingested_at, + p.observed_at, + p.ingested_at + ) > o.computed_at + ORDER BY refresh_sort_at DESC + LIMIT $2 + ) + SELECT event_id, observed_at, ingested_at, payload + FROM refresh_candidates + ORDER BY COALESCE(observed_at, ingested_at) ASC + `, + [REFRESHABLE_INTENT_REQUEST_OUTCOME_STATUSES, limit], + ); +} + +async function loadIntentRequestSubmissionsForRefresh(pool, { requestIds, limit }) { + return pool.query( + ` + SELECT event_id, observed_at, ingested_at, payload + FROM intent_request_submission_results + WHERE payload->>'request_id' = ANY($1::text[]) + ORDER BY COALESCE(observed_at, ingested_at) ASC + LIMIT $2 + `, + [requestIds, limit], + ); +} + +async function loadIntentInventorySnapshotsForRequestRefresh(pool, { + submissions, + limit, +}) { + const anchorAt = earliestIntentRequestInventoryAnchor({ + submissions, + }); + if (!anchorAt) return { rows: [] }; + + return pool.query( + ` + WITH previous_snapshot AS ( + SELECT event_id, observed_at, ingested_at, quote_id, payload, COALESCE(observed_at, ingested_at) AS sort_at + FROM intent_inventory_snapshots + WHERE COALESCE(observed_at, ingested_at) < $1 + ORDER BY COALESCE(observed_at, ingested_at) DESC + LIMIT 1 + ), following_snapshots AS ( + SELECT event_id, observed_at, ingested_at, quote_id, payload, COALESCE(observed_at, ingested_at) AS sort_at + FROM intent_inventory_snapshots + WHERE COALESCE(observed_at, ingested_at) >= $1 + ORDER BY COALESCE(observed_at, ingested_at) ASC + LIMIT $2 + ), bounded_snapshots AS ( + SELECT event_id, observed_at, ingested_at, quote_id, payload, sort_at + FROM previous_snapshot + UNION ALL + SELECT event_id, observed_at, ingested_at, quote_id, payload, sort_at + FROM following_snapshots + ) + SELECT event_id, observed_at, ingested_at, quote_id, payload + FROM bounded_snapshots + ORDER BY sort_at ASC + `, + [anchorAt, limit], + ); +} + +function earliestIntentRequestInventoryAnchor({ submissions = [] } = {}) { + const anchors = submissions + .map((row) => row.payload?.submitted_at || row.observed_at || row.ingested_at) + .map((value) => { + const timestamp = timestampValue(value); + return Number.isFinite(timestamp) ? timestamp : null; + }) + .filter((value) => value != null) + .sort((left, right) => left - right); + + return anchors.length ? new Date(anchors[0]).toISOString() : null; +} + export async function upsertIntentRequestOutcome(pool, { request_id, idempotency_key, @@ -2583,8 +2726,9 @@ export async function loadRecentIntentRequests(pool, { btcAsset = null, eureAsset = null, now = Date.now(), + refreshOutcomes = true, } = {}) { - if (btcAsset?.assetId && eureAsset?.assetId) { + if (refreshOutcomes && btcAsset?.assetId && eureAsset?.assetId) { await refreshIntentRequestOutcomes(pool, { btcAsset, eureAsset, now }).catch(() => []); } diff --git a/test/operator-dashboard-app-static.test.mjs b/test/operator-dashboard-app-static.test.mjs index b2ebe4d..d4c9124 100644 --- a/test/operator-dashboard-app-static.test.mjs +++ b/test/operator-dashboard-app-static.test.mjs @@ -33,3 +33,7 @@ test('operator dashboard API auth failures are JSON for frontend fetches', () => assert.match(source, /req\.url \|\| ''\)\.startsWith\('\/api\/'\)/); assert.match(source, /sendJson\(res, 401, \{ error: 'authentication_required' \}\)/); }); + +test('operator dashboard bootstrap does not synchronously refresh intent request outcomes', () => { + assert.match(source, /loadRecentIntentRequests\(pool, \{[\s\S]*refreshOutcomes: false/); +}); diff --git a/test/postgres-intent-requests.test.mjs b/test/postgres-intent-requests.test.mjs index 1c8f90e..6f8fdb8 100644 --- a/test/postgres-intent-requests.test.mjs +++ b/test/postgres-intent-requests.test.mjs @@ -2,8 +2,10 @@ import test from 'node:test'; import assert from 'node:assert/strict'; import { + loadRecentIntentRequests, loadIntentRequestSubmissionsForStatusRefresh, normalizeIntentRequestRow, + refreshIntentRequestOutcomes, } from '../src/lib/postgres.mjs'; test('intent request normalization prefers terminal outcome reason text over relay acceptance text', () => { @@ -152,3 +154,147 @@ test('intent request normalization explains unanswered solver quotes in plain te assert.equal(row.reason_text, 'The relay returned no solver quotes for this request.'); assert.equal(row.live_submit_capable, false); }); + +test('intent request outcome refresh bounds source queries to refresh candidates', async () => { + const btcAsset = { assetId: 'nep141:nbtc.bridge.near' }; + const eureAsset = { assetId: 'nep141:eure.omft.near' }; + const queries = []; + const pool = { + async query(sql, params = []) { + queries.push({ sql, params }); + + if (sql.includes('WITH latest_preflights')) { + assert.match(sql, /outcome_status = ANY\(\$1::text\[\]\)/); + assert.match(sql, /LIMIT \$2/); + assert.equal(params[1], 7); + return { + rows: [ + eventRow({ + eventId: 'preflight-1', + at: '2026-05-13T10:00:00.000Z', + payload: { + request_id: 'request-1', + state: 'draft', + source_asset_id: eureAsset.assetId, + destination_asset_id: btcAsset.assetId, + source_amount_units: '1000000000000000000', + selected_quote: { amount_out: '1000' }, + created_at: '2026-05-13T10:00:00.000Z', + }, + }), + ], + }; + } + + if (sql.includes('FROM intent_request_submission_results')) { + assert.match(sql, /payload->>'request_id' = ANY\(\$1::text\[\]\)/); + assert.match(sql, /LIMIT \$2/); + assert.deepEqual(params[0], ['request-1']); + assert.equal(params[1], 8); + return { + rows: [ + eventRow({ + eventId: 'submission-1', + at: '2026-05-13T10:00:05.000Z', + payload: { + request_id: 'request-1', + status: 'accepted_by_relay', + submitted_at: '2026-05-13T10:00:05.000Z', + destination_amount_units: '1000', + }, + }), + ], + }; + } + + if (sql.includes('FROM intent_inventory_snapshots')) { + assert.match(sql, /LIMIT \$2/); + assert.equal(params[0], '2026-05-13T10:00:05.000Z'); + assert.equal(params[1], 9); + return { + rows: [ + eventRow({ + eventId: 'inventory-1', + at: '2026-05-13T09:59:59.000Z', + payload: { + inventory_id: 'inventory-1', + spendable: { + [btcAsset.assetId]: '0', + [eureAsset.assetId]: '1000000000000000000', + }, + }, + }), + eventRow({ + eventId: 'inventory-2', + at: '2026-05-13T10:00:06.000Z', + payload: { + inventory_id: 'inventory-2', + spendable: { + [btcAsset.assetId]: '1000', + [eureAsset.assetId]: '0', + }, + }, + }), + ], + }; + } + + if (sql.includes('INSERT INTO intent_request_outcomes')) { + return { rows: [], rowCount: 1 }; + } + + throw new Error(`unexpected query: ${sql}`); + }, + }; + + const records = await refreshIntentRequestOutcomes(pool, { + btcAsset, + eureAsset, + now: Date.parse('2026-05-13T10:00:20.000Z'), + preflightLimit: 7, + submissionLimit: 8, + inventoryLimit: 9, + }); + + assert.equal(records.length, 1); + assert.equal(records[0].request_id, 'request-1'); + assert.equal(records[0].outcome_status, 'completed'); + assert.equal(queries.filter((entry) => entry.sql.includes('INSERT INTO intent_request_outcomes')).length, 1); +}); + +test('recent intent request loader can skip synchronous outcome refresh for dashboard bootstrap', async () => { + const queries = []; + const pool = { + async query(sql, params = []) { + queries.push({ sql, params }); + assert.match(sql, /WITH latest_preflights AS/); + assert.doesNotMatch(sql, /refresh_candidates/); + assert.equal(params[0], 3); + return { rows: [] }; + }, + }; + + const rows = await loadRecentIntentRequests(pool, { + limit: 3, + btcAsset: { assetId: 'nep141:nbtc.bridge.near' }, + eureAsset: { assetId: 'nep141:eure.omft.near' }, + refreshOutcomes: false, + }); + + assert.deepEqual(rows, []); + assert.equal(queries.length, 1); +}); + +function eventRow({ + eventId, + at, + payload, +}) { + return { + event_id: eventId, + observed_at: at, + ingested_at: at, + quote_id: null, + payload, + }; +}