From 5f2380fdc094ed54a98521e6909496c9c881ae49 Mon Sep 17 00:00:00 2001 From: philipp Date: Tue, 19 May 2026 16:00:54 +0200 Subject: [PATCH] Split raw history consumer Proof: raw quote persistence now uses a dedicated history consumer group so raw quote firehose volume cannot starve durable normalized quote, decision, command, result, and outcome evidence topics in the main history-writer group. Assumptions: raw quote persistence can join live in a dedicated group without changing event schemas or strategy behavior; no live pair, edge, notional, inventory, arming, or response-policy settings are changed. Still fake: venue-native terminal fill ids and fee-complete realized PnL remain unavailable; historical backlog catch-up still depends on Kafka and Postgres throughput after deployment. --- src/apps/history-writer.mjs | 156 +++++++++++++++++----------- test/history-writer-static.test.mjs | 6 +- 2 files changed, 101 insertions(+), 61 deletions(-) diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 3392efe..469563d 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -84,9 +84,14 @@ const consumer = await createConsumer({ clientId: config.kafkaClientId, logger, }); +const rawQuoteConsumer = await createConsumer({ + groupId: `${config.kafkaConsumerGroupHistory}-raw`, + brokers: config.kafkaBrokers, + clientId: config.kafkaClientId, + logger, +}); const topics = [ - config.kafkaTopicRawNearIntentsQuote, config.kafkaTopicNormSwapDemand, config.kafkaTopicRefMarketPrice, config.kafkaTopicStateIntentInventory, @@ -98,6 +103,9 @@ const topics = [ config.kafkaTopicCmdExecuteTrade, config.kafkaTopicExecTradeResult, ]; +const rawQuoteTopics = [ + config.kafkaTopicRawNearIntentsQuote, +]; const portfolioMetricTopics = new Set([ config.kafkaTopicRefMarketPrice, config.kafkaTopicStateIntentInventory, @@ -115,11 +123,17 @@ const intentRequestOutcomeTopics = new Set([ ]); for (const topic of topics) { - // Raw quote volume is a live firehose; replaying retained history can starve - // durable strategy/execution topics and exhaust the writer. await consumer.subscribe({ topic, - fromBeginning: topic !== config.kafkaTopicRawNearIntentsQuote, + fromBeginning: true, + }); +} +for (const topic of rawQuoteTopics) { + // Raw quote volume is a live firehose. It gets a dedicated consumer group so + // raw storage cannot starve durable strategy/execution topics. + await rawQuoteConsumer.subscribe({ + topic, + fromBeginning: false, }); } @@ -164,70 +178,75 @@ await refreshIntentRequestOutcomeAttributions().catch((error) => { state.intent_request_outcomes_error = serializeError(error); }); -await consumer.run({ - eachBatch: async ({ batch, heartbeat }) => { - if (state.paused) return; +await runHistoryConsumer(consumer); +await runHistoryConsumer(rawQuoteConsumer); - const contexts = []; - const batchEntries = []; +async function runHistoryConsumer(historyConsumer) { + await historyConsumer.run({ + eachBatch: async ({ batch, heartbeat }) => { + if (state.paused) return; - for (const message of batch.messages) { - if (!message.value) continue; - try { - const event = parseEventMessage(message.value.toString()); - const routed = routeHistoryRecord({ topic: batch.topic, event }); - const context = { - topic: batch.topic, - partition: batch.partition, - message, - event, - routed, - writeResult: null, - }; + const contexts = []; + const batchEntries = []; - contexts.push(context); - if (batch.topic === config.kafkaTopicOpsEnvironmentStatus) { - context.writeResult = await insertEnvironmentStatusChange(pool, { + for (const message of batch.messages) { + if (!message.value) continue; + try { + const event = parseEventMessage(message.value.toString()); + const routed = routeHistoryRecord({ topic: batch.topic, event }); + const context = { topic: batch.topic, + partition: batch.partition, + message, event, - record: routed.record, - }); - } else { - batchEntries.push({ - table: routed.table, - topic: batch.topic, - event, - record: routed.record, - }); + routed, + writeResult: null, + }; + + contexts.push(context); + if (batch.topic === config.kafkaTopicOpsEnvironmentStatus) { + context.writeResult = await insertEnvironmentStatusChange(pool, { + topic: batch.topic, + event, + record: routed.record, + }); + } else { + batchEntries.push({ + table: routed.table, + topic: batch.topic, + event, + record: routed.record, + }); + } + } catch (error) { + recordHistoryError(batch.topic, error); } + } + + let insertedEventIds = new Set(); + try { + ({ insertedEventIds } = await insertHistoryEvents(pool, batchEntries)); } catch (error) { recordHistoryError(batch.topic, error); + throw error; } - } - let insertedEventIds = new Set(); - try { - ({ insertedEventIds } = await insertHistoryEvents(pool, batchEntries)); - } catch (error) { - recordHistoryError(batch.topic, error); - throw error; - } - - for (const context of contexts) { - if (!context.writeResult) { - context.writeResult = { - inserted: insertedEventIds.has(context.event.event_id), - }; + for (const context of contexts) { + if (!context.writeResult) { + context.writeResult = { + inserted: insertedEventIds.has(context.event.event_id), + }; + } + await handleWrittenHistoryEvent(context); + await heartbeat(); } - await handleWrittenHistoryEvent(context); - await heartbeat(); - } - if (state.draining) { - setTimeout(() => shutdown(), 0); - } - }, -}); + if (state.draining) { + setTimeout(() => shutdown(), 0); + } + }, + }); +} async function handleWrittenHistoryEvent({ topic, @@ -399,7 +418,7 @@ const controlApi = startControlApi({ path: '/pause', handler: () => { state.paused = true; - consumer.pause(topics.map((topic) => ({ topic }))); + pauseConsumers(); return { ok: true, paused: true }; }, }, @@ -408,7 +427,7 @@ const controlApi = startControlApi({ path: '/resume', handler: () => { state.paused = false; - consumer.resume(topics.map((topic) => ({ topic }))); + resumeConsumers(); return { ok: true, paused: false }; }, }, @@ -418,7 +437,7 @@ const controlApi = startControlApi({ handler: () => { state.draining = true; state.paused = true; - consumer.pause(topics.map((topic) => ({ topic }))); + pauseConsumers(); setTimeout(() => shutdown(), 0); return { ok: true, draining: true }; }, @@ -568,9 +587,26 @@ function summarizePortfolioMetric(metric) { }; } +function topicRefs(topicNames) { + return topicNames.map((topic) => ({ topic })); +} + +function pauseConsumers() { + consumer.pause(topicRefs(topics)); + rawQuoteConsumer.pause(topicRefs(rawQuoteTopics)); +} + +function resumeConsumers() { + consumer.resume(topicRefs(topics)); + rawQuoteConsumer.resume(topicRefs(rawQuoteTopics)); +} + async function shutdown() { await controlApi.close().catch(() => {}); - await consumer.disconnect(); + await Promise.allSettled([ + consumer.disconnect(), + rawQuoteConsumer.disconnect(), + ]); await pool.end(); process.exit(0); } diff --git a/test/history-writer-static.test.mjs b/test/history-writer-static.test.mjs index acf7a7d..89c83e7 100644 --- a/test/history-writer-static.test.mjs +++ b/test/history-writer-static.test.mjs @@ -5,8 +5,12 @@ import { readFileSync } from 'node:fs'; const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.meta.url), 'utf8'); test('history writer replays durable topics but joins the raw quote firehose live', () => { - assert.match(source, /fromBeginning:\s*topic !== config\.kafkaTopicRawNearIntentsQuote/); + assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-raw`/); + assert.match(source, /rawQuoteConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*false/); + assert.match(source, /consumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/); assert.match(source, /Raw quote volume is a live firehose/); + assert.match(source, /runHistoryConsumer\(consumer\)/); + assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/); assert.match(source, /eachBatch/); assert.match(source, /insertHistoryEvents/); });