From d4280b1d71d626b473f2499001def78073aeb0db Mon Sep 17 00:00:00 2001 From: philipp Date: Tue, 19 May 2026 16:06:33 +0200 Subject: [PATCH] Parallelize durable history consumers Proof: history-writer now runs multiple members in the existing durable consumer group so normalized quote, decision, command, result, inventory, and price topics can be assigned across workers while preserving existing group offsets and durable table writes. Assumptions: three in-process durable group members stay within the existing service resource envelope and improve catch-up without changing strategy decisions, relay submissions, pair enablement, edge, notional, inventory, or arming behavior. Still fake: venue-native terminal fill ids and fee-complete realized PnL remain unavailable; historical backlog catch-up still depends on live Kafka/Postgres throughput. --- src/apps/history-writer.mjs | 42 ++++++++++++++++++----------- test/history-writer-static.test.mjs | 6 +++-- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 469563d..1dc5776 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -78,12 +78,16 @@ const notificationDispatcher = createNotificationDispatcher({ }, }); -const consumer = await createConsumer({ - groupId: config.kafkaConsumerGroupHistory, - brokers: config.kafkaBrokers, - clientId: config.kafkaClientId, - logger, -}); +const durableHistoryConsumerCount = 3; +const durableConsumers = []; +for (let index = 0; index < durableHistoryConsumerCount; index += 1) { + durableConsumers.push(await createConsumer({ + groupId: config.kafkaConsumerGroupHistory, + brokers: config.kafkaBrokers, + clientId: config.kafkaClientId, + logger, + })); +} const rawQuoteConsumer = await createConsumer({ groupId: `${config.kafkaConsumerGroupHistory}-raw`, brokers: config.kafkaBrokers, @@ -122,11 +126,13 @@ const intentRequestOutcomeTopics = new Set([ config.kafkaTopicStateIntentInventory, ]); -for (const topic of topics) { - await consumer.subscribe({ - topic, - fromBeginning: true, - }); +for (const historyConsumer of durableConsumers) { + for (const topic of topics) { + await historyConsumer.subscribe({ + topic, + fromBeginning: true, + }); + } } for (const topic of rawQuoteTopics) { // Raw quote volume is a live firehose. It gets a dedicated consumer group so @@ -178,7 +184,9 @@ await refreshIntentRequestOutcomeAttributions().catch((error) => { state.intent_request_outcomes_error = serializeError(error); }); -await runHistoryConsumer(consumer); +for (const historyConsumer of durableConsumers) { + await runHistoryConsumer(historyConsumer); +} await runHistoryConsumer(rawQuoteConsumer); async function runHistoryConsumer(historyConsumer) { @@ -592,19 +600,23 @@ function topicRefs(topicNames) { } function pauseConsumers() { - consumer.pause(topicRefs(topics)); + for (const historyConsumer of durableConsumers) { + historyConsumer.pause(topicRefs(topics)); + } rawQuoteConsumer.pause(topicRefs(rawQuoteTopics)); } function resumeConsumers() { - consumer.resume(topicRefs(topics)); + for (const historyConsumer of durableConsumers) { + historyConsumer.resume(topicRefs(topics)); + } rawQuoteConsumer.resume(topicRefs(rawQuoteTopics)); } async function shutdown() { await controlApi.close().catch(() => {}); await Promise.allSettled([ - consumer.disconnect(), + ...durableConsumers.map((historyConsumer) => historyConsumer.disconnect()), rawQuoteConsumer.disconnect(), ]); await pool.end(); diff --git a/test/history-writer-static.test.mjs b/test/history-writer-static.test.mjs index 89c83e7..d568ae8 100644 --- a/test/history-writer-static.test.mjs +++ b/test/history-writer-static.test.mjs @@ -5,11 +5,13 @@ import { readFileSync } from 'node:fs'; const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.meta.url), 'utf8'); test('history writer replays durable topics but joins the raw quote firehose live', () => { + assert.match(source, /durableHistoryConsumerCount\s*=\s*3/); + assert.match(source, /durableConsumers\.push\(await createConsumer/); assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-raw`/); assert.match(source, /rawQuoteConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*false/); - assert.match(source, /consumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/); + assert.match(source, /historyConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/); assert.match(source, /Raw quote volume is a live firehose/); - assert.match(source, /runHistoryConsumer\(consumer\)/); + assert.match(source, /runHistoryConsumer\(historyConsumer\)/); assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/); assert.match(source, /eachBatch/); assert.match(source, /insertHistoryEvents/);