From 348c4f9b0bb9d703cc91f70a29d0fa24e513aa63 Mon Sep 17 00:00:00 2001 From: philipp Date: Tue, 19 May 2026 15:53:52 +0200 Subject: [PATCH] Batch durable history writes Proof: History-writer now writes routed event batches with one bulk insert per table, preserving decision and normalized quote history while reducing Kafka lag that delayed durable strategy decision rows. Assumptions: Kafka message order within a topic partition remains sufficient for durability; environment status events keep their dedicated dedupe path; this change does not alter strategy, edge, notional, inventory, arming, or relay submission behavior. Still fake: Venue-native terminal fill ids and fee-complete realized PnL remain unavailable; historical decision rows can still lag until the deployed batch writer drains existing backlog. --- src/apps/history-writer.mjs | 258 +++++++++++++++++----------- src/lib/postgres.mjs | 72 +++++++- test/history-writer-static.test.mjs | 2 + test/inventory-and-history.test.mjs | 50 ++++++ 4 files changed, 276 insertions(+), 106 deletions(-) diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 2353e8c..3392efe 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -23,7 +23,7 @@ import { ensureHistorySchema, insertEnvironmentStatusChange, finishNotificationDelivery, - insertHistoryEvent, + insertHistoryEvents, loadLatestPortfolioMetric, loadPortfolioMetricInputs, refreshIntentRequestOutcomes, @@ -165,123 +165,171 @@ await refreshIntentRequestOutcomeAttributions().catch((error) => { }); await consumer.run({ - eachMessage: async ({ topic, partition, message }) => { - if (!message.value || state.paused) return; + eachBatch: async ({ batch, heartbeat }) => { + if (state.paused) return; - try { - const event = parseEventMessage(message.value.toString()); - const routed = routeHistoryRecord({ topic, event }); - const writeResult = topic === config.kafkaTopicOpsEnvironmentStatus - ? await insertEnvironmentStatusChange(pool, { - topic, - event, - record: routed.record, - }) - : await insertHistoryEvent(pool, { - table: routed.table, - topic, - event, - record: routed.record, - }).then(() => ({ inserted: true })); + const contexts = []; + const batchEntries = []; - const handledAt = new Date().toISOString(); - if (writeResult.inserted) { - state.last_write_at = handledAt; - } - state.last_error = null; - state.offsets[topic] = { - partition, - offset: message.offset, - }; - const shouldRunDerivedRefresh = shouldRunDerivedRefreshForEvent({ - event, - now: handledAt, - maxEventAgeMs: config.historyWriterDerivedRefreshMaxEventAgeMs, - }); - if (!shouldRunDerivedRefresh) { - state.derived_refresh_skipped_count += 1; - state.last_derived_refresh_skipped_at = handledAt; - state.last_derived_refresh_skipped_topic = topic; - } - if (topic === config.kafkaTopicOpsFundingObservation && writeResult.inserted) { - state.last_funding_observation_write_at = state.last_write_at; - } - if (topic === config.kafkaTopicOpsAlert && writeResult.inserted) { - state.last_alert_write_at = state.last_write_at; - } - if (topic === config.kafkaTopicOpsEnvironmentStatus) { - state.last_environment_status_seen_at = handledAt; - state.last_environment_status_fingerprint = writeResult.status_fingerprint || event.payload.status_fingerprint || null; - if (writeResult.inserted) { - state.last_environment_status_write_at = handledAt; + for (const message of batch.messages) { + if (!message.value) continue; + try { + const event = parseEventMessage(message.value.toString()); + const routed = routeHistoryRecord({ topic: batch.topic, event }); + const context = { + topic: batch.topic, + partition: batch.partition, + message, + event, + routed, + writeResult: null, + }; + + contexts.push(context); + if (batch.topic === config.kafkaTopicOpsEnvironmentStatus) { + context.writeResult = await insertEnvironmentStatusChange(pool, { + topic: batch.topic, + event, + record: routed.record, + }); } else { - state.last_environment_status_duplicate_at = handledAt; - } - } - if (shouldRunDerivedRefresh) { - await publishLiquidityNotification({ topic, event }); - } - if (shouldRunDerivedRefresh && portfolioMetricTopics.has(topic)) { - try { - await refreshPortfolioMetrics(); - } catch (error) { - state.metrics_error = serializeError(error); - logger.error('portfolio_metrics_refresh_failed', { - topic, - details: { - error: serializeError(error), - }, - }); - } - } - if (shouldRunDerivedRefresh && quoteOutcomeTopics.has(topic)) { - try { - const records = await refreshQuoteOutcomeAttributions(); - await publishQuoteOutcomeNotifications(records, { - minObservedAt: event.observed_at || event.ingested_at || state.last_write_at, - }); - } catch (error) { - state.quote_outcomes_error = serializeError(error); - logger.error('quote_outcomes_refresh_failed', { - topic, - details: { - error: serializeError(error), - }, - }); - } - } - if (shouldRunDerivedRefresh && intentRequestOutcomeTopics.has(topic)) { - try { - const records = await refreshIntentRequestOutcomeAttributions(); - await publishIntentRequestOutcomeNotifications(records, { - minObservedAt: event.observed_at || event.ingested_at || state.last_write_at, - }); - } catch (error) { - state.intent_request_outcomes_error = serializeError(error); - logger.error('intent_request_outcomes_refresh_failed', { - topic, - details: { - error: serializeError(error), - }, + batchEntries.push({ + table: routed.table, + topic: batch.topic, + event, + record: routed.record, }); } + } catch (error) { + recordHistoryError(batch.topic, error); } + } + + let insertedEventIds = new Set(); + try { + ({ insertedEventIds } = await insertHistoryEvents(pool, batchEntries)); } catch (error) { - state.last_error = serializeError(error); - state.error_count += 1; - logger.error('history_write_failed', { + recordHistoryError(batch.topic, error); + throw error; + } + + for (const context of contexts) { + if (!context.writeResult) { + context.writeResult = { + inserted: insertedEventIds.has(context.event.event_id), + }; + } + await handleWrittenHistoryEvent(context); + await heartbeat(); + } + + if (state.draining) { + setTimeout(() => shutdown(), 0); + } + }, +}); + +async function handleWrittenHistoryEvent({ + topic, + partition, + message, + event, + writeResult, +}) { + const handledAt = new Date().toISOString(); + if (writeResult.inserted) { + state.last_write_at = handledAt; + } + state.last_error = null; + state.offsets[topic] = { + partition, + offset: message.offset, + }; + const shouldRunDerivedRefresh = shouldRunDerivedRefreshForEvent({ + event, + now: handledAt, + maxEventAgeMs: config.historyWriterDerivedRefreshMaxEventAgeMs, + }); + if (!shouldRunDerivedRefresh) { + state.derived_refresh_skipped_count += 1; + state.last_derived_refresh_skipped_at = handledAt; + state.last_derived_refresh_skipped_topic = topic; + } + if (topic === config.kafkaTopicOpsFundingObservation && writeResult.inserted) { + state.last_funding_observation_write_at = state.last_write_at; + } + if (topic === config.kafkaTopicOpsAlert && writeResult.inserted) { + state.last_alert_write_at = state.last_write_at; + } + if (topic === config.kafkaTopicOpsEnvironmentStatus) { + state.last_environment_status_seen_at = handledAt; + state.last_environment_status_fingerprint = writeResult.status_fingerprint || event.payload.status_fingerprint || null; + if (writeResult.inserted) { + state.last_environment_status_write_at = handledAt; + } else { + state.last_environment_status_duplicate_at = handledAt; + } + } + if (shouldRunDerivedRefresh) { + await publishLiquidityNotification({ topic, event }); + } + if (shouldRunDerivedRefresh && portfolioMetricTopics.has(topic)) { + try { + await refreshPortfolioMetrics(); + } catch (error) { + state.metrics_error = serializeError(error); + logger.error('portfolio_metrics_refresh_failed', { topic, details: { error: serializeError(error), }, }); - } finally { - if (state.draining) { - setTimeout(() => shutdown(), 0); - } } - }, -}); + } + if (shouldRunDerivedRefresh && quoteOutcomeTopics.has(topic)) { + try { + const records = await refreshQuoteOutcomeAttributions(); + await publishQuoteOutcomeNotifications(records, { + minObservedAt: event.observed_at || event.ingested_at || state.last_write_at, + }); + } catch (error) { + state.quote_outcomes_error = serializeError(error); + logger.error('quote_outcomes_refresh_failed', { + topic, + details: { + error: serializeError(error), + }, + }); + } + } + if (shouldRunDerivedRefresh && intentRequestOutcomeTopics.has(topic)) { + try { + const records = await refreshIntentRequestOutcomeAttributions(); + await publishIntentRequestOutcomeNotifications(records, { + minObservedAt: event.observed_at || event.ingested_at || state.last_write_at, + }); + } catch (error) { + state.intent_request_outcomes_error = serializeError(error); + logger.error('intent_request_outcomes_refresh_failed', { + topic, + details: { + error: serializeError(error), + }, + }); + } + } +} + +function recordHistoryError(topic, error) { + state.last_error = serializeError(error); + state.error_count += 1; + logger.error('history_write_failed', { + topic, + details: { + error: serializeError(error), + }, + }); +} const controlApi = startControlApi({ host: config.historyWriterControlHost, diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index 8c2306c..5110a40 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -2003,7 +2003,7 @@ function importedAssetChanged(previous, next) { } export async function insertHistoryEvent(pool, { table, topic, event, record }) { - await pool.query( + const result = await pool.query( ` INSERT INTO ${table} ( event_id, @@ -2022,6 +2022,7 @@ export async function insertHistoryEvent(pool, { table, topic, event, record }) $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11::jsonb,$12::jsonb ) ON CONFLICT (event_id) DO NOTHING + RETURNING event_id `, [ event.event_id, @@ -2038,6 +2039,75 @@ export async function insertHistoryEvent(pool, { table, topic, event, record }) event.raw ? JSON.stringify(event.raw) : null, ], ); + return { inserted: result.rowCount > 0 }; +} + +export async function insertHistoryEvents(pool, entries = []) { + const insertedEventIds = new Set(); + const normalizedEntries = (entries || []).filter(Boolean); + if (!normalizedEntries.length) return { insertedEventIds }; + + const entriesByTable = new Map(); + for (const entry of normalizedEntries) { + if (!TABLES.includes(entry.table)) { + throw new Error(`unsupported history table: ${entry.table}`); + } + const list = entriesByTable.get(entry.table) || []; + list.push(entry); + entriesByTable.set(entry.table, list); + } + + for (const [table, tableEntries] of entriesByTable.entries()) { + for (let start = 0; start < tableEntries.length; start += 1000) { + const chunk = tableEntries.slice(start, start + 1000); + const values = []; + const placeholders = chunk.map((entry, index) => { + const offset = index * 12; + values.push( + entry.event.event_id, + entry.topic, + entry.event.venue, + entry.event.source, + entry.event.event_type, + entry.event.observed_at, + entry.event.ingested_at, + entry.record.quote_id, + entry.record.pair, + entry.record.decision_key, + JSON.stringify(entry.event.payload), + entry.event.raw ? JSON.stringify(entry.event.raw) : null, + ); + return `($${offset + 1},$${offset + 2},$${offset + 3},$${offset + 4},$${offset + 5},$${offset + 6},$${offset + 7},$${offset + 8},$${offset + 9},$${offset + 10},$${offset + 11}::jsonb,$${offset + 12}::jsonb)`; + }).join(','); + + const result = await pool.query( + ` + INSERT INTO ${table} ( + event_id, + topic, + venue, + source, + event_type, + observed_at, + ingested_at, + quote_id, + pair, + decision_key, + payload, + raw + ) VALUES ${placeholders} + ON CONFLICT (event_id) DO NOTHING + RETURNING event_id + `, + values, + ); + for (const row of result.rows || []) { + insertedEventIds.add(row.event_id); + } + } + } + + return { insertedEventIds }; } export async function insertEnvironmentStatusChange(pool, { topic, event, record }) { diff --git a/test/history-writer-static.test.mjs b/test/history-writer-static.test.mjs index 1129697..acf7a7d 100644 --- a/test/history-writer-static.test.mjs +++ b/test/history-writer-static.test.mjs @@ -7,6 +7,8 @@ const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.met test('history writer replays durable topics but joins the raw quote firehose live', () => { assert.match(source, /fromBeginning:\s*topic !== config\.kafkaTopicRawNearIntentsQuote/); assert.match(source, /Raw quote volume is a live firehose/); + assert.match(source, /eachBatch/); + assert.match(source, /insertHistoryEvents/); }); test('history writer passes tracked assets into portfolio valuation', () => { diff --git a/test/inventory-and-history.test.mjs b/test/inventory-and-history.test.mjs index 0c3000e..ce3fee3 100644 --- a/test/inventory-and-history.test.mjs +++ b/test/inventory-and-history.test.mjs @@ -3,6 +3,7 @@ import assert from 'node:assert/strict'; import { buildInventorySnapshot } from '../src/core/inventory.mjs'; import { routeHistoryRecord } from '../src/core/history-records.mjs'; +import { insertHistoryEvents } from '../src/lib/postgres.mjs'; test('inventory snapshot keeps pending funding out of spendable balances', () => { const snapshot = buildInventorySnapshot({ @@ -62,3 +63,52 @@ test('history writer routes decision events into the decision table family', () assert.equal(routed.record.decision_key, 'decision-1'); assert.equal(routed.record.quote_id, 'quote-1'); }); + +test('bulk history insert writes multiple routed events in one table statement', async () => { + const queries = []; + const pool = { + async query(sql, params) { + queries.push({ sql, params }); + return { + rows: [ + { event_id: params[0] }, + { event_id: params[12] }, + ], + rowCount: 2, + }; + }, + }; + + const result = await insertHistoryEvents(pool, [ + { + table: 'trade_decisions', + topic: 'decision.trade_decision', + event: historyEvent('evt-1', { decision_id: 'decision-1', quote_id: 'quote-1' }), + record: { quote_id: 'quote-1', pair: 'a->b', decision_key: 'decision-1' }, + }, + { + table: 'trade_decisions', + topic: 'decision.trade_decision', + event: historyEvent('evt-2', { decision_id: 'decision-2', quote_id: 'quote-2' }), + record: { quote_id: 'quote-2', pair: 'a->b', decision_key: 'decision-2' }, + }, + ]); + + assert.equal(queries.length, 1); + assert.match(queries[0].sql, /INSERT INTO trade_decisions/); + assert.match(queries[0].sql, /RETURNING event_id/); + assert.equal(queries[0].params.length, 24); + assert.deepEqual([...result.insertedEventIds].sort(), ['evt-1', 'evt-2']); +}); + +function historyEvent(eventId, payload) { + return { + event_id: eventId, + event_type: 'trade_decision', + venue: 'near-intents', + source: 'strategy-engine', + observed_at: '2026-04-02T10:00:00.000Z', + ingested_at: '2026-04-02T10:00:00.001Z', + payload, + }; +}