Split raw history consumer
All checks were successful
deploy / deploy (push) Successful in 45s

Proof: raw quote persistence now uses a dedicated history consumer group so raw quote firehose volume cannot starve durable normalized quote, decision, command, result, and outcome evidence topics in the main history-writer group.

Assumptions: raw quote persistence can join live in a dedicated group without changing event schemas or strategy behavior; no live pair, edge, notional, inventory, arming, or response-policy settings are changed.

Still fake: venue-native terminal fill ids and fee-complete realized PnL remain unavailable; historical backlog catch-up still depends on Kafka and Postgres throughput after deployment.
This commit is contained in:
philipp 2026-05-19 16:00:54 +02:00
parent 348c4f9b0b
commit 5f2380fdc0
2 changed files with 101 additions and 61 deletions

View file

@ -84,9 +84,14 @@ const consumer = await createConsumer({
clientId: config.kafkaClientId,
logger,
});
const rawQuoteConsumer = await createConsumer({
groupId: `${config.kafkaConsumerGroupHistory}-raw`,
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
});
const topics = [
config.kafkaTopicRawNearIntentsQuote,
config.kafkaTopicNormSwapDemand,
config.kafkaTopicRefMarketPrice,
config.kafkaTopicStateIntentInventory,
@ -98,6 +103,9 @@ const topics = [
config.kafkaTopicCmdExecuteTrade,
config.kafkaTopicExecTradeResult,
];
const rawQuoteTopics = [
config.kafkaTopicRawNearIntentsQuote,
];
const portfolioMetricTopics = new Set([
config.kafkaTopicRefMarketPrice,
config.kafkaTopicStateIntentInventory,
@ -115,11 +123,17 @@ const intentRequestOutcomeTopics = new Set([
]);
for (const topic of topics) {
// Raw quote volume is a live firehose; replaying retained history can starve
// durable strategy/execution topics and exhaust the writer.
await consumer.subscribe({
topic,
fromBeginning: topic !== config.kafkaTopicRawNearIntentsQuote,
fromBeginning: true,
});
}
for (const topic of rawQuoteTopics) {
// Raw quote volume is a live firehose. It gets a dedicated consumer group so
// raw storage cannot starve durable strategy/execution topics.
await rawQuoteConsumer.subscribe({
topic,
fromBeginning: false,
});
}
@ -164,70 +178,75 @@ await refreshIntentRequestOutcomeAttributions().catch((error) => {
state.intent_request_outcomes_error = serializeError(error);
});
await consumer.run({
eachBatch: async ({ batch, heartbeat }) => {
if (state.paused) return;
await runHistoryConsumer(consumer);
await runHistoryConsumer(rawQuoteConsumer);
const contexts = [];
const batchEntries = [];
async function runHistoryConsumer(historyConsumer) {
await historyConsumer.run({
eachBatch: async ({ batch, heartbeat }) => {
if (state.paused) return;
for (const message of batch.messages) {
if (!message.value) continue;
try {
const event = parseEventMessage(message.value.toString());
const routed = routeHistoryRecord({ topic: batch.topic, event });
const context = {
topic: batch.topic,
partition: batch.partition,
message,
event,
routed,
writeResult: null,
};
const contexts = [];
const batchEntries = [];
contexts.push(context);
if (batch.topic === config.kafkaTopicOpsEnvironmentStatus) {
context.writeResult = await insertEnvironmentStatusChange(pool, {
for (const message of batch.messages) {
if (!message.value) continue;
try {
const event = parseEventMessage(message.value.toString());
const routed = routeHistoryRecord({ topic: batch.topic, event });
const context = {
topic: batch.topic,
partition: batch.partition,
message,
event,
record: routed.record,
});
} else {
batchEntries.push({
table: routed.table,
topic: batch.topic,
event,
record: routed.record,
});
routed,
writeResult: null,
};
contexts.push(context);
if (batch.topic === config.kafkaTopicOpsEnvironmentStatus) {
context.writeResult = await insertEnvironmentStatusChange(pool, {
topic: batch.topic,
event,
record: routed.record,
});
} else {
batchEntries.push({
table: routed.table,
topic: batch.topic,
event,
record: routed.record,
});
}
} catch (error) {
recordHistoryError(batch.topic, error);
}
}
let insertedEventIds = new Set();
try {
({ insertedEventIds } = await insertHistoryEvents(pool, batchEntries));
} catch (error) {
recordHistoryError(batch.topic, error);
throw error;
}
}
let insertedEventIds = new Set();
try {
({ insertedEventIds } = await insertHistoryEvents(pool, batchEntries));
} catch (error) {
recordHistoryError(batch.topic, error);
throw error;
}
for (const context of contexts) {
if (!context.writeResult) {
context.writeResult = {
inserted: insertedEventIds.has(context.event.event_id),
};
for (const context of contexts) {
if (!context.writeResult) {
context.writeResult = {
inserted: insertedEventIds.has(context.event.event_id),
};
}
await handleWrittenHistoryEvent(context);
await heartbeat();
}
await handleWrittenHistoryEvent(context);
await heartbeat();
}
if (state.draining) {
setTimeout(() => shutdown(), 0);
}
},
});
if (state.draining) {
setTimeout(() => shutdown(), 0);
}
},
});
}
async function handleWrittenHistoryEvent({
topic,
@ -399,7 +418,7 @@ const controlApi = startControlApi({
path: '/pause',
handler: () => {
state.paused = true;
consumer.pause(topics.map((topic) => ({ topic })));
pauseConsumers();
return { ok: true, paused: true };
},
},
@ -408,7 +427,7 @@ const controlApi = startControlApi({
path: '/resume',
handler: () => {
state.paused = false;
consumer.resume(topics.map((topic) => ({ topic })));
resumeConsumers();
return { ok: true, paused: false };
},
},
@ -418,7 +437,7 @@ const controlApi = startControlApi({
handler: () => {
state.draining = true;
state.paused = true;
consumer.pause(topics.map((topic) => ({ topic })));
pauseConsumers();
setTimeout(() => shutdown(), 0);
return { ok: true, draining: true };
},
@ -568,9 +587,26 @@ function summarizePortfolioMetric(metric) {
};
}
function topicRefs(topicNames) {
return topicNames.map((topic) => ({ topic }));
}
function pauseConsumers() {
consumer.pause(topicRefs(topics));
rawQuoteConsumer.pause(topicRefs(rawQuoteTopics));
}
function resumeConsumers() {
consumer.resume(topicRefs(topics));
rawQuoteConsumer.resume(topicRefs(rawQuoteTopics));
}
async function shutdown() {
await controlApi.close().catch(() => {});
await consumer.disconnect();
await Promise.allSettled([
consumer.disconnect(),
rawQuoteConsumer.disconnect(),
]);
await pool.end();
process.exit(0);
}

View file

@ -5,8 +5,12 @@ import { readFileSync } from 'node:fs';
const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.meta.url), 'utf8');
test('history writer replays durable topics but joins the raw quote firehose live', () => {
assert.match(source, /fromBeginning:\s*topic !== config\.kafkaTopicRawNearIntentsQuote/);
assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-raw`/);
assert.match(source, /rawQuoteConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*false/);
assert.match(source, /consumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/);
assert.match(source, /Raw quote volume is a live firehose/);
assert.match(source, /runHistoryConsumer\(consumer\)/);
assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/);
assert.match(source, /eachBatch/);
assert.match(source, /insertHistoryEvents/);
});