diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 1dc5776..76a5dae 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -88,6 +88,16 @@ for (let index = 0; index < durableHistoryConsumerCount; index += 1) { logger, })); } +const liveEvidenceConsumerCount = 2; +const liveEvidenceConsumers = []; +for (let index = 0; index < liveEvidenceConsumerCount; index += 1) { + liveEvidenceConsumers.push(await createConsumer({ + groupId: `${config.kafkaConsumerGroupHistory}-live`, + brokers: config.kafkaBrokers, + clientId: config.kafkaClientId, + logger, + })); +} const rawQuoteConsumer = await createConsumer({ groupId: `${config.kafkaConsumerGroupHistory}-raw`, brokers: config.kafkaBrokers, @@ -110,6 +120,10 @@ const topics = [ const rawQuoteTopics = [ config.kafkaTopicRawNearIntentsQuote, ]; +const liveEvidenceTopics = [ + config.kafkaTopicNormSwapDemand, + config.kafkaTopicDecisionTradeDecision, +]; const portfolioMetricTopics = new Set([ config.kafkaTopicRefMarketPrice, config.kafkaTopicStateIntentInventory, @@ -134,6 +148,16 @@ for (const historyConsumer of durableConsumers) { }); } } +for (const historyConsumer of liveEvidenceConsumers) { + for (const topic of liveEvidenceTopics) { + // The durable group keeps replaying retained history. This live group keeps + // current quote/decision truth visible while that backlog drains. + await historyConsumer.subscribe({ + topic, + fromBeginning: false, + }); + } +} for (const topic of rawQuoteTopics) { // Raw quote volume is a live firehose. It gets a dedicated consumer group so // raw storage cannot starve durable strategy/execution topics. @@ -187,6 +211,9 @@ await refreshIntentRequestOutcomeAttributions().catch((error) => { for (const historyConsumer of durableConsumers) { await runHistoryConsumer(historyConsumer); } +for (const historyConsumer of liveEvidenceConsumers) { + await runHistoryConsumer(historyConsumer); +} await runHistoryConsumer(rawQuoteConsumer); async function runHistoryConsumer(historyConsumer) { @@ -603,6 +630,9 @@ function pauseConsumers() { for (const historyConsumer of durableConsumers) { historyConsumer.pause(topicRefs(topics)); } + for (const historyConsumer of liveEvidenceConsumers) { + historyConsumer.pause(topicRefs(liveEvidenceTopics)); + } rawQuoteConsumer.pause(topicRefs(rawQuoteTopics)); } @@ -610,6 +640,9 @@ function resumeConsumers() { for (const historyConsumer of durableConsumers) { historyConsumer.resume(topicRefs(topics)); } + for (const historyConsumer of liveEvidenceConsumers) { + historyConsumer.resume(topicRefs(liveEvidenceTopics)); + } rawQuoteConsumer.resume(topicRefs(rawQuoteTopics)); } @@ -617,6 +650,7 @@ async function shutdown() { await controlApi.close().catch(() => {}); await Promise.allSettled([ ...durableConsumers.map((historyConsumer) => historyConsumer.disconnect()), + ...liveEvidenceConsumers.map((historyConsumer) => historyConsumer.disconnect()), rawQuoteConsumer.disconnect(), ]); await pool.end(); diff --git a/test/history-writer-static.test.mjs b/test/history-writer-static.test.mjs index d568ae8..8522cd9 100644 --- a/test/history-writer-static.test.mjs +++ b/test/history-writer-static.test.mjs @@ -7,9 +7,14 @@ const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.met test('history writer replays durable topics but joins the raw quote firehose live', () => { assert.match(source, /durableHistoryConsumerCount\s*=\s*3/); assert.match(source, /durableConsumers\.push\(await createConsumer/); + assert.match(source, /liveEvidenceConsumerCount\s*=\s*2/); + assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-live`/); assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-raw`/); + assert.match(source, /liveEvidenceTopics\s*=\s*\[[\s\S]+config\.kafkaTopicNormSwapDemand[\s\S]+config\.kafkaTopicDecisionTradeDecision/); + assert.match(source, /current quote\/decision truth visible/); assert.match(source, /rawQuoteConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*false/); assert.match(source, /historyConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/); + assert.match(source, /liveEvidenceTopics[\s\S]+fromBeginning:\s*false/); assert.match(source, /Raw quote volume is a live firehose/); assert.match(source, /runHistoryConsumer\(historyConsumer\)/); assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/);