diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 469563d..1dc5776 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -78,12 +78,16 @@ const notificationDispatcher = createNotificationDispatcher({ }, }); -const consumer = await createConsumer({ - groupId: config.kafkaConsumerGroupHistory, - brokers: config.kafkaBrokers, - clientId: config.kafkaClientId, - logger, -}); +const durableHistoryConsumerCount = 3; +const durableConsumers = []; +for (let index = 0; index < durableHistoryConsumerCount; index += 1) { + durableConsumers.push(await createConsumer({ + groupId: config.kafkaConsumerGroupHistory, + brokers: config.kafkaBrokers, + clientId: config.kafkaClientId, + logger, + })); +} const rawQuoteConsumer = await createConsumer({ groupId: `${config.kafkaConsumerGroupHistory}-raw`, brokers: config.kafkaBrokers, @@ -122,11 +126,13 @@ const intentRequestOutcomeTopics = new Set([ config.kafkaTopicStateIntentInventory, ]); -for (const topic of topics) { - await consumer.subscribe({ - topic, - fromBeginning: true, - }); +for (const historyConsumer of durableConsumers) { + for (const topic of topics) { + await historyConsumer.subscribe({ + topic, + fromBeginning: true, + }); + } } for (const topic of rawQuoteTopics) { // Raw quote volume is a live firehose. It gets a dedicated consumer group so @@ -178,7 +184,9 @@ await refreshIntentRequestOutcomeAttributions().catch((error) => { state.intent_request_outcomes_error = serializeError(error); }); -await runHistoryConsumer(consumer); +for (const historyConsumer of durableConsumers) { + await runHistoryConsumer(historyConsumer); +} await runHistoryConsumer(rawQuoteConsumer); async function runHistoryConsumer(historyConsumer) { @@ -592,19 +600,23 @@ function topicRefs(topicNames) { } function pauseConsumers() { - consumer.pause(topicRefs(topics)); + for (const historyConsumer of durableConsumers) { + historyConsumer.pause(topicRefs(topics)); + } rawQuoteConsumer.pause(topicRefs(rawQuoteTopics)); } function resumeConsumers() { - consumer.resume(topicRefs(topics)); + for (const historyConsumer of durableConsumers) { + historyConsumer.resume(topicRefs(topics)); + } rawQuoteConsumer.resume(topicRefs(rawQuoteTopics)); } async function shutdown() { await controlApi.close().catch(() => {}); await Promise.allSettled([ - consumer.disconnect(), + ...durableConsumers.map((historyConsumer) => historyConsumer.disconnect()), rawQuoteConsumer.disconnect(), ]); await pool.end(); diff --git a/test/history-writer-static.test.mjs b/test/history-writer-static.test.mjs index 89c83e7..d568ae8 100644 --- a/test/history-writer-static.test.mjs +++ b/test/history-writer-static.test.mjs @@ -5,11 +5,13 @@ import { readFileSync } from 'node:fs'; const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.meta.url), 'utf8'); test('history writer replays durable topics but joins the raw quote firehose live', () => { + assert.match(source, /durableHistoryConsumerCount\s*=\s*3/); + assert.match(source, /durableConsumers\.push\(await createConsumer/); assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-raw`/); assert.match(source, /rawQuoteConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*false/); - assert.match(source, /consumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/); + assert.match(source, /historyConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/); assert.match(source, /Raw quote volume is a live firehose/); - assert.match(source, /runHistoryConsumer\(consumer\)/); + assert.match(source, /runHistoryConsumer\(historyConsumer\)/); assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/); assert.match(source, /eachBatch/); assert.match(source, /insertHistoryEvents/);