From 92aa636dc01a7c70e8bc07af02775142f737f308 Mon Sep 17 00:00:00 2001 From: philipp Date: Mon, 18 May 2026 12:51:25 +0200 Subject: [PATCH] Restore live trading under quote pressure Proof: npm test (209/209) covers stale command expiry, bounded executor state, bounded strategy quote cache, bounded quote outcome refresh, and resource guardrails. Assumptions: current DB pair config and armed state remain the operator-approved live trading controls; stale quote commands are unsafe to submit after their min_deadline_ms. Still fake: quote outcomes still infer fills from inventory deltas rather than a venue-native terminal fill event. --- deploy/k8s/base/unrip.yaml | 8 + src/apps/history-writer.mjs | 7 +- src/apps/strategy-engine.mjs | 8 +- src/apps/trade-executor.mjs | 24 ++- src/core/executor-command-expiry.mjs | 54 +++++++ src/core/executor-state-store.mjs | 78 +++++++++- src/core/recent-id-cache.mjs | 30 ++++ src/lib/postgres.mjs | 45 ++++-- test/executor-command-expiry.test.mjs | 45 ++++++ test/executor-state-store.test.mjs | 34 +++++ test/history-writer-static.test.mjs | 6 +- test/postgres-quote-outcomes-refresh.test.mjs | 143 ++++++++++++++++++ test/recent-id-cache.test.mjs | 22 +++ test/strategy-engine-static.test.mjs | 12 ++ test/trade-executor-static.test.mjs | 11 ++ .../unrip-resource-guardrails-static.test.mjs | 2 +- 16 files changed, 503 insertions(+), 26 deletions(-) create mode 100644 src/core/executor-command-expiry.mjs create mode 100644 src/core/recent-id-cache.mjs create mode 100644 test/executor-command-expiry.test.mjs create mode 100644 test/postgres-quote-outcomes-refresh.test.mjs create mode 100644 test/recent-id-cache.test.mjs create mode 100644 test/strategy-engine-static.test.mjs diff --git a/deploy/k8s/base/unrip.yaml b/deploy/k8s/base/unrip.yaml index 62d2894..47fb216 100644 --- a/deploy/k8s/base/unrip.yaml +++ b/deploy/k8s/base/unrip.yaml @@ -547,6 +547,9 @@ spec: image: ghcr.io/example/unrip:bootstrap imagePullPolicy: IfNotPresent command: ["node", "src/apps/trade-executor.mjs"] + env: + - name: NODE_OPTIONS + value: "--max-old-space-size=896" ports: - name: control-api containerPort: 8087 @@ -555,6 +558,11 @@ spec: name: unrip-config - secretRef: name: unrip-secrets + resources: + requests: + memory: 256Mi + limits: + memory: 1280Mi volumeMounts: - name: executor-state mountPath: /var/lib/unrip/executor-state diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index f88a87f..2532390 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -115,7 +115,12 @@ const intentRequestOutcomeTopics = new Set([ ]); for (const topic of topics) { - await consumer.subscribe({ topic, fromBeginning: true }); + // Raw quote volume is a live firehose; replaying retained history can starve + // durable strategy/execution topics and exhaust the writer. + await consumer.subscribe({ + topic, + fromBeginning: topic !== config.kafkaTopicRawNearIntentsQuote, + }); } const state = { diff --git a/src/apps/strategy-engine.mjs b/src/apps/strategy-engine.mjs index 695903c..4d4f28f 100644 --- a/src/apps/strategy-engine.mjs +++ b/src/apps/strategy-engine.mjs @@ -6,6 +6,7 @@ import { createArmedStateStore } from '../core/armed-state-store.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; +import { createRecentIdCache } from '../core/recent-id-cache.mjs'; import { assertInventorySnapshotEvent, assertMarketPriceEvent, assertNormalizedSwapDemand } from '../core/schemas.mjs'; import { evaluateTradeOpportunity } from '../core/strategy.mjs'; import { loadConfig } from '../lib/config.mjs'; @@ -51,6 +52,7 @@ const armedStateStore = createArmedStateStore({ fileName: 'strategy-engine-control.json', initialArmed: config.strategyInitialArmed, }); +const seenQuotes = createRecentIdCache({ limit: 5000 }); await consumer.subscribe({ topic: config.kafkaTopicNormSwapDemand, fromBeginning: false }); await consumer.subscribe({ topic: config.kafkaTopicRefMarketPrice, fromBeginning: false }); @@ -66,7 +68,6 @@ const state = { latest_decision: null, recent_decisions: [], skipped_counts: {}, - seen_quotes: {}, }; await consumer.run({ @@ -109,7 +110,7 @@ async function handleDemand(event) { if (state.paused) return; const tradingConfig = await tradingConfigStore.getConfig(); - if (state.seen_quotes[event.payload.quote_id]) { + if (seenQuotes.has(event.payload.quote_id)) { const pair = tradingConfig.pairByKey?.get(event.payload.pair || `${event.payload.asset_in}->${event.payload.asset_out}`); const strategyConfig = pair?.strategyConfig || null; await publishDecision({ @@ -131,7 +132,7 @@ async function handleDemand(event) { return; } - state.seen_quotes[event.payload.quote_id] = true; + seenQuotes.add(event.payload.quote_id); const evaluation = evaluateTradeOpportunity({ demandEvent: event, @@ -197,6 +198,7 @@ const controlApi = startControlApi({ getState() { return { ...state, + seen_quotes: seenQuotes.getState(), trading_config: tradingConfigStore.getState(), durable_control_state: armedStateStore.getState(), }; diff --git a/src/apps/trade-executor.mjs b/src/apps/trade-executor.mjs index bfff7ac..1a04e9b 100644 --- a/src/apps/trade-executor.mjs +++ b/src/apps/trade-executor.mjs @@ -4,6 +4,7 @@ import { createConsumer } from '../bus/kafka/consumer.mjs'; import { createProducer } from '../bus/kafka/producer.mjs'; import { createArmedStateStore } from '../core/armed-state-store.mjs'; import { startControlApi } from '../core/control-api.mjs'; +import { classifyExecuteCommandExpiry } from '../core/executor-command-expiry.mjs'; import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs'; import { createExecutorStateStore } from '../core/executor-state-store.mjs'; import { createIntentRequestController } from '../core/intent-request-controller.mjs'; @@ -211,6 +212,27 @@ async function handleCommand(event) { return; } + const expiry = classifyExecuteCommandExpiry(event); + if (expiry.expired) { + stateStore.markFailed(payload.command_id, { + quote_id: payload.quote_id, + error: { + name: 'StaleExecuteTradeCommand', + message: expiry.reason, + }, + }); + await publishResult(payload, { + status: 'rejected', + result_code: 'stale_execute_command', + note: 'execute command deadline elapsed before relay submission', + command_age_ms: expiry.age_ms == null ? null : String(expiry.age_ms), + command_deadline_ms: expiry.deadline_ms == null ? null : String(expiry.deadline_ms), + command_deadline_at: expiry.deadline_at, + stale_reason: expiry.reason, + }); + return; + } + stateStore.markProcessing(payload.command_id, { quote_id: payload.quote_id, idempotency_key: payload.idempotency_key, @@ -308,7 +330,7 @@ const controlApi = startControlApi({ trading_config: tradingConfigStore.getState(), ...state, durable_control_state: armedStateStore.getState(), - durable_state: stateStore.getState(), + durable_state: stateStore.getSummary({ limit: 50 }), }; }, }, diff --git a/src/core/executor-command-expiry.mjs b/src/core/executor-command-expiry.mjs new file mode 100644 index 0000000..ed38c47 --- /dev/null +++ b/src/core/executor-command-expiry.mjs @@ -0,0 +1,54 @@ +const DEFAULT_COMMAND_DEADLINE_MS = 60_000; + +export function classifyExecuteCommandExpiry(event, { now = Date.now() } = {}) { + const payload = event?.payload || {}; + const observedAtMs = parseTimestamp( + event?.observed_at + || payload.quote_observed_at + || payload.decision_at + || event?.ingested_at, + ); + const deadlineMs = parseDeadlineMs(payload.min_deadline_ms); + + if (!Number.isFinite(observedAtMs)) { + return { + expired: true, + reason: 'command_timestamp_missing', + age_ms: null, + deadline_ms: deadlineMs, + deadline_at: null, + }; + } + + if (!Number.isFinite(deadlineMs) || deadlineMs <= 0) { + return { + expired: true, + reason: 'command_deadline_invalid', + age_ms: Math.max(0, now - observedAtMs), + deadline_ms: null, + deadline_at: null, + }; + } + + const deadlineAtMs = observedAtMs + deadlineMs; + const ageMs = Math.max(0, now - observedAtMs); + return { + expired: now > deadlineAtMs, + reason: now > deadlineAtMs ? 'command_deadline_elapsed' : null, + age_ms: ageMs, + deadline_ms: deadlineMs, + deadline_at: new Date(deadlineAtMs).toISOString(), + }; +} + +function parseDeadlineMs(value) { + if (value == null || value === '') return DEFAULT_COMMAND_DEADLINE_MS; + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : Number.NaN; +} + +function parseTimestamp(value) { + if (!value) return Number.NaN; + const parsed = Date.parse(value); + return Number.isFinite(parsed) ? parsed : Number.NaN; +} diff --git a/src/core/executor-state-store.mjs b/src/core/executor-state-store.mjs index 6a607fa..c2a151c 100644 --- a/src/core/executor-state-store.mjs +++ b/src/core/executor-state-store.mjs @@ -2,14 +2,20 @@ import { createJsonStateStore } from './json-state-store.mjs'; const INITIAL_STATE = { commands: {}, + evicted_count: 0, }; -export function createExecutorStateStore({ stateDir, fileName = 'trade-executor-commands.json' }) { +export function createExecutorStateStore({ + stateDir, + fileName = 'trade-executor-commands.json', + maxCommands = 1000, +}) { const store = createJsonStateStore({ stateDir, fileName, initialState: INITIAL_STATE, }); + compactStore(store, maxCommands); return { get(commandId) { @@ -21,21 +27,24 @@ export function createExecutorStateStore({ stateDir, fileName = 'trade-executor- }; }, markProcessing(commandId, metadata) { - return updateCommand(store, commandId, metadata, 'processing'); + return updateCommand(store, commandId, metadata, 'processing', maxCommands); }, markSubmitted(commandId, metadata) { - return updateCommand(store, commandId, metadata, 'submitted'); + return updateCommand(store, commandId, metadata, 'submitted', maxCommands); }, markFailed(commandId, metadata) { - return updateCommand(store, commandId, metadata, 'failed'); + return updateCommand(store, commandId, metadata, 'failed', maxCommands); }, getState() { return normalizeState(store.getState()); }, + getSummary({ limit = 50 } = {}) { + return summarizeState(store.getState(), { limit }); + }, }; } -function updateCommand(store, commandId, metadata, status) { +function updateCommand(store, commandId, metadata, status, maxCommands) { const nextState = store.update((state) => { state.commands[commandId] = { ...(state.commands[commandId] || {}), @@ -43,7 +52,7 @@ function updateCommand(store, commandId, metadata, status) { status, updated_at: new Date().toISOString(), }; - return state; + return pruneState(state, maxCommands); }); return nextState.commands[commandId]; @@ -61,5 +70,62 @@ function normalizeState(state) { }, ]), ), + evicted_count: Number(state.evicted_count || 0), }; } + +function compactStore(store, maxCommands) { + const before = store.getState(); + const beforeCount = Object.keys(before.commands || {}).length; + const next = pruneState(before, maxCommands); + const afterCount = Object.keys(next.commands || {}).length; + if (beforeCount !== afterCount || Number(before.evicted_count || 0) !== Number(next.evicted_count || 0)) { + store.setState(next); + } +} + +function pruneState(state, maxCommands) { + const maxEntries = Math.max(1, Number(maxCommands) || 1000); + const entries = Object.entries(state.commands || {}).map(([commandId, command]) => [ + commandId, + normalizeCommand(command), + ]); + entries.sort((left, right) => timestampValue(right[1].updated_at) - timestampValue(left[1].updated_at)); + const kept = entries.slice(0, maxEntries); + const evicted = Math.max(0, entries.length - kept.length); + return { + ...state, + evicted_count: Number(state.evicted_count || 0) + evicted, + commands: Object.fromEntries(kept), + }; +} + +function summarizeState(state, { limit = 50 } = {}) { + const normalized = normalizeState(state); + const entries = Object.entries(normalized.commands || {}); + entries.sort((left, right) => timestampValue(right[1].updated_at) - timestampValue(left[1].updated_at)); + const byStatus = {}; + for (const [, command] of entries) { + const status = command.status || 'unknown'; + byStatus[status] = (byStatus[status] || 0) + 1; + } + return { + total_commands: entries.length, + evicted_count: normalized.evicted_count, + by_status: byStatus, + latest_updated_at: entries[0]?.[1]?.updated_at || null, + commands: Object.fromEntries(entries.slice(0, Math.max(0, Number(limit) || 50))), + }; +} + +function normalizeCommand(command) { + return { + ...command, + status: command.status === 'completed' ? 'submitted' : command.status, + }; +} + +function timestampValue(value) { + const parsed = Date.parse(value || ''); + return Number.isFinite(parsed) ? parsed : 0; +} diff --git a/src/core/recent-id-cache.mjs b/src/core/recent-id-cache.mjs new file mode 100644 index 0000000..0944276 --- /dev/null +++ b/src/core/recent-id-cache.mjs @@ -0,0 +1,30 @@ +export function createRecentIdCache({ limit = 5000 } = {}) { + const maxEntries = Math.max(1, Number(limit) || 5000); + const ids = new Set(); + const order = []; + let evictedCount = 0; + + return { + has(id) { + return ids.has(id); + }, + add(id) { + if (!id) return this.getState(); + if (ids.has(id)) return this.getState(); + ids.add(id); + order.push(id); + while (order.length > maxEntries) { + const evicted = order.shift(); + if (evicted && ids.delete(evicted)) evictedCount += 1; + } + return this.getState(); + }, + getState() { + return { + count: ids.size, + limit: maxEntries, + evicted_count: evictedCount, + }; + }, + }; +} diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index 9d65da6..a13b068 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -2173,36 +2173,59 @@ export async function refreshQuoteOutcomes(pool, { btcAsset = null, eureAsset = null, now = Date.now(), + submissionLimit = 1000, + inventoryLimit = 5000, } = {}) { if (!btcAsset?.assetId || !eureAsset?.assetId) return []; + const safeSubmissionLimit = Math.max(1, Number(submissionLimit) || 1000); + const safeInventoryLimit = Math.max(1, Number(inventoryLimit) || 5000); + const submissionsResult = await pool.query( + ` + SELECT event_id, observed_at, ingested_at, quote_id, payload + FROM ( + SELECT event_id, observed_at, ingested_at, quote_id, payload + FROM trade_execution_results + WHERE payload->>'status' = 'submitted' + ORDER BY COALESCE(observed_at, ingested_at) DESC + LIMIT $1 + ) recent_submissions + ORDER BY COALESCE(observed_at, ingested_at) ASC + `, + [safeSubmissionLimit], + ); + if (!submissionsResult.rows.length) return []; + + const quoteIds = [...new Set(submissionsResult.rows.map((row) => row.quote_id).filter(Boolean))]; + if (!quoteIds.length) return []; + const [ - submissionsResult, commandsResult, decisionsResult, inventoryResult, ] = await Promise.all([ - pool.query(` - SELECT event_id, observed_at, ingested_at, quote_id, payload - FROM trade_execution_results - WHERE payload->>'status' = 'submitted' - ORDER BY COALESCE(observed_at, ingested_at) ASC - `), pool.query(` SELECT event_id, observed_at, ingested_at, quote_id, payload FROM execute_trade_commands + WHERE quote_id = ANY($1::text[]) ORDER BY COALESCE(observed_at, ingested_at) ASC - `), + `, [quoteIds]), pool.query(` SELECT event_id, observed_at, ingested_at, quote_id, payload FROM trade_decisions + WHERE quote_id = ANY($1::text[]) ORDER BY COALESCE(observed_at, ingested_at) ASC - `), + `, [quoteIds]), pool.query(` SELECT event_id, observed_at, ingested_at, quote_id, payload - FROM intent_inventory_snapshots + FROM ( + SELECT event_id, observed_at, ingested_at, quote_id, payload + FROM intent_inventory_snapshots + ORDER BY COALESCE(observed_at, ingested_at) DESC + LIMIT $1 + ) recent_inventory_snapshots ORDER BY COALESCE(observed_at, ingested_at) ASC - `), + `, [safeInventoryLimit]), ]); const records = deriveQuoteOutcomeRecords({ diff --git a/test/executor-command-expiry.test.mjs b/test/executor-command-expiry.test.mjs new file mode 100644 index 0000000..76a2d83 --- /dev/null +++ b/test/executor-command-expiry.test.mjs @@ -0,0 +1,45 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { classifyExecuteCommandExpiry } from '../src/core/executor-command-expiry.mjs'; + +test('execute command expiry rejects commands older than their quote deadline', () => { + const result = classifyExecuteCommandExpiry({ + observed_at: '2026-05-13T10:00:00.000Z', + ingested_at: '2026-05-13T10:00:01.000Z', + payload: { + min_deadline_ms: '15000', + }, + }, { + now: Date.parse('2026-05-13T10:00:16.000Z'), + }); + + assert.equal(result.expired, true); + assert.equal(result.reason, 'command_deadline_elapsed'); + assert.equal(result.deadline_at, '2026-05-13T10:00:15.000Z'); +}); + +test('execute command expiry keeps fresh commands eligible for relay submission', () => { + const result = classifyExecuteCommandExpiry({ + observed_at: '2026-05-13T10:00:00.000Z', + payload: { + min_deadline_ms: '15000', + }, + }, { + now: Date.parse('2026-05-13T10:00:14.999Z'), + }); + + assert.equal(result.expired, false); + assert.equal(result.reason, null); +}); + +test('execute command expiry fails closed when timestamps are missing', () => { + const result = classifyExecuteCommandExpiry({ + payload: { + min_deadline_ms: '15000', + }, + }); + + assert.equal(result.expired, true); + assert.equal(result.reason, 'command_timestamp_missing'); +}); diff --git a/test/executor-state-store.test.mjs b/test/executor-state-store.test.mjs index 200bc77..1eb88fc 100644 --- a/test/executor-state-store.test.mjs +++ b/test/executor-state-store.test.mjs @@ -40,3 +40,37 @@ test('executor state store normalizes legacy completed markers to submitted', () assert.equal(store.get('cmd-legacy').status, 'submitted'); assert.equal(store.getState().commands['cmd-legacy'].status, 'submitted'); }); + +test('executor state store prunes old command records before serving state', () => { + const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), 'unrip-executor-')); + const statePath = path.join(stateDir, 'trade-executor-commands.json'); + fs.writeFileSync( + statePath, + JSON.stringify({ + commands: { + 'cmd-old': { + status: 'failed', + updated_at: '2026-05-13T10:00:00.000Z', + }, + 'cmd-mid': { + status: 'submitted', + updated_at: '2026-05-13T10:01:00.000Z', + }, + 'cmd-new': { + status: 'processing', + updated_at: '2026-05-13T10:02:00.000Z', + }, + }, + }), + ); + + const store = createExecutorStateStore({ stateDir, maxCommands: 2 }); + const state = store.getState(); + assert.deepEqual(Object.keys(state.commands).sort(), ['cmd-mid', 'cmd-new']); + assert.equal(state.evicted_count, 1); + + const summary = store.getSummary({ limit: 1 }); + assert.equal(summary.total_commands, 2); + assert.deepEqual(Object.keys(summary.commands), ['cmd-new']); + assert.equal(summary.by_status.processing, 1); +}); diff --git a/test/history-writer-static.test.mjs b/test/history-writer-static.test.mjs index d749448..bdbb8d4 100644 --- a/test/history-writer-static.test.mjs +++ b/test/history-writer-static.test.mjs @@ -4,7 +4,7 @@ import { readFileSync } from 'node:fs'; const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.meta.url), 'utf8'); -test('history writer consumes from beginning so first events on newly-created topics are durable', () => { - assert.match(source, /consumer\.subscribe\(\{ topic, fromBeginning: true \}\)/); - assert.doesNotMatch(source, /consumer\.subscribe\(\{ topic, fromBeginning: false \}\)/); +test('history writer replays durable topics but joins the raw quote firehose live', () => { + assert.match(source, /fromBeginning:\s*topic !== config\.kafkaTopicRawNearIntentsQuote/); + assert.match(source, /Raw quote volume is a live firehose/); }); diff --git a/test/postgres-quote-outcomes-refresh.test.mjs b/test/postgres-quote-outcomes-refresh.test.mjs new file mode 100644 index 0000000..a7a6f83 --- /dev/null +++ b/test/postgres-quote-outcomes-refresh.test.mjs @@ -0,0 +1,143 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { refreshQuoteOutcomes } from '../src/lib/postgres.mjs'; + +const btcAsset = { + assetId: 'nep141:nbtc.bridge.near', + decimals: 8, +}; +const eureAsset = { + assetId: 'nep141:eure.omft.near', + decimals: 18, +}; + +test('quote outcome refresh bounds source queries and joins by recent quote ids', async () => { + const queries = []; + const pool = { + async query(sql, params = []) { + queries.push({ sql, params }); + if (sql.includes('FROM trade_execution_results')) { + assert.match(sql, /LIMIT \$1/); + assert.equal(params[0], 2); + return { + rows: [ + eventRow({ + eventId: 'result-1', + quoteId: 'quote-1', + at: '2026-05-13T10:00:10.000Z', + payload: { + status: 'submitted', + result_code: 'quote_response_ok', + quote_id: 'quote-1', + }, + }), + ], + }; + } + if (sql.includes('FROM execute_trade_commands')) { + assert.match(sql, /quote_id = ANY\(\$1::text\[\]\)/); + assert.deepEqual(params[0], ['quote-1']); + return { + rows: [ + eventRow({ + eventId: 'cmd-1', + quoteId: 'quote-1', + at: '2026-05-13T10:00:09.000Z', + payload: { + command_id: 'cmd-1', + decision_id: 'decision-1', + quote_id: 'quote-1', + min_deadline_ms: '15000', + asset_in: eureAsset.assetId, + asset_out: btcAsset.assetId, + amount_in: '1000000000000000000', + quote_output: { + amount_out: '1000', + }, + }, + }), + ], + }; + } + if (sql.includes('FROM trade_decisions')) { + assert.match(sql, /quote_id = ANY\(\$1::text\[\]\)/); + return { + rows: [ + eventRow({ + eventId: 'decision-1', + quoteId: 'quote-1', + at: '2026-05-13T10:00:08.000Z', + payload: { + decision_id: 'decision-1', + quote_id: 'quote-1', + decision: 'actionable', + }, + }), + ], + }; + } + if (sql.includes('FROM intent_inventory_snapshots')) { + assert.match(sql, /LIMIT \$1/); + assert.equal(params[0], 3); + return { + rows: [ + eventRow({ + eventId: 'inventory-1', + at: '2026-05-13T10:00:00.000Z', + payload: { + inventory_id: 'inventory-1', + spendable: { + [btcAsset.assetId]: '2000', + [eureAsset.assetId]: '1000000000000000000', + }, + }, + }), + eventRow({ + eventId: 'inventory-2', + at: '2026-05-13T10:00:12.000Z', + payload: { + inventory_id: 'inventory-2', + spendable: { + [btcAsset.assetId]: '1000', + [eureAsset.assetId]: '2000000000000000000', + }, + }, + }), + ], + }; + } + if (sql.includes('INSERT INTO quote_outcome_attributions')) { + return { rows: [], rowCount: 1 }; + } + throw new Error(`unexpected query: ${sql}`); + }, + }; + + const records = await refreshQuoteOutcomes(pool, { + btcAsset, + eureAsset, + now: Date.parse('2026-05-13T10:00:20.000Z'), + submissionLimit: 2, + inventoryLimit: 3, + }); + + assert.equal(records.length, 1); + assert.equal(records[0].quote_id, 'quote-1'); + assert.equal(queries.filter((entry) => entry.sql.includes('INSERT INTO quote_outcome_attributions')).length, 1); +}); + +function eventRow({ + eventId, + quoteId = null, + at, + payload, +}) { + return { + event_id: eventId, + observed_at: at, + ingested_at: at, + quote_id: quoteId, + payload, + }; +} diff --git a/test/recent-id-cache.test.mjs b/test/recent-id-cache.test.mjs new file mode 100644 index 0000000..6c2b798 --- /dev/null +++ b/test/recent-id-cache.test.mjs @@ -0,0 +1,22 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { createRecentIdCache } from '../src/core/recent-id-cache.mjs'; + +test('recent id cache evicts old ids while retaining duplicate checks for recent ids', () => { + const cache = createRecentIdCache({ limit: 2 }); + + cache.add('quote-1'); + cache.add('quote-2'); + assert.equal(cache.has('quote-1'), true); + + cache.add('quote-3'); + assert.equal(cache.has('quote-1'), false); + assert.equal(cache.has('quote-2'), true); + assert.equal(cache.has('quote-3'), true); + assert.deepEqual(cache.getState(), { + count: 2, + limit: 2, + evicted_count: 1, + }); +}); diff --git a/test/strategy-engine-static.test.mjs b/test/strategy-engine-static.test.mjs new file mode 100644 index 0000000..32ea85d --- /dev/null +++ b/test/strategy-engine-static.test.mjs @@ -0,0 +1,12 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; +import { readFileSync } from 'node:fs'; + +const source = readFileSync(new URL('../src/apps/strategy-engine.mjs', import.meta.url), 'utf8'); + +test('strategy duplicate quote tracking is bounded and state-safe', () => { + assert.match(source, /createRecentIdCache\(\{ limit: 5000 \}\)/); + assert.match(source, /seenQuotes\.has/); + assert.match(source, /seenQuotes\.getState\(\)/); + assert.doesNotMatch(source, /seen_quotes:\s*\{\}/); +}); diff --git a/test/trade-executor-static.test.mjs b/test/trade-executor-static.test.mjs index d82cc7f..c29c020 100644 --- a/test/trade-executor-static.test.mjs +++ b/test/trade-executor-static.test.mjs @@ -14,3 +14,14 @@ test('own request preflight suppresses maker quote responses to avoid self-match assert.match(source, /own_request_preflight_in_progress/); assert.match(source, /avoid self-matching/); }); + +test('trade executor fails closed on stale execute commands before relay submission', () => { + assert.match(source, /classifyExecuteCommandExpiry/); + assert.match(source, /stale_execute_command/); + assert.match(source, /deadline elapsed before relay submission/); +}); + +test('trade executor exposes summarized durable command state', () => { + assert.match(source, /stateStore\.getSummary\(\{ limit: 50 \}\)/); + assert.doesNotMatch(source, /durable_state:\s*stateStore\.getState\(\)/); +}); diff --git a/test/unrip-resource-guardrails-static.test.mjs b/test/unrip-resource-guardrails-static.test.mjs index 0cc656f..cdd4637 100644 --- a/test/unrip-resource-guardrails-static.test.mjs +++ b/test/unrip-resource-guardrails-static.test.mjs @@ -13,7 +13,7 @@ function deploymentBlock(name) { return match[0]; } -for (const name of ['near-intents-ingest', 'history-writer', 'operator-dashboard']) { +for (const name of ['near-intents-ingest', 'history-writer', 'trade-executor', 'operator-dashboard']) { test(`${name} has memory guardrails for live quote pressure`, () => { const block = deploymentBlock(name);