diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 76a5dae..e3ec9fe 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -26,6 +26,7 @@ import { insertHistoryEvents, loadLatestPortfolioMetric, loadPortfolioMetricInputs, + pruneRawNearIntentsQuoteHistory, refreshIntentRequestOutcomes, refreshQuoteOutcomes, seedTradingConfig, @@ -120,6 +121,9 @@ const topics = [ const rawQuoteTopics = [ config.kafkaTopicRawNearIntentsQuote, ]; +const rawQuoteHistoryPruneIntervalMs = 15 * 60 * 1000; +const rawQuoteHistoryRetainRecentMs = 6 * 60 * 60 * 1000; +const rawQuoteHistoryPruneBatchSize = 100_000; const liveEvidenceTopics = [ config.kafkaTopicNormSwapDemand, config.kafkaTopicDecisionTradeDecision, @@ -189,6 +193,9 @@ const state = { last_intent_request_outcomes_at: null, latest_intent_request_outcomes: null, intent_request_outcomes_error: null, + last_raw_quote_prune_at: null, + raw_quote_prune_deleted_count: 0, + raw_quote_prune_error: null, derived_refresh_skipped_count: 0, last_derived_refresh_skipped_at: null, last_derived_refresh_skipped_topic: null, @@ -276,6 +283,10 @@ async function runHistoryConsumer(historyConsumer) { await heartbeat(); } + if (batch.topic === config.kafkaTopicRawNearIntentsQuote) { + await maybePruneRawQuoteHistory(); + } + if (state.draining) { setTimeout(() => shutdown(), 0); } @@ -283,6 +294,45 @@ async function runHistoryConsumer(historyConsumer) { }); } +async function maybePruneRawQuoteHistory({ force = false } = {}) { + const nowMs = Date.now(); + const lastPruneMs = state.last_raw_quote_prune_at + ? Date.parse(state.last_raw_quote_prune_at) + : 0; + if ( + !force + && Number.isFinite(lastPruneMs) + && nowMs - lastPruneMs < rawQuoteHistoryPruneIntervalMs + ) { + return null; + } + + try { + const result = await pruneRawNearIntentsQuoteHistory(pool, { + now: new Date(nowMs).toISOString(), + retainRecentMs: rawQuoteHistoryRetainRecentMs, + batchSize: rawQuoteHistoryPruneBatchSize, + }); + state.last_raw_quote_prune_at = new Date(nowMs).toISOString(); + state.raw_quote_prune_deleted_count += result.deletedCount; + state.raw_quote_prune_error = null; + if (result.deletedCount > 0) { + logger.info('raw_near_intents_quote_history_pruned', { + details: result, + }); + } + return result; + } catch (error) { + state.raw_quote_prune_error = serializeError(error); + logger.error('raw_near_intents_quote_history_prune_failed', { + details: { + error: state.raw_quote_prune_error, + }, + }); + return null; + } +} + async function handleWrittenHistoryEvent({ topic, partition, diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index 1f6cf33..c6bc3e1 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -2125,6 +2125,59 @@ export async function insertHistoryEvents(pool, entries = []) { return { insertedEventIds }; } +export async function pruneRawNearIntentsQuoteHistory(pool, { + now = new Date().toISOString(), + retainRecentMs = 6 * 60 * 60 * 1000, + batchSize = 100_000, +} = {}) { + const nowMs = Date.parse(now); + if (!Number.isFinite(nowMs)) throw new Error('now must be a valid timestamp'); + if (!Number.isInteger(retainRecentMs) || retainRecentMs <= 0) { + throw new Error('retain_recent_ms must be a positive integer'); + } + const boundedBatchSize = Math.max(1, Math.min(500_000, Math.floor(Number(batchSize) || 0))); + const cutoff = new Date(nowMs - retainRecentMs).toISOString(); + + const result = await pool.query( + ` + WITH stale_raw_quotes AS ( + SELECT raw.event_id + FROM raw_near_intents_quotes raw + WHERE raw.ingested_at < $1::timestamptz + AND NOT EXISTS ( + SELECT 1 FROM trade_decisions decision + WHERE decision.quote_id = raw.quote_id + ) + AND NOT EXISTS ( + SELECT 1 FROM execute_trade_commands command + WHERE command.quote_id = raw.quote_id + ) + AND NOT EXISTS ( + SELECT 1 FROM trade_execution_results execution + WHERE execution.quote_id = raw.quote_id + ) + AND NOT EXISTS ( + SELECT 1 FROM quote_outcome_attributions outcome + WHERE outcome.quote_id = raw.quote_id + ) + ORDER BY raw.ingested_at ASC + LIMIT $2 + ) + DELETE FROM raw_near_intents_quotes raw + USING stale_raw_quotes + WHERE raw.event_id = stale_raw_quotes.event_id + `, + [cutoff, boundedBatchSize], + ); + + return { + deletedCount: Number(result.rowCount || 0), + cutoff, + retainRecentMs, + batchSize: boundedBatchSize, + }; +} + export async function insertEnvironmentStatusChange(pool, { topic, event, record }) { const fingerprint = event.payload?.status_fingerprint || null; const environmentKey = event.payload?.environment_key || record.decision_key || null; diff --git a/test/history-writer-static.test.mjs b/test/history-writer-static.test.mjs index 8522cd9..2fb2175 100644 --- a/test/history-writer-static.test.mjs +++ b/test/history-writer-static.test.mjs @@ -20,6 +20,9 @@ test('history writer replays durable topics but joins the raw quote firehose liv assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/); assert.match(source, /eachBatch/); assert.match(source, /insertHistoryEvents/); + assert.match(source, /rawQuoteHistoryRetainRecentMs\s*=\s*6 \* 60 \* 60 \* 1000/); + assert.match(source, /pruneRawNearIntentsQuoteHistory/); + assert.match(source, /batch\.topic === config\.kafkaTopicRawNearIntentsQuote/); }); test('history writer passes tracked assets into portfolio valuation', () => { diff --git a/test/inventory-and-history.test.mjs b/test/inventory-and-history.test.mjs index ce3fee3..73bc9d1 100644 --- a/test/inventory-and-history.test.mjs +++ b/test/inventory-and-history.test.mjs @@ -3,7 +3,7 @@ import assert from 'node:assert/strict'; import { buildInventorySnapshot } from '../src/core/inventory.mjs'; import { routeHistoryRecord } from '../src/core/history-records.mjs'; -import { insertHistoryEvents } from '../src/lib/postgres.mjs'; +import { insertHistoryEvents, pruneRawNearIntentsQuoteHistory } from '../src/lib/postgres.mjs'; test('inventory snapshot keeps pending funding out of spendable balances', () => { const snapshot = buildInventorySnapshot({ @@ -101,6 +101,33 @@ test('bulk history insert writes multiple routed events in one table statement', assert.deepEqual([...result.insertedEventIds].sort(), ['evt-1', 'evt-2']); }); +test('raw quote retention preserves rows linked to maker lifecycle evidence', async () => { + const queries = []; + const pool = { + async query(sql, params) { + queries.push({ sql, params }); + return { rows: [], rowCount: 42 }; + }, + }; + + const result = await pruneRawNearIntentsQuoteHistory(pool, { + now: '2026-05-21T12:00:00.000Z', + retainRecentMs: 6 * 60 * 60 * 1000, + batchSize: 123, + }); + + assert.equal(result.deletedCount, 42); + assert.equal(result.cutoff, '2026-05-21T06:00:00.000Z'); + assert.equal(result.batchSize, 123); + assert.equal(queries.length, 1); + assert.match(queries[0].sql, /DELETE FROM raw_near_intents_quotes/); + assert.match(queries[0].sql, /trade_decisions/); + assert.match(queries[0].sql, /execute_trade_commands/); + assert.match(queries[0].sql, /trade_execution_results/); + assert.match(queries[0].sql, /quote_outcome_attributions/); + assert.deepEqual(queries[0].params, ['2026-05-21T06:00:00.000Z', 123]); +}); + function historyEvent(eventId, payload) { return { event_id: eventId,