Batch durable history writes
All checks were successful
deploy / deploy (push) Successful in 47s

Proof: History-writer now writes routed event batches with one bulk insert per table, preserving decision and normalized quote history while reducing Kafka lag that delayed durable strategy decision rows.

Assumptions: Kafka message order within a topic partition remains sufficient for durability; environment status events keep their dedicated dedupe path; this change does not alter strategy, edge, notional, inventory, arming, or relay submission behavior.

Still fake: Venue-native terminal fill ids and fee-complete realized PnL remain unavailable; historical decision rows can still lag until the deployed batch writer drains existing backlog.
This commit is contained in:
philipp 2026-05-19 15:53:52 +02:00
parent 1d66ae208f
commit 348c4f9b0b
4 changed files with 276 additions and 106 deletions

View file

@ -23,7 +23,7 @@ import {
ensureHistorySchema,
insertEnvironmentStatusChange,
finishNotificationDelivery,
insertHistoryEvent,
insertHistoryEvents,
loadLatestPortfolioMetric,
loadPortfolioMetricInputs,
refreshIntentRequestOutcomes,
@ -165,25 +165,77 @@ await refreshIntentRequestOutcomeAttributions().catch((error) => {
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (!message.value || state.paused) return;
eachBatch: async ({ batch, heartbeat }) => {
if (state.paused) return;
const contexts = [];
const batchEntries = [];
for (const message of batch.messages) {
if (!message.value) continue;
try {
const event = parseEventMessage(message.value.toString());
const routed = routeHistoryRecord({ topic, event });
const writeResult = topic === config.kafkaTopicOpsEnvironmentStatus
? await insertEnvironmentStatusChange(pool, {
topic,
const routed = routeHistoryRecord({ topic: batch.topic, event });
const context = {
topic: batch.topic,
partition: batch.partition,
message,
event,
record: routed.record,
})
: await insertHistoryEvent(pool, {
table: routed.table,
topic,
event,
record: routed.record,
}).then(() => ({ inserted: true }));
routed,
writeResult: null,
};
contexts.push(context);
if (batch.topic === config.kafkaTopicOpsEnvironmentStatus) {
context.writeResult = await insertEnvironmentStatusChange(pool, {
topic: batch.topic,
event,
record: routed.record,
});
} else {
batchEntries.push({
table: routed.table,
topic: batch.topic,
event,
record: routed.record,
});
}
} catch (error) {
recordHistoryError(batch.topic, error);
}
}
let insertedEventIds = new Set();
try {
({ insertedEventIds } = await insertHistoryEvents(pool, batchEntries));
} catch (error) {
recordHistoryError(batch.topic, error);
throw error;
}
for (const context of contexts) {
if (!context.writeResult) {
context.writeResult = {
inserted: insertedEventIds.has(context.event.event_id),
};
}
await handleWrittenHistoryEvent(context);
await heartbeat();
}
if (state.draining) {
setTimeout(() => shutdown(), 0);
}
},
});
async function handleWrittenHistoryEvent({
topic,
partition,
message,
event,
writeResult,
}) {
const handledAt = new Date().toISOString();
if (writeResult.inserted) {
state.last_write_at = handledAt;
@ -266,7 +318,9 @@ await consumer.run({
});
}
}
} catch (error) {
}
function recordHistoryError(topic, error) {
state.last_error = serializeError(error);
state.error_count += 1;
logger.error('history_write_failed', {
@ -275,13 +329,7 @@ await consumer.run({
error: serializeError(error),
},
});
} finally {
if (state.draining) {
setTimeout(() => shutdown(), 0);
}
}
},
});
}
const controlApi = startControlApi({
host: config.historyWriterControlHost,

View file

@ -2003,7 +2003,7 @@ function importedAssetChanged(previous, next) {
}
export async function insertHistoryEvent(pool, { table, topic, event, record }) {
await pool.query(
const result = await pool.query(
`
INSERT INTO ${table} (
event_id,
@ -2022,6 +2022,7 @@ export async function insertHistoryEvent(pool, { table, topic, event, record })
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11::jsonb,$12::jsonb
)
ON CONFLICT (event_id) DO NOTHING
RETURNING event_id
`,
[
event.event_id,
@ -2038,6 +2039,75 @@ export async function insertHistoryEvent(pool, { table, topic, event, record })
event.raw ? JSON.stringify(event.raw) : null,
],
);
return { inserted: result.rowCount > 0 };
}
export async function insertHistoryEvents(pool, entries = []) {
const insertedEventIds = new Set();
const normalizedEntries = (entries || []).filter(Boolean);
if (!normalizedEntries.length) return { insertedEventIds };
const entriesByTable = new Map();
for (const entry of normalizedEntries) {
if (!TABLES.includes(entry.table)) {
throw new Error(`unsupported history table: ${entry.table}`);
}
const list = entriesByTable.get(entry.table) || [];
list.push(entry);
entriesByTable.set(entry.table, list);
}
for (const [table, tableEntries] of entriesByTable.entries()) {
for (let start = 0; start < tableEntries.length; start += 1000) {
const chunk = tableEntries.slice(start, start + 1000);
const values = [];
const placeholders = chunk.map((entry, index) => {
const offset = index * 12;
values.push(
entry.event.event_id,
entry.topic,
entry.event.venue,
entry.event.source,
entry.event.event_type,
entry.event.observed_at,
entry.event.ingested_at,
entry.record.quote_id,
entry.record.pair,
entry.record.decision_key,
JSON.stringify(entry.event.payload),
entry.event.raw ? JSON.stringify(entry.event.raw) : null,
);
return `($${offset + 1},$${offset + 2},$${offset + 3},$${offset + 4},$${offset + 5},$${offset + 6},$${offset + 7},$${offset + 8},$${offset + 9},$${offset + 10},$${offset + 11}::jsonb,$${offset + 12}::jsonb)`;
}).join(',');
const result = await pool.query(
`
INSERT INTO ${table} (
event_id,
topic,
venue,
source,
event_type,
observed_at,
ingested_at,
quote_id,
pair,
decision_key,
payload,
raw
) VALUES ${placeholders}
ON CONFLICT (event_id) DO NOTHING
RETURNING event_id
`,
values,
);
for (const row of result.rows || []) {
insertedEventIds.add(row.event_id);
}
}
}
return { insertedEventIds };
}
export async function insertEnvironmentStatusChange(pool, { topic, event, record }) {

View file

@ -7,6 +7,8 @@ const source = readFileSync(new URL('../src/apps/history-writer.mjs', import.met
test('history writer replays durable topics but joins the raw quote firehose live', () => {
assert.match(source, /fromBeginning:\s*topic !== config\.kafkaTopicRawNearIntentsQuote/);
assert.match(source, /Raw quote volume is a live firehose/);
assert.match(source, /eachBatch/);
assert.match(source, /insertHistoryEvents/);
});
test('history writer passes tracked assets into portfolio valuation', () => {

View file

@ -3,6 +3,7 @@ import assert from 'node:assert/strict';
import { buildInventorySnapshot } from '../src/core/inventory.mjs';
import { routeHistoryRecord } from '../src/core/history-records.mjs';
import { insertHistoryEvents } from '../src/lib/postgres.mjs';
test('inventory snapshot keeps pending funding out of spendable balances', () => {
const snapshot = buildInventorySnapshot({
@ -62,3 +63,52 @@ test('history writer routes decision events into the decision table family', ()
assert.equal(routed.record.decision_key, 'decision-1');
assert.equal(routed.record.quote_id, 'quote-1');
});
test('bulk history insert writes multiple routed events in one table statement', async () => {
const queries = [];
const pool = {
async query(sql, params) {
queries.push({ sql, params });
return {
rows: [
{ event_id: params[0] },
{ event_id: params[12] },
],
rowCount: 2,
};
},
};
const result = await insertHistoryEvents(pool, [
{
table: 'trade_decisions',
topic: 'decision.trade_decision',
event: historyEvent('evt-1', { decision_id: 'decision-1', quote_id: 'quote-1' }),
record: { quote_id: 'quote-1', pair: 'a->b', decision_key: 'decision-1' },
},
{
table: 'trade_decisions',
topic: 'decision.trade_decision',
event: historyEvent('evt-2', { decision_id: 'decision-2', quote_id: 'quote-2' }),
record: { quote_id: 'quote-2', pair: 'a->b', decision_key: 'decision-2' },
},
]);
assert.equal(queries.length, 1);
assert.match(queries[0].sql, /INSERT INTO trade_decisions/);
assert.match(queries[0].sql, /RETURNING event_id/);
assert.equal(queries[0].params.length, 24);
assert.deepEqual([...result.insertedEventIds].sort(), ['evt-1', 'evt-2']);
});
function historyEvent(eventId, payload) {
return {
event_id: eventId,
event_type: 'trade_decision',
venue: 'near-intents',
source: 'strategy-engine',
observed_at: '2026-04-02T10:00:00.000Z',
ingested_at: '2026-04-02T10:00:00.001Z',
payload,
};
}