diff --git a/src/apps/trade-executor.mjs b/src/apps/trade-executor.mjs index 2462edf..56c39f8 100644 --- a/src/apps/trade-executor.mjs +++ b/src/apps/trade-executor.mjs @@ -20,6 +20,7 @@ import { ensureHistorySchema, insertHistoryEvent, loadIntentRequestPreflightByIdOrKey, + loadIntentRequestSubmissionsForStatusRefresh, loadLatestIntentRequestSubmission, loadLatestInventorySnapshot, loadLatestMarketPrice, @@ -111,6 +112,8 @@ const state = { accepted_count: 0, blocked_count: 0, failed_count: 0, + maker_suppressed: false, + maker_suppressed_until: null, }, }; @@ -122,9 +125,27 @@ const requestController = createIntentRequestController({ signer, isArmed: () => state.armed, isPaused: () => state.paused, + withMakerSuppressed, logger: logger.child({ component: 'intent-request-controller' }), }); +async function withMakerSuppressed(operation) { + const previousSuppressed = state.request_creation.maker_suppressed; + const previousUntil = state.request_creation.maker_suppressed_until; + state.request_creation.maker_suppressed = true; + state.request_creation.maker_suppressed_until = new Date(Date.now() + (config.intentRequestQuoteTimeoutMs || config.executorResponseTimeoutMs || 10_000)).toISOString(); + try { + return await operation(); + } finally { + state.request_creation.maker_suppressed = previousSuppressed; + state.request_creation.maker_suppressed_until = previousUntil; + } +} + +function isMakerSuppressedForOwnRequest() { + return state.request_creation.maker_suppressed === true; +} + await consumer.subscribe({ topic: config.kafkaTopicCmdExecuteTrade, fromBeginning: false }); await consumer.run({ eachMessage: async ({ message }) => { @@ -161,6 +182,15 @@ async function handleCommand(event) { return; } + if (isMakerSuppressedForOwnRequest()) { + await publishResult(payload, { + status: 'rejected', + result_code: 'own_request_preflight_in_progress', + note: 'Own request preflight is suppressing maker quote responses to avoid self-matching.', + }); + return; + } + if (state.paused) return; if (!state.armed) { @@ -385,6 +415,9 @@ function createIntentRequestStore() { findSubmissionByRequest: ({ requestId } = {}) => ( loadLatestIntentRequestSubmission(requestPool, { requestId }) ), + loadSubmissionsForStatusRefresh: () => ( + loadIntentRequestSubmissionsForStatusRefresh(requestPool, { limit: 20 }) + ), async insertPreflight(payload) { const event = buildEventEnvelope({ source: 'trade-executor', @@ -410,7 +443,7 @@ function createIntentRequestStore() { source: 'trade-executor', venue: 'near-intents', eventType: 'intent_request_submission_result', - observedAt: payload.submitted_at, + observedAt: payload.status_checked_at || payload.submitted_at, payload, }); assertIntentRequestSubmissionResultEvent(event); diff --git a/src/core/intent-request-controller.mjs b/src/core/intent-request-controller.mjs index a5c4d2e..31f2476 100644 --- a/src/core/intent-request-controller.mjs +++ b/src/core/intent-request-controller.mjs @@ -24,6 +24,7 @@ export function createIntentRequestController({ isPaused = () => false, now = () => Date.now(), uuid = () => crypto.randomUUID(), + withMakerSuppressed = async (operation) => operation(), logger = null, } = {}) { if (!config) throw new Error('config is required'); @@ -126,14 +127,17 @@ export function createIntentRequestController({ }); minDestinationAmountUnits = applySlippageBps(expectedDestinationAmountUnits, slippageBps); - solverQuoteResponse = await relayRpcClient.quote( - buildSolverQuoteRequest({ - sourceAssetId: sourceAsset.assetId, - destinationAssetId: destinationAsset.assetId, - sourceAmountUnits, - minDeadlineMs, - }), - { timeoutMs: config.intentRequestQuoteTimeoutMs || config.executorResponseTimeoutMs }, + solverQuoteResponse = await withMakerSuppressed( + () => relayRpcClient.quote( + buildSolverQuoteRequest({ + sourceAssetId: sourceAsset.assetId, + destinationAssetId: destinationAsset.assetId, + sourceAmountUnits, + minDeadlineMs, + }), + { timeoutMs: config.intentRequestQuoteTimeoutMs || config.executorResponseTimeoutMs }, + ), + { requestId, idempotencyKey }, ); solverQuotes = normalizeSolverQuotes(solverQuoteResponse); selectedQuote = selectBestSolverQuote(solverQuotes, { minDestinationAmountUnits }); @@ -345,8 +349,49 @@ export function createIntentRequestController({ } async function refreshOutcomes() { + const refreshedStatuses = await refreshRelayStatuses(); const outcomes = await store.refreshOutcomes?.(); - return { ok: true, outcomes: outcomes || [] }; + return { ok: true, refreshed_statuses: refreshedStatuses, outcomes: outcomes || [] }; + } + + async function refreshRelayStatuses() { + const submissions = await store.loadSubmissionsForStatusRefresh?.(); + if (!submissions?.length) return []; + + const refreshed = []; + for (const submission of submissions) { + if (!submission?.request_id || !submission?.intent_hash) continue; + const preflight = await store.findPreflight({ + requestId: submission.request_id, + idempotencyKey: submission.idempotency_key || null, + }); + if (!preflight) continue; + + const statusCheckedAt = new Date(now()).toISOString(); + const relayStatusResponse = await relayRpcClient.getStatus( + submission.intent_hash, + { timeoutMs: config.intentRequestStatusTimeoutMs || config.executorResponseTimeoutMs }, + ).catch((error) => ({ error: serializeError(error) })); + const relayStatus = relayStatusResponse?.status || submission.relay_status || null; + const result = await recordSubmissionResult(preflight, { + submissionId: submission.submission_id || null, + status: 'accepted_by_relay', + result_code: 'relay_status_refreshed', + result_text: relayStatus === 'SETTLED' + ? 'Relay reports the intent settled. This is checked against durable inventory before completion.' + : 'Relay status was refreshed. This is not settlement by itself.', + submitted_at: submission.submitted_at || statusCheckedAt, + quote_hash: submission.quote_hash || null, + intent_hash: submission.intent_hash, + destination_amount_units: submission.destination_amount_units || null, + nonce: submission.nonce || null, + relay_status: relayStatus, + relay_status_response: relayStatusResponse, + status_checked_at: statusCheckedAt, + }); + refreshed.push(result); + } + return refreshed; } async function recordSubmissionResult(preflight, { diff --git a/src/core/intent-request-outcomes.mjs b/src/core/intent-request-outcomes.mjs index a2e3be9..933dc18 100644 --- a/src/core/intent-request-outcomes.mjs +++ b/src/core/intent-request-outcomes.mjs @@ -183,6 +183,27 @@ function deriveOneOutcome({ }); } + if (submission.relay_status === 'SETTLED' && hasFreshInventoryAfterStatusCheck({ submission, latestInventoryAt })) { + return baseOutcomeRecord({ + preflight, + submission, + outcome_status: 'failed', + outcome_observed_at: latestInventoryAt || submission.status_checked_at || submission.submitted_at, + outcome_source: 'solver_relay_status_and_inventory_snapshots', + outcome_reason: 'relay_settled_without_expected_inventory_delta', + attribution_status: 'unattributed', + attribution_method: null, + attributed_inventory_delta: null, + evidence: { + relay_status: submission.relay_status, + relay_status_response: submission.relay_status_response, + latest_inventory_observed_at: latestInventoryAt, + uncertainty: + 'Relay reported settlement, but durable inventory did not show the expected source decrease and destination increase.', + }, + }); + } + if (submission.relay_status === 'NOT_FOUND_OR_NOT_VALID') { return baseOutcomeRecord({ preflight, @@ -335,6 +356,14 @@ function movementMatchesExpectedDelta({ return true; } +function hasFreshInventoryAfterStatusCheck({ submission, latestInventoryAt }) { + const statusCheckedTs = timestampValue(submission.status_checked_at || submission.submitted_at); + const latestInventoryTs = timestampValue(latestInventoryAt); + return Number.isFinite(statusCheckedTs) + && Number.isFinite(latestInventoryTs) + && latestInventoryTs >= statusCheckedTs; +} + function getExpiredSettlementWindow({ submission, preflight, diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index 7744ada..21211ab 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -567,6 +567,43 @@ export async function loadLatestIntentRequestSubmission(pool, { requestId } = {} return normalizeEventPayloadRow(result.rows[0])?.payload || null; } +export async function loadIntentRequestSubmissionsForStatusRefresh(pool, { + limit = 20, +} = {}) { + const result = await pool.query( + ` + WITH latest_submissions AS ( + SELECT DISTINCT ON (payload->>'request_id') + observed_at, ingested_at, payload + FROM intent_request_submission_results + WHERE COALESCE(payload->>'request_id', '') <> '' + ORDER BY + payload->>'request_id', + COALESCE(observed_at, ingested_at) DESC, + CASE payload->>'status' + WHEN 'accepted_by_relay' THEN 0 + WHEN 'failed' THEN 1 + WHEN 'blocked' THEN 2 + WHEN 'submit_requested' THEN 3 + ELSE 4 + END + ) + SELECT observed_at, ingested_at, payload + FROM latest_submissions + WHERE payload->>'status' = 'accepted_by_relay' + AND COALESCE(payload->>'intent_hash', '') <> '' + AND COALESCE(payload->>'relay_status', '') NOT IN ('SETTLED', 'NOT_FOUND_OR_NOT_VALID') + ORDER BY COALESCE(observed_at, ingested_at) DESC + LIMIT $1 + `, + [Math.max(1, Number(limit) || 20)], + ); + + return result.rows + .map((row) => normalizeIntentRequestSubmissionPayload(normalizeEventPayloadRow(row)?.payload || null)) + .filter(Boolean); +} + export async function refreshIntentRequestOutcomes(pool, { btcAsset = null, eureAsset = null, @@ -1211,6 +1248,26 @@ function normalizeEventPayloadRow(row) { }; } +function normalizeIntentRequestSubmissionPayload(payload = null) { + if (!payload) return null; + return { + request_id: payload.request_id || null, + idempotency_key: payload.idempotency_key || null, + submission_id: payload.submission_id || null, + status: payload.status || null, + result_code: payload.result_code || null, + result_text: payload.result_text || null, + quote_hash: payload.quote_hash || null, + intent_hash: payload.intent_hash || null, + destination_amount_units: payload.destination_amount_units || null, + nonce: payload.nonce || null, + submitted_at: toIsoTimestamp(payload.submitted_at), + relay_status: payload.relay_status || null, + relay_status_response: payload.relay_status_response || null, + status_checked_at: toIsoTimestamp(payload.status_checked_at), + }; +} + export function normalizeIntentRequestRow(row) { const preflight = row.preflight_payload || {}; const submission = row.submission_payload || null; @@ -1304,6 +1361,8 @@ function humanizeIntentRequestReason(reason) { 'More than one inventory movement could match this request; no completion is assigned.', relay_not_found_or_not_valid: 'Relay reported the intent as not found or not valid.', + relay_settled_without_expected_inventory_delta: + 'Relay reports settlement, but durable inventory does not show the expected EURe decrease and BTC increase.', }; return labels[normalized] || normalized.replaceAll('_', ' '); } diff --git a/test/intent-request-outcomes.test.mjs b/test/intent-request-outcomes.test.mjs index 3b328ef..81c032f 100644 --- a/test/intent-request-outcomes.test.mjs +++ b/test/intent-request-outcomes.test.mjs @@ -63,7 +63,7 @@ function outcomes({ preflights = [preflight()], submissions = [], inventorySnaps }); } -test('request submitted or relay accepted does not become completed without inventory delta', () => { +test('relay settled without expected inventory delta is failed, not completed', () => { const [record] = outcomes({ submissions: [submission({ relay_status: 'SETTLED' })], inventorySnapshots: [ @@ -86,11 +86,31 @@ test('request submitted or relay accepted does not become completed without inve assert.equal(record.submission_status, 'accepted_by_relay'); assert.equal(record.relay_status, 'SETTLED'); - assert.equal(record.outcome_status, 'awaiting_settlement'); - assert.equal(record.outcome_reason, 'relay_settled_but_inventory_delta_missing'); + assert.equal(record.outcome_status, 'failed'); + assert.equal(record.outcome_reason, 'relay_settled_without_expected_inventory_delta'); assert.equal(record.attribution_status, 'unattributed'); assert.equal(record.attributed_inventory_delta, null); assert.notEqual(record.outcome_status, 'completed'); + assert.notEqual(record.outcome_status, 'not_filled'); +}); + +test('accepted pending request without inventory delta remains awaiting settlement', () => { + const [record] = outcomes({ + submissions: [submission({ relay_status: 'PENDING' })], + inventorySnapshots: [ + { + observed_at: '2026-04-12T10:00:20.000Z', + spendable: { + [EURE.assetId]: '5000000000000000000', + [BTC.assetId]: '0', + }, + }, + ], + }); + + assert.equal(record.outcome_status, 'awaiting_settlement'); + assert.equal(record.outcome_reason, 'accepted_by_relay_without_settlement'); + assert.equal(record.attributed_inventory_delta, null); }); test('completed request requires exact EURe decrease and BTC increase after submission', () => { diff --git a/test/intent-requests.test.mjs b/test/intent-requests.test.mjs index 151465f..8c9951e 100644 --- a/test/intent-requests.test.mjs +++ b/test/intent-requests.test.mjs @@ -122,7 +122,7 @@ function buildRelay() { }; } -function buildController({ store = buildStore(), relay = buildRelay(), armed = true, verifierRegistered = true } = {}) { +function buildController({ store = buildStore(), relay = buildRelay(), armed = true, verifierRegistered = true, withMakerSuppressed = async (operation) => operation() } = {}) { return { store, relay, @@ -137,6 +137,7 @@ function buildController({ store = buildStore(), relay = buildRelay(), armed = t signer: KeyPair.fromRandom('ed25519'), isArmed: () => armed, isPaused: () => false, + withMakerSuppressed, now: () => Date.parse('2026-04-12T10:00:00.000Z'), uuid: (() => { let next = 1; @@ -273,3 +274,67 @@ test('relay publish failure records submit_requested first and never reports com assert.equal(store.submissions[1].status, 'failed'); assert.notEqual(result.submission_result.status, 'completed'); }); + + +test('preflight suppresses maker responses while collecting solver quotes', async () => { + let suppressed = false; + let wrapperCalls = 0; + const relay = buildRelay(); + relay.quote = async function quote() { + this.quoteCalls += 1; + assert.equal(suppressed, true); + return [{ + quote_hash: 'external-quote-hash', + amount_out: '10000', + expiration_time: '2026-04-12T10:01:00.000Z', + }]; + }; + + const { controller } = buildController({ + relay, + withMakerSuppressed: async (operation, context) => { + wrapperCalls += 1; + assert.match(context.requestId, /^id-/); + suppressed = true; + try { + return await operation(); + } finally { + suppressed = false; + } + }, + }); + + const preflight = await controller.preflight({ amount_eure: '5', slippage_bps: 200 }); + + assert.equal(preflight.state, 'draft'); + assert.equal(preflight.selected_quote.quote_hash, 'external-quote-hash'); + assert.equal(wrapperCalls, 1); + assert.equal(suppressed, false); +}); + + +test('refreshOutcomes refreshes relay status before recomputing settlement truth', async () => { + const { controller, store, relay } = buildController(); + const preflight = await controller.preflight({ amount_eure: '5', slippage_bps: 200 }); + await controller.submit({ request_id: preflight.request_id }); + const accepted = store.submissions.find((entry) => entry.status === 'accepted_by_relay'); + store.loadSubmissionsForStatusRefresh = async () => [accepted]; + store.refreshOutcomes = async () => [{ request_id: preflight.request_id, outcome_status: 'failed' }]; + relay.getStatus = async () => ({ + intent_hash: accepted.intent_hash, + status: 'SETTLED', + data: { hash: 'settlement-tx-hash' }, + }); + + const refreshed = await controller.refreshOutcomes(); + const statusRefresh = store.submissions.at(-1); + + assert.equal(refreshed.refreshed_statuses.length, 1); + assert.equal(refreshed.outcomes[0].outcome_status, 'failed'); + assert.equal(statusRefresh.status, 'accepted_by_relay'); + assert.equal(statusRefresh.result_code, 'relay_status_refreshed'); + assert.equal(statusRefresh.relay_status, 'SETTLED'); + assert.equal(statusRefresh.relay_status_response.data.hash, 'settlement-tx-hash'); + assert.equal(statusRefresh.submitted_at, accepted.submitted_at); + assert.ok(statusRefresh.status_checked_at); +}); diff --git a/test/postgres-intent-requests.test.mjs b/test/postgres-intent-requests.test.mjs index 3e060ba..6b4f586 100644 --- a/test/postgres-intent-requests.test.mjs +++ b/test/postgres-intent-requests.test.mjs @@ -1,7 +1,10 @@ import test from 'node:test'; import assert from 'node:assert/strict'; -import { normalizeIntentRequestRow } from '../src/lib/postgres.mjs'; +import { + loadIntentRequestSubmissionsForStatusRefresh, + normalizeIntentRequestRow, +} from '../src/lib/postgres.mjs'; test('intent request normalization prefers terminal outcome reason text over relay acceptance text', () => { const row = normalizeIntentRequestRow({ @@ -66,3 +69,49 @@ test('intent request normalization prefers terminal outcome reason text over rel assert.doesNotMatch(row.reason_text, /Relay accepted the signed request/i); assert.equal(row.has_settlement_evidence, false); }); + + +test('intent request status refresh loader normalizes accepted relay submissions', async () => { + const queries = []; + const pool = { + async query(sql, params) { + queries.push({ sql, params }); + return { + rows: [ + { + observed_at: '2026-04-12T16:45:45.000Z', + ingested_at: '2026-04-12T16:45:46.000Z', + payload: { + request_id: 'request-1', + idempotency_key: 'intent-request:request-1', + submission_id: 'submission-1', + status: 'accepted_by_relay', + result_code: 'publish_intent_accepted', + result_text: 'Relay accepted the signed request. This is not settlement.', + submitted_at: '2026-04-12T16:45:43.133Z', + intent_hash: 'intent-hash-1', + quote_hash: 'quote-hash-1', + destination_amount_units: '8214', + nonce: 'nonce-1', + relay_status: 'PENDING', + relay_status_response: { status: 'PENDING' }, + status_checked_at: '2026-04-12T16:45:44.000Z', + }, + }, + ], + }; + }, + }; + + const [row] = await loadIntentRequestSubmissionsForStatusRefresh(pool, { limit: 3 }); + + assert.equal(queries[0].params[0], 3); + assert.equal(row.request_id, 'request-1'); + assert.equal(row.idempotency_key, 'intent-request:request-1'); + assert.equal(row.status, 'accepted_by_relay'); + assert.equal(row.intent_hash, 'intent-hash-1'); + assert.equal(row.relay_status, 'PENDING'); + assert.equal(row.destination_amount_units, '8214'); + assert.equal(row.submitted_at, '2026-04-12T16:45:43.133Z'); + assert.equal(row.status_checked_at, '2026-04-12T16:45:44.000Z'); +}); diff --git a/test/trade-executor-static.test.mjs b/test/trade-executor-static.test.mjs new file mode 100644 index 0000000..d82cc7f --- /dev/null +++ b/test/trade-executor-static.test.mjs @@ -0,0 +1,16 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; +import { readFileSync } from 'node:fs'; + +const source = readFileSync(new URL('../src/apps/trade-executor.mjs', import.meta.url), 'utf8'); + +test('trade executor dispatches each execute command once', () => { + const calls = source.match(/await handleCommand\(event\);/g) || []; + assert.equal(calls.length, 1); +}); + +test('own request preflight suppresses maker quote responses to avoid self-matching', () => { + assert.match(source, /withMakerSuppressed/); + assert.match(source, /own_request_preflight_in_progress/); + assert.match(source, /avoid self-matching/); +});