From b6646fb7a398790834247e1073633481cde5d029 Mon Sep 17 00:00:00 2001 From: philipp Date: Tue, 19 May 2026 16:13:57 +0200 Subject: [PATCH] Keep live quote decision history current Proof: history-writer now adds a repo-owned live evidence consumer group for normalized quotes and trade decisions while the existing durable group continues replaying retained backlog, so current maker timing and strategy truth can be persisted without abandoning old rows. Assumptions: duplicate inserts are safe through existing event_id primary keys and bulk insert conflict handling; this changes only persistence catch-up behavior, not strategy decisions, relay submissions, pair enablement, edge, notional, inventory, arming, or response policy. Still fake: venue-native terminal fill ids and fee-complete realized PnL remain unavailable; older normalized quote and decision backlog still depends on the durable replay group draining over time. --- src/apps/history-writer.mjs | 34 +++++++++++++++++++++++++++++ test/history-writer-static.test.mjs | 5 +++++ 2 files changed, 39 insertions(+) diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 1dc5776..76a5dae 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -88,6 +88,16 @@ for (let index = 0; index < durableHistoryConsumerCount; index += 1) { logger, })); } +const liveEvidenceConsumerCount = 2; +const liveEvidenceConsumers = []; +for (let index = 0; index < liveEvidenceConsumerCount; index += 1) { + liveEvidenceConsumers.push(await createConsumer({ + groupId: `${config.kafkaConsumerGroupHistory}-live`, + brokers: config.kafkaBrokers, + clientId: config.kafkaClientId, + logger, + })); +} const rawQuoteConsumer = await createConsumer({ groupId: `${config.kafkaConsumerGroupHistory}-raw`, brokers: config.kafkaBrokers, @@ -110,6 +120,10 @@ const topics = [ const rawQuoteTopics = [ config.kafkaTopicRawNearIntentsQuote, ]; +const liveEvidenceTopics = [ + config.kafkaTopicNormSwapDemand, + config.kafkaTopicDecisionTradeDecision, +]; const portfolioMetricTopics = new Set([ config.kafkaTopicRefMarketPrice, config.kafkaTopicStateIntentInventory, @@ -134,6 +148,16 @@ for (const historyConsumer of durableConsumers) { }); } } +for (const historyConsumer of liveEvidenceConsumers) { + for (const topic of liveEvidenceTopics) { + // The durable group keeps replaying retained history. This live group keeps + // current quote/decision truth visible while that backlog drains. + await historyConsumer.subscribe({ + topic, + fromBeginning: false, + }); + } +} for (const topic of rawQuoteTopics) { // Raw quote volume is a live firehose. It gets a dedicated consumer group so // raw storage cannot starve durable strategy/execution topics. @@ -187,6 +211,9 @@ await refreshIntentRequestOutcomeAttributions().catch((error) => { for (const historyConsumer of durableConsumers) { await runHistoryConsumer(historyConsumer); } +for (const historyConsumer of liveEvidenceConsumers) { + await runHistoryConsumer(historyConsumer); +} await runHistoryConsumer(rawQuoteConsumer); async function runHistoryConsumer(historyConsumer) { @@ -603,6 +630,9 @@ function pauseConsumers() { for (const historyConsumer of durableConsumers) { historyConsumer.pause(topicRefs(topics)); } + for (const historyConsumer of liveEvidenceConsumers) { + historyConsumer.pause(topicRefs(liveEvidenceTopics)); + } rawQuoteConsumer.pause(topicRefs(rawQuoteTopics)); } @@ -610,6 +640,9 @@ function resumeConsumers() { for (const historyConsumer of durableConsumers) { historyConsumer.resume(topicRefs(topics)); } + for (const historyConsumer of liveEvidenceConsumers) { + historyConsumer.resume(topicRefs(liveEvidenceTopics)); + } rawQuoteConsumer.resume(topicRefs(rawQuoteTopics)); } @@ -617,6 +650,7 @@ async function shutdown() { await controlApi.close().catch(() => {}); await Promise.allSettled([ ...durableConsumers.map((historyConsumer) => historyConsumer.disconnect()), + ...liveEvidenceConsumers.map((historyConsumer) => historyConsumer.disconnect()), rawQuoteConsumer.disconnect(), ]); await pool.end(); diff --git a/test/history-writer-static.test.mjs b/test/history-writer-static.test.mjs index d568ae8..8522cd9 100644 --- a/test/history-writer-static.test.mjs +++ b/test/history-writer-static.test.mjs @@ -7,9 +7,14 @@ const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.met test('history writer replays durable topics but joins the raw quote firehose live', () => { assert.match(source, /durableHistoryConsumerCount\s*=\s*3/); assert.match(source, /durableConsumers\.push\(await createConsumer/); + assert.match(source, /liveEvidenceConsumerCount\s*=\s*2/); + assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-live`/); assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-raw`/); + assert.match(source, /liveEvidenceTopics\s*=\s*\[[\s\S]+config\.kafkaTopicNormSwapDemand[\s\S]+config\.kafkaTopicDecisionTradeDecision/); + assert.match(source, /current quote\/decision truth visible/); assert.match(source, /rawQuoteConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*false/); assert.match(source, /historyConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/); + assert.match(source, /liveEvidenceTopics[\s\S]+fromBeginning:\s*false/); assert.match(source, /Raw quote volume is a live firehose/); assert.match(source, /runHistoryConsumer\(historyConsumer\)/); assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/);