Proof: npm test (215/215), npm run operator-dashboard:build, and focused intent/dashboard regressions cover bounded outcome refresh and dashboard bootstrap skipping synchronous refresh. Assumptions: history-writer and executor remain responsible for durable intent outcome refresh; dashboard bootstrap should read persisted outcomes instead of recomputing them inline. Still fake: dashboard quote outcomes still depend on inventory-delta attribution instead of venue-native terminal fill events.
This commit is contained in:
parent
fd899a3788
commit
a73ea1c83f
4 changed files with 317 additions and 22 deletions
|
|
@ -507,6 +507,7 @@ async function loadBootstrapPayload({ auth, page, pageSize }) {
|
|||
limit: 20,
|
||||
btcAsset: runtimeConfig.tradingBtc,
|
||||
eureAsset: runtimeConfig.tradingEure,
|
||||
refreshOutcomes: false,
|
||||
}),
|
||||
[],
|
||||
sourceErrors,
|
||||
|
|
|
|||
|
|
@ -46,6 +46,12 @@ const PAIR_PRICE_ROUTES_TABLE = 'pair_price_routes';
|
|||
const PAIR_CONFIG_AUDIT_LOG_TABLE = 'pair_config_audit_log';
|
||||
const CREDITED_LIQUIDITY_STATUSES = ['CREDITED', 'COMPLETED', 'FINALIZED', 'SETTLED'];
|
||||
const COMPLETED_WITHDRAWAL_STATUSES = ['COMPLETED', 'FINALIZED', 'SETTLED'];
|
||||
const REFRESHABLE_INTENT_REQUEST_OUTCOME_STATUSES = [
|
||||
'draft',
|
||||
'submitted',
|
||||
'accepted_by_relay',
|
||||
'awaiting_settlement',
|
||||
];
|
||||
|
||||
export function createPostgresPool({ connectionString }) {
|
||||
return new Pool({
|
||||
|
|
@ -2454,33 +2460,40 @@ export async function refreshIntentRequestOutcomes(pool, {
|
|||
btcAsset = null,
|
||||
eureAsset = null,
|
||||
now = Date.now(),
|
||||
preflightLimit = 100,
|
||||
submissionLimit = 500,
|
||||
inventoryLimit = 5000,
|
||||
} = {}) {
|
||||
if (!btcAsset?.assetId || !eureAsset?.assetId) return [];
|
||||
|
||||
const [preflightResult, submissionResult, inventoryResult] = await Promise.all([
|
||||
pool.query(`
|
||||
SELECT DISTINCT ON (payload->>'request_id')
|
||||
event_id, observed_at, ingested_at, payload
|
||||
FROM intent_request_preflights
|
||||
WHERE COALESCE(payload->>'request_id', '') <> ''
|
||||
ORDER BY payload->>'request_id', COALESCE(observed_at, ingested_at) DESC
|
||||
`),
|
||||
pool.query(`
|
||||
SELECT event_id, observed_at, ingested_at, payload
|
||||
FROM intent_request_submission_results
|
||||
ORDER BY COALESCE(observed_at, ingested_at) ASC
|
||||
`),
|
||||
pool.query(`
|
||||
SELECT event_id, observed_at, ingested_at, payload
|
||||
FROM intent_inventory_snapshots
|
||||
ORDER BY COALESCE(observed_at, ingested_at) ASC
|
||||
`),
|
||||
]);
|
||||
const safePreflightLimit = Math.max(1, Number(preflightLimit) || 100);
|
||||
const safeSubmissionLimit = Math.max(1, Number(submissionLimit) || 500);
|
||||
const safeInventoryLimit = Math.max(1, Number(inventoryLimit) || 5000);
|
||||
const preflightResult = await loadIntentRequestPreflightRefreshCandidates(pool, {
|
||||
limit: safePreflightLimit,
|
||||
});
|
||||
if (!preflightResult.rows.length) return [];
|
||||
|
||||
const requestIds = [
|
||||
...new Set(preflightResult.rows
|
||||
.map((row) => row.payload?.request_id)
|
||||
.filter(Boolean)),
|
||||
];
|
||||
if (!requestIds.length) return [];
|
||||
|
||||
const submissions = await loadIntentRequestSubmissionsForRefresh(pool, {
|
||||
requestIds,
|
||||
limit: safeSubmissionLimit,
|
||||
});
|
||||
const inventorySnapshots = await loadIntentInventorySnapshotsForRequestRefresh(pool, {
|
||||
submissions: submissions.rows,
|
||||
limit: safeInventoryLimit,
|
||||
});
|
||||
|
||||
const records = deriveIntentRequestOutcomeRecords({
|
||||
preflights: preflightResult.rows,
|
||||
submissions: submissionResult.rows,
|
||||
inventorySnapshots: inventoryResult.rows,
|
||||
submissions: submissions.rows,
|
||||
inventorySnapshots: inventorySnapshots.rows,
|
||||
btcAsset,
|
||||
eureAsset,
|
||||
now,
|
||||
|
|
@ -2500,6 +2513,136 @@ export async function refreshIntentRequestOutcomes(pool, {
|
|||
return records;
|
||||
}
|
||||
|
||||
async function loadIntentRequestPreflightRefreshCandidates(pool, { limit }) {
|
||||
return pool.query(
|
||||
`
|
||||
WITH latest_preflights AS (
|
||||
SELECT DISTINCT ON (payload->>'request_id')
|
||||
event_id,
|
||||
observed_at,
|
||||
ingested_at,
|
||||
payload
|
||||
FROM intent_request_preflights
|
||||
WHERE COALESCE(payload->>'request_id', '') <> ''
|
||||
ORDER BY payload->>'request_id', COALESCE(observed_at, ingested_at) DESC
|
||||
), latest_submissions AS (
|
||||
SELECT DISTINCT ON (payload->>'request_id')
|
||||
payload->>'request_id' AS request_id,
|
||||
observed_at,
|
||||
ingested_at,
|
||||
payload
|
||||
FROM intent_request_submission_results
|
||||
WHERE COALESCE(payload->>'request_id', '') <> ''
|
||||
ORDER BY
|
||||
payload->>'request_id',
|
||||
COALESCE(observed_at, ingested_at) DESC,
|
||||
CASE payload->>'status'
|
||||
WHEN 'accepted_by_relay' THEN 0
|
||||
WHEN 'failed' THEN 1
|
||||
WHEN 'blocked' THEN 2
|
||||
WHEN 'submit_requested' THEN 3
|
||||
ELSE 4
|
||||
END
|
||||
), refresh_candidates AS (
|
||||
SELECT
|
||||
p.event_id,
|
||||
p.observed_at,
|
||||
p.ingested_at,
|
||||
p.payload,
|
||||
COALESCE(
|
||||
s.observed_at,
|
||||
s.ingested_at,
|
||||
p.observed_at,
|
||||
p.ingested_at
|
||||
) AS refresh_sort_at
|
||||
FROM latest_preflights p
|
||||
LEFT JOIN latest_submissions s
|
||||
ON s.request_id = p.payload->>'request_id'
|
||||
LEFT JOIN ${INTENT_REQUEST_OUTCOMES_TABLE} o
|
||||
ON o.request_id = p.payload->>'request_id'
|
||||
WHERE o.request_id IS NULL
|
||||
OR o.outcome_status = ANY($1::text[])
|
||||
OR COALESCE(
|
||||
s.observed_at,
|
||||
s.ingested_at,
|
||||
p.observed_at,
|
||||
p.ingested_at
|
||||
) > o.computed_at
|
||||
ORDER BY refresh_sort_at DESC
|
||||
LIMIT $2
|
||||
)
|
||||
SELECT event_id, observed_at, ingested_at, payload
|
||||
FROM refresh_candidates
|
||||
ORDER BY COALESCE(observed_at, ingested_at) ASC
|
||||
`,
|
||||
[REFRESHABLE_INTENT_REQUEST_OUTCOME_STATUSES, limit],
|
||||
);
|
||||
}
|
||||
|
||||
async function loadIntentRequestSubmissionsForRefresh(pool, { requestIds, limit }) {
|
||||
return pool.query(
|
||||
`
|
||||
SELECT event_id, observed_at, ingested_at, payload
|
||||
FROM intent_request_submission_results
|
||||
WHERE payload->>'request_id' = ANY($1::text[])
|
||||
ORDER BY COALESCE(observed_at, ingested_at) ASC
|
||||
LIMIT $2
|
||||
`,
|
||||
[requestIds, limit],
|
||||
);
|
||||
}
|
||||
|
||||
async function loadIntentInventorySnapshotsForRequestRefresh(pool, {
|
||||
submissions,
|
||||
limit,
|
||||
}) {
|
||||
const anchorAt = earliestIntentRequestInventoryAnchor({
|
||||
submissions,
|
||||
});
|
||||
if (!anchorAt) return { rows: [] };
|
||||
|
||||
return pool.query(
|
||||
`
|
||||
WITH previous_snapshot AS (
|
||||
SELECT event_id, observed_at, ingested_at, quote_id, payload, COALESCE(observed_at, ingested_at) AS sort_at
|
||||
FROM intent_inventory_snapshots
|
||||
WHERE COALESCE(observed_at, ingested_at) < $1
|
||||
ORDER BY COALESCE(observed_at, ingested_at) DESC
|
||||
LIMIT 1
|
||||
), following_snapshots AS (
|
||||
SELECT event_id, observed_at, ingested_at, quote_id, payload, COALESCE(observed_at, ingested_at) AS sort_at
|
||||
FROM intent_inventory_snapshots
|
||||
WHERE COALESCE(observed_at, ingested_at) >= $1
|
||||
ORDER BY COALESCE(observed_at, ingested_at) ASC
|
||||
LIMIT $2
|
||||
), bounded_snapshots AS (
|
||||
SELECT event_id, observed_at, ingested_at, quote_id, payload, sort_at
|
||||
FROM previous_snapshot
|
||||
UNION ALL
|
||||
SELECT event_id, observed_at, ingested_at, quote_id, payload, sort_at
|
||||
FROM following_snapshots
|
||||
)
|
||||
SELECT event_id, observed_at, ingested_at, quote_id, payload
|
||||
FROM bounded_snapshots
|
||||
ORDER BY sort_at ASC
|
||||
`,
|
||||
[anchorAt, limit],
|
||||
);
|
||||
}
|
||||
|
||||
function earliestIntentRequestInventoryAnchor({ submissions = [] } = {}) {
|
||||
const anchors = submissions
|
||||
.map((row) => row.payload?.submitted_at || row.observed_at || row.ingested_at)
|
||||
.map((value) => {
|
||||
const timestamp = timestampValue(value);
|
||||
return Number.isFinite(timestamp) ? timestamp : null;
|
||||
})
|
||||
.filter((value) => value != null)
|
||||
.sort((left, right) => left - right);
|
||||
|
||||
return anchors.length ? new Date(anchors[0]).toISOString() : null;
|
||||
}
|
||||
|
||||
export async function upsertIntentRequestOutcome(pool, {
|
||||
request_id,
|
||||
idempotency_key,
|
||||
|
|
@ -2583,8 +2726,9 @@ export async function loadRecentIntentRequests(pool, {
|
|||
btcAsset = null,
|
||||
eureAsset = null,
|
||||
now = Date.now(),
|
||||
refreshOutcomes = true,
|
||||
} = {}) {
|
||||
if (btcAsset?.assetId && eureAsset?.assetId) {
|
||||
if (refreshOutcomes && btcAsset?.assetId && eureAsset?.assetId) {
|
||||
await refreshIntentRequestOutcomes(pool, { btcAsset, eureAsset, now }).catch(() => []);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -33,3 +33,7 @@ test('operator dashboard API auth failures are JSON for frontend fetches', () =>
|
|||
assert.match(source, /req\.url \|\| ''\)\.startsWith\('\/api\/'\)/);
|
||||
assert.match(source, /sendJson\(res, 401, \{ error: 'authentication_required' \}\)/);
|
||||
});
|
||||
|
||||
test('operator dashboard bootstrap does not synchronously refresh intent request outcomes', () => {
|
||||
assert.match(source, /loadRecentIntentRequests\(pool, \{[\s\S]*refreshOutcomes: false/);
|
||||
});
|
||||
|
|
|
|||
|
|
@ -2,8 +2,10 @@ import test from 'node:test';
|
|||
import assert from 'node:assert/strict';
|
||||
|
||||
import {
|
||||
loadRecentIntentRequests,
|
||||
loadIntentRequestSubmissionsForStatusRefresh,
|
||||
normalizeIntentRequestRow,
|
||||
refreshIntentRequestOutcomes,
|
||||
} from '../src/lib/postgres.mjs';
|
||||
|
||||
test('intent request normalization prefers terminal outcome reason text over relay acceptance text', () => {
|
||||
|
|
@ -152,3 +154,147 @@ test('intent request normalization explains unanswered solver quotes in plain te
|
|||
assert.equal(row.reason_text, 'The relay returned no solver quotes for this request.');
|
||||
assert.equal(row.live_submit_capable, false);
|
||||
});
|
||||
|
||||
test('intent request outcome refresh bounds source queries to refresh candidates', async () => {
|
||||
const btcAsset = { assetId: 'nep141:nbtc.bridge.near' };
|
||||
const eureAsset = { assetId: 'nep141:eure.omft.near' };
|
||||
const queries = [];
|
||||
const pool = {
|
||||
async query(sql, params = []) {
|
||||
queries.push({ sql, params });
|
||||
|
||||
if (sql.includes('WITH latest_preflights')) {
|
||||
assert.match(sql, /outcome_status = ANY\(\$1::text\[\]\)/);
|
||||
assert.match(sql, /LIMIT \$2/);
|
||||
assert.equal(params[1], 7);
|
||||
return {
|
||||
rows: [
|
||||
eventRow({
|
||||
eventId: 'preflight-1',
|
||||
at: '2026-05-13T10:00:00.000Z',
|
||||
payload: {
|
||||
request_id: 'request-1',
|
||||
state: 'draft',
|
||||
source_asset_id: eureAsset.assetId,
|
||||
destination_asset_id: btcAsset.assetId,
|
||||
source_amount_units: '1000000000000000000',
|
||||
selected_quote: { amount_out: '1000' },
|
||||
created_at: '2026-05-13T10:00:00.000Z',
|
||||
},
|
||||
}),
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
if (sql.includes('FROM intent_request_submission_results')) {
|
||||
assert.match(sql, /payload->>'request_id' = ANY\(\$1::text\[\]\)/);
|
||||
assert.match(sql, /LIMIT \$2/);
|
||||
assert.deepEqual(params[0], ['request-1']);
|
||||
assert.equal(params[1], 8);
|
||||
return {
|
||||
rows: [
|
||||
eventRow({
|
||||
eventId: 'submission-1',
|
||||
at: '2026-05-13T10:00:05.000Z',
|
||||
payload: {
|
||||
request_id: 'request-1',
|
||||
status: 'accepted_by_relay',
|
||||
submitted_at: '2026-05-13T10:00:05.000Z',
|
||||
destination_amount_units: '1000',
|
||||
},
|
||||
}),
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
if (sql.includes('FROM intent_inventory_snapshots')) {
|
||||
assert.match(sql, /LIMIT \$2/);
|
||||
assert.equal(params[0], '2026-05-13T10:00:05.000Z');
|
||||
assert.equal(params[1], 9);
|
||||
return {
|
||||
rows: [
|
||||
eventRow({
|
||||
eventId: 'inventory-1',
|
||||
at: '2026-05-13T09:59:59.000Z',
|
||||
payload: {
|
||||
inventory_id: 'inventory-1',
|
||||
spendable: {
|
||||
[btcAsset.assetId]: '0',
|
||||
[eureAsset.assetId]: '1000000000000000000',
|
||||
},
|
||||
},
|
||||
}),
|
||||
eventRow({
|
||||
eventId: 'inventory-2',
|
||||
at: '2026-05-13T10:00:06.000Z',
|
||||
payload: {
|
||||
inventory_id: 'inventory-2',
|
||||
spendable: {
|
||||
[btcAsset.assetId]: '1000',
|
||||
[eureAsset.assetId]: '0',
|
||||
},
|
||||
},
|
||||
}),
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
if (sql.includes('INSERT INTO intent_request_outcomes')) {
|
||||
return { rows: [], rowCount: 1 };
|
||||
}
|
||||
|
||||
throw new Error(`unexpected query: ${sql}`);
|
||||
},
|
||||
};
|
||||
|
||||
const records = await refreshIntentRequestOutcomes(pool, {
|
||||
btcAsset,
|
||||
eureAsset,
|
||||
now: Date.parse('2026-05-13T10:00:20.000Z'),
|
||||
preflightLimit: 7,
|
||||
submissionLimit: 8,
|
||||
inventoryLimit: 9,
|
||||
});
|
||||
|
||||
assert.equal(records.length, 1);
|
||||
assert.equal(records[0].request_id, 'request-1');
|
||||
assert.equal(records[0].outcome_status, 'completed');
|
||||
assert.equal(queries.filter((entry) => entry.sql.includes('INSERT INTO intent_request_outcomes')).length, 1);
|
||||
});
|
||||
|
||||
test('recent intent request loader can skip synchronous outcome refresh for dashboard bootstrap', async () => {
|
||||
const queries = [];
|
||||
const pool = {
|
||||
async query(sql, params = []) {
|
||||
queries.push({ sql, params });
|
||||
assert.match(sql, /WITH latest_preflights AS/);
|
||||
assert.doesNotMatch(sql, /refresh_candidates/);
|
||||
assert.equal(params[0], 3);
|
||||
return { rows: [] };
|
||||
},
|
||||
};
|
||||
|
||||
const rows = await loadRecentIntentRequests(pool, {
|
||||
limit: 3,
|
||||
btcAsset: { assetId: 'nep141:nbtc.bridge.near' },
|
||||
eureAsset: { assetId: 'nep141:eure.omft.near' },
|
||||
refreshOutcomes: false,
|
||||
});
|
||||
|
||||
assert.deepEqual(rows, []);
|
||||
assert.equal(queries.length, 1);
|
||||
});
|
||||
|
||||
function eventRow({
|
||||
eventId,
|
||||
at,
|
||||
payload,
|
||||
}) {
|
||||
return {
|
||||
event_id: eventId,
|
||||
observed_at: at,
|
||||
ingested_at: at,
|
||||
quote_id: null,
|
||||
payload,
|
||||
};
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue