Keep live quote decision history current
All checks were successful
deploy / deploy (push) Successful in 1m0s

Proof: history-writer now adds a repo-owned live evidence consumer group for normalized quotes and trade decisions while the existing durable group continues replaying retained backlog, so current maker timing and strategy truth can be persisted without abandoning old rows.

Assumptions: duplicate inserts are safe through existing event_id primary keys and bulk insert conflict handling; this changes only persistence catch-up behavior, not strategy decisions, relay submissions, pair enablement, edge, notional, inventory, arming, or response policy.

Still fake: venue-native terminal fill ids and fee-complete realized PnL remain unavailable; older normalized quote and decision backlog still depends on the durable replay group draining over time.
This commit is contained in:
philipp 2026-05-19 16:13:57 +02:00
parent d4280b1d71
commit b6646fb7a3
2 changed files with 39 additions and 0 deletions

View file

@ -88,6 +88,16 @@ for (let index = 0; index < durableHistoryConsumerCount; index += 1) {
logger, logger,
})); }));
} }
const liveEvidenceConsumerCount = 2;
const liveEvidenceConsumers = [];
for (let index = 0; index < liveEvidenceConsumerCount; index += 1) {
liveEvidenceConsumers.push(await createConsumer({
groupId: `${config.kafkaConsumerGroupHistory}-live`,
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
}));
}
const rawQuoteConsumer = await createConsumer({ const rawQuoteConsumer = await createConsumer({
groupId: `${config.kafkaConsumerGroupHistory}-raw`, groupId: `${config.kafkaConsumerGroupHistory}-raw`,
brokers: config.kafkaBrokers, brokers: config.kafkaBrokers,
@ -110,6 +120,10 @@ const topics = [
const rawQuoteTopics = [ const rawQuoteTopics = [
config.kafkaTopicRawNearIntentsQuote, config.kafkaTopicRawNearIntentsQuote,
]; ];
const liveEvidenceTopics = [
config.kafkaTopicNormSwapDemand,
config.kafkaTopicDecisionTradeDecision,
];
const portfolioMetricTopics = new Set([ const portfolioMetricTopics = new Set([
config.kafkaTopicRefMarketPrice, config.kafkaTopicRefMarketPrice,
config.kafkaTopicStateIntentInventory, config.kafkaTopicStateIntentInventory,
@ -134,6 +148,16 @@ for (const historyConsumer of durableConsumers) {
}); });
} }
} }
for (const historyConsumer of liveEvidenceConsumers) {
for (const topic of liveEvidenceTopics) {
// The durable group keeps replaying retained history. This live group keeps
// current quote/decision truth visible while that backlog drains.
await historyConsumer.subscribe({
topic,
fromBeginning: false,
});
}
}
for (const topic of rawQuoteTopics) { for (const topic of rawQuoteTopics) {
// Raw quote volume is a live firehose. It gets a dedicated consumer group so // Raw quote volume is a live firehose. It gets a dedicated consumer group so
// raw storage cannot starve durable strategy/execution topics. // raw storage cannot starve durable strategy/execution topics.
@ -187,6 +211,9 @@ await refreshIntentRequestOutcomeAttributions().catch((error) => {
for (const historyConsumer of durableConsumers) { for (const historyConsumer of durableConsumers) {
await runHistoryConsumer(historyConsumer); await runHistoryConsumer(historyConsumer);
} }
for (const historyConsumer of liveEvidenceConsumers) {
await runHistoryConsumer(historyConsumer);
}
await runHistoryConsumer(rawQuoteConsumer); await runHistoryConsumer(rawQuoteConsumer);
async function runHistoryConsumer(historyConsumer) { async function runHistoryConsumer(historyConsumer) {
@ -603,6 +630,9 @@ function pauseConsumers() {
for (const historyConsumer of durableConsumers) { for (const historyConsumer of durableConsumers) {
historyConsumer.pause(topicRefs(topics)); historyConsumer.pause(topicRefs(topics));
} }
for (const historyConsumer of liveEvidenceConsumers) {
historyConsumer.pause(topicRefs(liveEvidenceTopics));
}
rawQuoteConsumer.pause(topicRefs(rawQuoteTopics)); rawQuoteConsumer.pause(topicRefs(rawQuoteTopics));
} }
@ -610,6 +640,9 @@ function resumeConsumers() {
for (const historyConsumer of durableConsumers) { for (const historyConsumer of durableConsumers) {
historyConsumer.resume(topicRefs(topics)); historyConsumer.resume(topicRefs(topics));
} }
for (const historyConsumer of liveEvidenceConsumers) {
historyConsumer.resume(topicRefs(liveEvidenceTopics));
}
rawQuoteConsumer.resume(topicRefs(rawQuoteTopics)); rawQuoteConsumer.resume(topicRefs(rawQuoteTopics));
} }
@ -617,6 +650,7 @@ async function shutdown() {
await controlApi.close().catch(() => {}); await controlApi.close().catch(() => {});
await Promise.allSettled([ await Promise.allSettled([
...durableConsumers.map((historyConsumer) => historyConsumer.disconnect()), ...durableConsumers.map((historyConsumer) => historyConsumer.disconnect()),
...liveEvidenceConsumers.map((historyConsumer) => historyConsumer.disconnect()),
rawQuoteConsumer.disconnect(), rawQuoteConsumer.disconnect(),
]); ]);
await pool.end(); await pool.end();

View file

@ -7,9 +7,14 @@ const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.met
test('history writer replays durable topics but joins the raw quote firehose live', () => { test('history writer replays durable topics but joins the raw quote firehose live', () => {
assert.match(source, /durableHistoryConsumerCount\s*=\s*3/); assert.match(source, /durableHistoryConsumerCount\s*=\s*3/);
assert.match(source, /durableConsumers\.push\(await createConsumer/); assert.match(source, /durableConsumers\.push\(await createConsumer/);
assert.match(source, /liveEvidenceConsumerCount\s*=\s*2/);
assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-live`/);
assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-raw`/); assert.match(source, /groupId:\s*`\$\{config\.kafkaConsumerGroupHistory\}-raw`/);
assert.match(source, /liveEvidenceTopics\s*=\s*\[[\s\S]+config\.kafkaTopicNormSwapDemand[\s\S]+config\.kafkaTopicDecisionTradeDecision/);
assert.match(source, /current quote\/decision truth visible/);
assert.match(source, /rawQuoteConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*false/); assert.match(source, /rawQuoteConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*false/);
assert.match(source, /historyConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/); assert.match(source, /historyConsumer\.subscribe\(\{[\s\S]+fromBeginning:\s*true/);
assert.match(source, /liveEvidenceTopics[\s\S]+fromBeginning:\s*false/);
assert.match(source, /Raw quote volume is a live firehose/); assert.match(source, /Raw quote volume is a live firehose/);
assert.match(source, /runHistoryConsumer\(historyConsumer\)/); assert.match(source, /runHistoryConsumer\(historyConsumer\)/);
assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/); assert.match(source, /runHistoryConsumer\(rawQuoteConsumer\)/);