From d9e7d570f47c29be81601abd414c6c6584738e7e Mon Sep 17 00:00:00 2001 From: philipp Date: Sun, 24 May 2026 14:59:58 +0200 Subject: [PATCH] Bound raw quote retention drain Proof: Raw NEAR Intents quote retention now keeps only a 30 minute raw firehose window and drains up to 10M stale unlinked raw rows per pass. Targeted raw retention tests, full npm test, and operator dashboard bundle build pass. Assumptions: raw quote firehose rows are debug evidence, while normalized quote demand, decisions, commands, executor results, outcomes, inventory, pricing, and DB config remain the durable trading evidence. Still fake: venue-native terminal fill ids and fee-complete realized PnL remain unavailable; raw quote firehose rows truncated during emergency recovery are intentionally no longer readable. --- src/apps/history-writer.mjs | 8 +-- src/lib/postgres.mjs | 78 +++++++++++++++++------------ test/history-writer-static.test.mjs | 6 ++- test/inventory-and-history.test.mjs | 29 +++++++++++ 4 files changed, 84 insertions(+), 37 deletions(-) diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index e3ec9fe..777b04f 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -121,9 +121,10 @@ const topics = [ const rawQuoteTopics = [ config.kafkaTopicRawNearIntentsQuote, ]; -const rawQuoteHistoryPruneIntervalMs = 15 * 60 * 1000; -const rawQuoteHistoryRetainRecentMs = 6 * 60 * 60 * 1000; -const rawQuoteHistoryPruneBatchSize = 100_000; +const rawQuoteHistoryPruneIntervalMs = 60 * 1000; +const rawQuoteHistoryRetainRecentMs = 30 * 60 * 1000; +const rawQuoteHistoryPruneBatchSize = 500_000; +const rawQuoteHistoryPruneMaxBatches = 20; const liveEvidenceTopics = [ config.kafkaTopicNormSwapDemand, config.kafkaTopicDecisionTradeDecision, @@ -312,6 +313,7 @@ async function maybePruneRawQuoteHistory({ force = false } = {}) { now: new Date(nowMs).toISOString(), retainRecentMs: rawQuoteHistoryRetainRecentMs, batchSize: rawQuoteHistoryPruneBatchSize, + maxBatches: rawQuoteHistoryPruneMaxBatches, }); state.last_raw_quote_prune_at = new Date(nowMs).toISOString(); state.raw_quote_prune_deleted_count += result.deletedCount; diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index c6bc3e1..f5a1693 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -2127,8 +2127,9 @@ export async function insertHistoryEvents(pool, entries = []) { export async function pruneRawNearIntentsQuoteHistory(pool, { now = new Date().toISOString(), - retainRecentMs = 6 * 60 * 60 * 1000, + retainRecentMs = 30 * 60 * 1000, batchSize = 100_000, + maxBatches = 1, } = {}) { const nowMs = Date.parse(now); if (!Number.isFinite(nowMs)) throw new Error('now must be a valid timestamp'); @@ -2136,45 +2137,56 @@ export async function pruneRawNearIntentsQuoteHistory(pool, { throw new Error('retain_recent_ms must be a positive integer'); } const boundedBatchSize = Math.max(1, Math.min(500_000, Math.floor(Number(batchSize) || 0))); + const boundedMaxBatches = Math.max(1, Math.min(100, Math.floor(Number(maxBatches) || 0))); const cutoff = new Date(nowMs - retainRecentMs).toISOString(); - const result = await pool.query( - ` - WITH stale_raw_quotes AS ( - SELECT raw.event_id - FROM raw_near_intents_quotes raw - WHERE raw.ingested_at < $1::timestamptz - AND NOT EXISTS ( - SELECT 1 FROM trade_decisions decision - WHERE decision.quote_id = raw.quote_id - ) - AND NOT EXISTS ( - SELECT 1 FROM execute_trade_commands command - WHERE command.quote_id = raw.quote_id - ) - AND NOT EXISTS ( - SELECT 1 FROM trade_execution_results execution - WHERE execution.quote_id = raw.quote_id - ) - AND NOT EXISTS ( - SELECT 1 FROM quote_outcome_attributions outcome - WHERE outcome.quote_id = raw.quote_id - ) - ORDER BY raw.ingested_at ASC - LIMIT $2 - ) - DELETE FROM raw_near_intents_quotes raw - USING stale_raw_quotes - WHERE raw.event_id = stale_raw_quotes.event_id - `, - [cutoff, boundedBatchSize], - ); + let deletedCount = 0; + let batches = 0; + for (let batch = 0; batch < boundedMaxBatches; batch += 1) { + const result = await pool.query( + ` + WITH stale_raw_quotes AS ( + SELECT raw.event_id + FROM raw_near_intents_quotes raw + WHERE raw.ingested_at < $1::timestamptz + AND NOT EXISTS ( + SELECT 1 FROM trade_decisions decision + WHERE decision.quote_id = raw.quote_id + ) + AND NOT EXISTS ( + SELECT 1 FROM execute_trade_commands command + WHERE command.quote_id = raw.quote_id + ) + AND NOT EXISTS ( + SELECT 1 FROM trade_execution_results execution + WHERE execution.quote_id = raw.quote_id + ) + AND NOT EXISTS ( + SELECT 1 FROM quote_outcome_attributions outcome + WHERE outcome.quote_id = raw.quote_id + ) + ORDER BY raw.ingested_at ASC + LIMIT $2 + ) + DELETE FROM raw_near_intents_quotes raw + USING stale_raw_quotes + WHERE raw.event_id = stale_raw_quotes.event_id + `, + [cutoff, boundedBatchSize], + ); + const batchDeleted = Number(result.rowCount || 0); + deletedCount += batchDeleted; + batches += 1; + if (batchDeleted < boundedBatchSize) break; + } return { - deletedCount: Number(result.rowCount || 0), + deletedCount, cutoff, retainRecentMs, batchSize: boundedBatchSize, + maxBatches: boundedMaxBatches, + batches, }; } diff --git a/test/history-writer-static.test.mjs b/test/history-writer-static.test.mjs index 2fb2175..6c02160 100644 --- a/test/history-writer-static.test.mjs +++ b/test/history-writer-static.test.mjs @@ -20,8 +20,12 @@ test('history writer replays durable topics but joins the raw quote firehose liv assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/); assert.match(source, /eachBatch/); assert.match(source, /insertHistoryEvents/); - assert.match(source, /rawQuoteHistoryRetainRecentMs\s*=\s*6 \* 60 \* 60 \* 1000/); + assert.match(source, /rawQuoteHistoryPruneIntervalMs\s*=\s*60 \* 1000/); + assert.match(source, /rawQuoteHistoryRetainRecentMs\s*=\s*30 \* 60 \* 1000/); + assert.match(source, /rawQuoteHistoryPruneBatchSize\s*=\s*500_000/); + assert.match(source, /rawQuoteHistoryPruneMaxBatches\s*=\s*20/); assert.match(source, /pruneRawNearIntentsQuoteHistory/); + assert.match(source, /maxBatches:\s*rawQuoteHistoryPruneMaxBatches/); assert.match(source, /batch\.topic === config\.kafkaTopicRawNearIntentsQuote/); }); diff --git a/test/inventory-and-history.test.mjs b/test/inventory-and-history.test.mjs index 73bc9d1..f612b67 100644 --- a/test/inventory-and-history.test.mjs +++ b/test/inventory-and-history.test.mjs @@ -128,6 +128,35 @@ test('raw quote retention preserves rows linked to maker lifecycle evidence', as assert.deepEqual(queries[0].params, ['2026-05-21T06:00:00.000Z', 123]); }); +test('raw quote retention drains multiple bounded batches when firehose backlog exceeds one batch', async () => { + const rowCounts = [500_000, 500_000, 12]; + const queries = []; + const pool = { + async query(sql, params) { + queries.push({ sql, params }); + return { rows: [], rowCount: rowCounts.shift() ?? 0 }; + }, + }; + + const result = await pruneRawNearIntentsQuoteHistory(pool, { + now: '2026-05-21T12:00:00.000Z', + retainRecentMs: 30 * 60 * 1000, + batchSize: 500_000, + maxBatches: 20, + }); + + assert.equal(result.deletedCount, 1_000_012); + assert.equal(result.cutoff, '2026-05-21T11:30:00.000Z'); + assert.equal(result.batchSize, 500_000); + assert.equal(result.maxBatches, 20); + assert.equal(result.batches, 3); + assert.equal(queries.length, 3); + for (const query of queries) { + assert.match(query.sql, /DELETE FROM raw_near_intents_quotes/); + assert.deepEqual(query.params, ['2026-05-21T11:30:00.000Z', 500_000]); + } +}); + function historyEvent(eventId, payload) { return { event_id: eventId,