Let history writer catch up through stale backlog
Some checks failed
deploy / deploy (push) Failing after 31s

Proof: npm test (140 passing); npm run operator-dashboard:build; git diff --cached --check.

Assumptions: Events older than HISTORY_WRITER_DERIVED_REFRESH_MAX_EVENT_AGE_MS are replay/backfill work; writing them remains required, but expensive derived portfolio/outcome refreshes can wait until the writer reaches fresh events.

Still fake: This does not move or migrate BTC assets; it only improves durable catch-up so fresh inventory can reach request preflight.
This commit is contained in:
philipp 2026-05-07 16:13:21 +02:00
parent ab078d976a
commit d151db1e91
5 changed files with 67 additions and 3 deletions

View file

@ -75,6 +75,7 @@ data:
KAFKA_CONSUMER_GROUP_EXECUTOR: trade-executor-v1 KAFKA_CONSUMER_GROUP_EXECUTOR: trade-executor-v1
KAFKA_CONSUMER_GROUP_OPS_SENTINEL: ops-sentinel-v1 KAFKA_CONSUMER_GROUP_OPS_SENTINEL: ops-sentinel-v1
KAFKA_CONSUMER_GROUP_OPERATOR_DASHBOARD: operator-dashboard-v1 KAFKA_CONSUMER_GROUP_OPERATOR_DASHBOARD: operator-dashboard-v1
HISTORY_WRITER_DERIVED_REFRESH_MAX_EVENT_AGE_MS: "300000"
STRATEGY_STATE_DIR: /var/lib/unrip/strategy-state STRATEGY_STATE_DIR: /var/lib/unrip/strategy-state
EXECUTOR_STATE_DIR: /var/lib/unrip/executor-state EXECUTOR_STATE_DIR: /var/lib/unrip/executor-state
LIQUIDITY_STATE_DIR: /var/lib/unrip/liquidity-state LIQUIDITY_STATE_DIR: /var/lib/unrip/liquidity-state

View file

