diff --git a/deploy/k8s/base/unrip.yaml b/deploy/k8s/base/unrip.yaml index 57a4221..34d45a0 100644 --- a/deploy/k8s/base/unrip.yaml +++ b/deploy/k8s/base/unrip.yaml @@ -75,6 +75,7 @@ data: KAFKA_CONSUMER_GROUP_EXECUTOR: trade-executor-v1 KAFKA_CONSUMER_GROUP_OPS_SENTINEL: ops-sentinel-v1 KAFKA_CONSUMER_GROUP_OPERATOR_DASHBOARD: operator-dashboard-v1 + HISTORY_WRITER_DERIVED_REFRESH_MAX_EVENT_AGE_MS: "300000" STRATEGY_STATE_DIR: /var/lib/unrip/strategy-state EXECUTOR_STATE_DIR: /var/lib/unrip/executor-state LIQUIDITY_STATE_DIR: /var/lib/unrip/liquidity-state diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index f700a67..a760f5e 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -3,6 +3,7 @@ import process from 'node:process'; import { createConsumer } from '../bus/kafka/consumer.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { routeHistoryRecord } from '../core/history-records.mjs'; +import { shouldRunDerivedRefreshForEvent } from '../core/history-writer-refresh-policy.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; import { buildIntentRequestOutcomeNotification, @@ -131,6 +132,9 @@ const state = { last_intent_request_outcomes_at: null, latest_intent_request_outcomes: null, intent_request_outcomes_error: null, + derived_refresh_skipped_count: 0, + last_derived_refresh_skipped_at: null, + last_derived_refresh_skipped_topic: null, notification_count: 0, notification_error_count: 0, last_notification_at: null, @@ -176,6 +180,16 @@ await consumer.run({ partition, offset: message.offset, }; + const shouldRunDerivedRefresh = shouldRunDerivedRefreshForEvent({ + event, + now: handledAt, + maxEventAgeMs: config.historyWriterDerivedRefreshMaxEventAgeMs, + }); + if (!shouldRunDerivedRefresh) { + state.derived_refresh_skipped_count += 1; + state.last_derived_refresh_skipped_at = handledAt; + state.last_derived_refresh_skipped_topic = topic; + } if (topic === config.kafkaTopicOpsFundingObservation && writeResult.inserted) { state.last_funding_observation_write_at = state.last_write_at; } @@ -192,7 +206,7 @@ await consumer.run({ } } await publishLiquidityNotification({ topic, event }); - if (portfolioMetricTopics.has(topic)) { + if (shouldRunDerivedRefresh && portfolioMetricTopics.has(topic)) { try { await refreshPortfolioMetrics(); } catch (error) { @@ -205,7 +219,7 @@ await consumer.run({ }); } } - if (quoteOutcomeTopics.has(topic)) { + if (shouldRunDerivedRefresh && quoteOutcomeTopics.has(topic)) { try { const records = await refreshQuoteOutcomeAttributions(); await publishQuoteOutcomeNotifications(records, { @@ -221,7 +235,7 @@ await consumer.run({ }); } } - if (intentRequestOutcomeTopics.has(topic)) { + if (shouldRunDerivedRefresh && intentRequestOutcomeTopics.has(topic)) { try { const records = await refreshIntentRequestOutcomeAttributions(); await publishIntentRequestOutcomeNotifications(records, { diff --git a/src/core/history-writer-refresh-policy.mjs b/src/core/history-writer-refresh-policy.mjs new file mode 100644 index 0000000..700707c --- /dev/null +++ b/src/core/history-writer-refresh-policy.mjs @@ -0,0 +1,15 @@ +export function shouldRunDerivedRefreshForEvent({ + event, + now = new Date().toISOString(), + maxEventAgeMs, +} = {}) { + const maxAge = Number(maxEventAgeMs); + if (!Number.isFinite(maxAge) || maxAge < 0) return true; + + const eventAt = event?.ingested_at || event?.observed_at || null; + if (!eventAt) return true; + + const age = Date.parse(now) - Date.parse(eventAt); + if (!Number.isFinite(age)) return true; + return age <= maxAge; +} diff --git a/src/lib/config.mjs b/src/lib/config.mjs index c3779b7..a6ebae1 100644 --- a/src/lib/config.mjs +++ b/src/lib/config.mjs @@ -39,6 +39,7 @@ const DEFAULTS = { kafkaConsumerGroupExecutor: 'trade-executor-v1', kafkaConsumerGroupOpsSentinel: 'ops-sentinel-v1', kafkaConsumerGroupOperatorDashboard: 'operator-dashboard-v1', + historyWriterDerivedRefreshMaxEventAgeMs: 5 * 60 * 1000, strategyStateDir: './var/strategy-state', executorStateDir: './var/executor-state', liquidityStateDir: './var/liquidity-state', @@ -426,6 +427,10 @@ export function loadConfig({ envPath = '.env' } = {}) { kafkaConsumerGroupOperatorDashboard: process.env.KAFKA_CONSUMER_GROUP_OPERATOR_DASHBOARD || DEFAULTS.kafkaConsumerGroupOperatorDashboard, + historyWriterDerivedRefreshMaxEventAgeMs: parseNumber( + process.env.HISTORY_WRITER_DERIVED_REFRESH_MAX_EVENT_AGE_MS, + DEFAULTS.historyWriterDerivedRefreshMaxEventAgeMs, + ), strategyStateDir: process.env.STRATEGY_STATE_DIR || DEFAULTS.strategyStateDir, executorStateDir: process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir, liquidityStateDir: process.env.LIQUIDITY_STATE_DIR || DEFAULTS.liquidityStateDir, diff --git a/test/history-writer-refresh-policy.test.mjs b/test/history-writer-refresh-policy.test.mjs new file mode 100644 index 0000000..313c8bb --- /dev/null +++ b/test/history-writer-refresh-policy.test.mjs @@ -0,0 +1,29 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { shouldRunDerivedRefreshForEvent } from '../src/core/history-writer-refresh-policy.mjs'; + +test('history writer derived refresh policy skips stale replayed events', () => { + assert.equal(shouldRunDerivedRefreshForEvent({ + event: { + ingested_at: '2026-05-05T07:56:03.203Z', + }, + now: '2026-05-07T14:12:00.000Z', + maxEventAgeMs: 300000, + }), false); +}); + +test('history writer derived refresh policy runs for fresh and undated events', () => { + assert.equal(shouldRunDerivedRefreshForEvent({ + event: { + ingested_at: '2026-05-07T14:11:52.729Z', + }, + now: '2026-05-07T14:12:00.000Z', + maxEventAgeMs: 300000, + }), true); + assert.equal(shouldRunDerivedRefreshForEvent({ + event: {}, + now: '2026-05-07T14:12:00.000Z', + maxEventAgeMs: 300000, + }), true); +});