Parallelize durable history consumers
All checks were successful
deploy / deploy (push) Successful in 52s

Proof: history-writer now runs multiple members in the existing durable consumer group so normalized quote, decision, command, result, inventory, and price topics can be assigned across workers while preserving existing group offsets and durable table writes.

Assumptions: three in-process durable group members stay within the existing service resource envelope and improve catch-up without changing strategy decisions, relay submissions, pair enablement, edge, notional, inventory, or arming behavior.

Still fake: venue-native terminal fill ids and fee-complete realized PnL remain unavailable; historical backlog catch-up still depends on live Kafka/Postgres throughput.
This commit is contained in:
philipp 2026-05-19 16:06:33 +02:00
parent 5f2380fdc0
commit d4280b1d71
2 changed files with 31 additions and 17 deletions

View file

@ -78,12 +78,16 @@ const notificationDispatcher = createNotificationDispatcher({
}, },
}); });
const consumer = await createConsumer({ const durableHistoryConsumerCount = 3;
groupId: config.kafkaConsumerGroupHistory, const durableConsumers = [];
brokers: config.kafkaBrokers, for (let index = 0; index < durableHistoryConsumerCount; index += 1) {
clientId: config.kafkaClientId, durableConsumers.push(await createConsumer({
logger, groupId: config.kafkaConsumerGroupHistory,
}); brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
}));
}
const rawQuoteConsumer = await createConsumer({ const rawQuoteConsumer = await createConsumer({
groupId: `${config.kafkaConsumerGroupHistory}-raw`, groupId: `${config.kafkaConsumerGroupHistory}-raw`,
brokers: config.kafkaBrokers, brokers: config.kafkaBrokers,
@ -122,11 +126,13 @@ const intentRequestOutcomeTopics = new Set([
config.kafkaTopicStateIntentInventory, config.kafkaTopicStateIntentInventory,
]); ]);
for (const topic of topics) { for (const historyConsumer of durableConsumers) {
await consumer.subscribe({ for (const topic of topics) {
topic, await historyConsumer.subscribe({
fromBeginning: true, topic,
}); fromBeginning: true,
});
}
} }
for (const topic of rawQuoteTopics) { for (const topic of rawQuoteTopics) {
// Raw quote volume is a live firehose. It gets a dedicated consumer group so // Raw quote volume is a live firehose. It gets a dedicated consumer group so
@ -178,7 +184,9 @@ await refreshIntentRequestOutcomeAttributions().catch((error) => {
state.intent_request_outcomes_error = serializeError(error); state.intent_request_outcomes_error = serializeError(error);
}); });
await runHistoryConsumer(consumer); for (const historyConsumer of durableConsumers) {
await runHistoryConsumer(historyConsumer);
}
await runHistoryConsumer(rawQuoteConsumer); await runHistoryConsumer(rawQuoteConsumer);
async function runHistoryConsumer(historyConsumer) { async function runHistoryConsumer(historyConsumer) {
@ -592,19 +600,23 @@ function topicRefs(topicNames) {
} }
function pauseConsumers() { function pauseConsumers() {
consumer.pause(topicRefs(topics)); for (const historyConsumer of durableConsumers) {
historyConsumer.pause(topicRefs(topics));
}
rawQuoteConsumer.pause(topicRefs(rawQuoteTopics)); rawQuoteConsumer.pause(topicRefs(rawQuoteTopics));
} }
function resumeConsumers() { function resumeConsumers() {
consumer.resume(topicRefs(topics)); for (const historyConsumer of durableConsumers) {
historyConsumer.resume(topicRefs(topics));
}
rawQuoteConsumer.resume(topicRefs(rawQuoteTopics)); rawQuoteConsumer.resume(topicRefs(rawQuoteTopics));
} }
async function shutdown() { async function shutdown() {
await controlApi.close().catch(() => {}); await controlApi.close().catch(() => {});
await Promise.allSettled([ await Promise.allSettled([
consumer.disconnect(), ...durableConsumers.map((historyConsumer) => historyConsumer.disconnect()),
rawQuoteConsumer.disconnect(), rawQuoteConsumer.disconnect(),
]); ]);
await pool.end(); await pool.end();

View file

@ -5,11 +5,13 @@ import { readFileSync } from 'node:fs';
const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.meta.url), 'utf8'); const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.meta.url), 'utf8');
test('history writer replays durable topics but joins the raw quote firehose live', () => { test('history writer replays durable topics but joins the raw quote firehose live', () => {
assert.match(source, /durableHistoryConsumerCount\s*=\s*3/);
assert.match(source, /durableConsumers\.push\(await createConsumer/);
assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-raw`/); assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-raw`/);
assert.match(source, /rawQuoteConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*false/); assert.match(source, /rawQuoteConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*false/);
assert.match(source, /consumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/); assert.match(source, /historyConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/);
assert.match(source, /Raw quote volume is a live firehose/); assert.match(source, /Raw quote volume is a live firehose/);
assert.match(source, /runHistoryConsumer\(consumer\)/); assert.match(source, /runHistoryConsumer\(historyConsumer\)/);
assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/); assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/);
assert.match(source, /eachBatch/); assert.match(source, /eachBatch/);
assert.match(source, /insertHistoryEvents/); assert.match(source, /insertHistoryEvents/);