@ -3,6 +3,7 @@ import process from 'node:process';
import { createConsumer } from '../bus/kafka/consumer.mjs'; import { createConsumer } from '../bus/kafka/consumer.mjs';
import { startControlApi } from '../core/control-api.mjs'; import { startControlApi } from '../core/control-api.mjs';
import { routeHistoryRecord } from '../core/history-records.mjs'; import { routeHistoryRecord } from '../core/history-records.mjs';
import { shouldRunDerivedRefreshForEvent } from '../core/history-writer-refresh-policy.mjs';
import { createLogger, serializeError } from '../core/log.mjs'; import { createLogger, serializeError } from '../core/log.mjs';
import { import {
buildIntentRequestOutcomeNotification, buildIntentRequestOutcomeNotification,
@ -131,6 +132,9 @@ const state = {
last_intent_request_outcomes_at: null, last_intent_request_outcomes_at: null,
latest_intent_request_outcomes: null, latest_intent_request_outcomes: null,
intent_request_outcomes_error: null, intent_request_outcomes_error: null,
derived_refresh_skipped_count: 0,
last_derived_refresh_skipped_at: null,
last_derived_refresh_skipped_topic: null,
notification_count: 0, notification_count: 0,
notification_error_count: 0, notification_error_count: 0,
last_notification_at: null, last_notification_at: null,
@ -176,6 +180,16 @@ await consumer.run({
partition, partition,
offset: message.offset, offset: message.offset,
}; };
const shouldRunDerivedRefresh = shouldRunDerivedRefreshForEvent({
event,
now: handledAt,
maxEventAgeMs: config.historyWriterDerivedRefreshMaxEventAgeMs,
});
if (!shouldRunDerivedRefresh) {
state.derived_refresh_skipped_count += 1;
state.last_derived_refresh_skipped_at = handledAt;
state.last_derived_refresh_skipped_topic = topic;
}
if (topic === config.kafkaTopicOpsFundingObservation && writeResult.inserted) { if (topic === config.kafkaTopicOpsFundingObservation && writeResult.inserted) {
state.last_funding_observation_write_at = state.last_write_at; state.last_funding_observation_write_at = state.last_write_at;
} }
@ -192,7 +206,7 @@ await consumer.run({
} }
} }
await publishLiquidityNotification({ topic, event }); await publishLiquidityNotification({ topic, event });
if (portfolioMetricTopics.has(topic)) { if (shouldRunDerivedRefresh && portfolioMetricTopics.has(topic)) {
try { try {
await refreshPortfolioMetrics(); await refreshPortfolioMetrics();
} catch (error) { } catch (error) {
@ -205,7 +219,7 @@ await consumer.run({
}); });
} }
} }
if (quoteOutcomeTopics.has(topic)) { if (shouldRunDerivedRefresh && quoteOutcomeTopics.has(topic)) {
try { try {
const records = await refreshQuoteOutcomeAttributions(); const records = await refreshQuoteOutcomeAttributions();
await publishQuoteOutcomeNotifications(records, { await publishQuoteOutcomeNotifications(records, {
@ -221,7 +235,7 @@ await consumer.run({
}); });
} }
} }
if (intentRequestOutcomeTopics.has(topic)) { if (shouldRunDerivedRefresh && intentRequestOutcomeTopics.has(topic)) {
try { try {
const records = await refreshIntentRequestOutcomeAttributions(); const records = await refreshIntentRequestOutcomeAttributions();
await publishIntentRequestOutcomeNotifications(records, { await publishIntentRequestOutcomeNotifications(records, {

View file

@ -0,0 +1,15 @@
export function shouldRunDerivedRefreshForEvent({
event,
now = new Date().toISOString(),
maxEventAgeMs,
} = {}) {
const maxAge = Number(maxEventAgeMs);
if (!Number.isFinite(maxAge) || maxAge < 0) return true;
const eventAt = event?.ingested_at || event?.observed_at || null;
if (!eventAt) return true;
const age = Date.parse(now) - Date.parse(eventAt);
if (!Number.isFinite(age)) return true;
return age <= maxAge;
}

View file

@ -39,6 +39,7 @@ const DEFAULTS = {
kafkaConsumerGroupExecutor: 'trade-executor-v1', kafkaConsumerGroupExecutor: 'trade-executor-v1',
kafkaConsumerGroupOpsSentinel: 'ops-sentinel-v1', kafkaConsumerGroupOpsSentinel: 'ops-sentinel-v1',
kafkaConsumerGroupOperatorDashboard: 'operator-dashboard-v1', kafkaConsumerGroupOperatorDashboard: 'operator-dashboard-v1',
historyWriterDerivedRefreshMaxEventAgeMs: 5 * 60 * 1000,
strategyStateDir: './var/strategy-state', strategyStateDir: './var/strategy-state',
executorStateDir: './var/executor-state', executorStateDir: './var/executor-state',
liquidityStateDir: './var/liquidity-state', liquidityStateDir: './var/liquidity-state',
@ -426,6 +427,10 @@ export function loadConfig({ envPath = '.env' } = {}) {
kafkaConsumerGroupOperatorDashboard: kafkaConsumerGroupOperatorDashboard:
process.env.KAFKA_CONSUMER_GROUP_OPERATOR_DASHBOARD process.env.KAFKA_CONSUMER_GROUP_OPERATOR_DASHBOARD
|| DEFAULTS.kafkaConsumerGroupOperatorDashboard, || DEFAULTS.kafkaConsumerGroupOperatorDashboard,
historyWriterDerivedRefreshMaxEventAgeMs: parseNumber(
process.env.HISTORY_WRITER_DERIVED_REFRESH_MAX_EVENT_AGE_MS,
DEFAULTS.historyWriterDerivedRefreshMaxEventAgeMs,
),
strategyStateDir: process.env.STRATEGY_STATE_DIR || DEFAULTS.strategyStateDir, strategyStateDir: process.env.STRATEGY_STATE_DIR || DEFAULTS.strategyStateDir,
executorStateDir: process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir, executorStateDir: process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir,
liquidityStateDir: process.env.LIQUIDITY_STATE_DIR || DEFAULTS.liquidityStateDir, liquidityStateDir: process.env.LIQUIDITY_STATE_DIR || DEFAULTS.liquidityStateDir,

View file

@ -0,0 +1,29 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { shouldRunDerivedRefreshForEvent } from '../src/core/history-writer-refresh-policy.mjs';
test('history writer derived refresh policy skips stale replayed events', () => {
assert.equal(shouldRunDerivedRefreshForEvent({
event: {
ingested_at: '2026-05-05T07:56:03.203Z',
},
now: '2026-05-07T14:12:00.000Z',
maxEventAgeMs: 300000,
}), false);
});
test('history writer derived refresh policy runs for fresh and undated events', () => {
assert.equal(shouldRunDerivedRefreshForEvent({
event: {
ingested_at: '2026-05-07T14:11:52.729Z',
},
now: '2026-05-07T14:12:00.000Z',
maxEventAgeMs: 300000,
}), true);
assert.equal(shouldRunDerivedRefreshForEvent({
event: {},
now: '2026-05-07T14:12:00.000Z',
maxEventAgeMs: 300000,
}), true);
});