import process from 'node:process';

import { createConsumer } from '../bus/kafka/consumer.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { routeHistoryRecord } from '../core/history-records.mjs';
import { shouldRunDerivedRefreshForEvent } from '../core/history-writer-refresh-policy.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import {
  buildIntentRequestOutcomeNotification,
  buildLiquidityActionNotification,
  buildQuoteOutcomeNotification,
  createNotificationDispatcher,
  observedAtOrAfter,
} from '../core/notification-layer.mjs';
import { createNtfyNotificationClient } from '../core/notification-client.mjs';
import { buildPortfolioMetricId, computePortfolioMetric } from '../core/portfolio-metrics.mjs';
import { parseEventMessage } from '../core/event-envelope.mjs';
import { loadConfig } from '../lib/config.mjs';
import {
  claimNotificationDelivery,
  createTradingConfigStore,
  createPostgresPool,
  ensureHistorySchema,
  insertEnvironmentStatusChange,
  finishNotificationDelivery,
  insertHistoryEvents,
  loadLatestPortfolioMetric,
  loadPortfolioMetricInputs,
  pruneRawNearIntentsQuoteHistory,
  refreshIntentRequestOutcomes,
  refreshQuoteOutcomes,
  seedTradingConfig,
  upsertPortfolioMetric,
} from '../lib/postgres.mjs';

// ---------------------------------------------------------------------------
// Bootstrap: configuration, logging, database, trading config, notifications.
// ---------------------------------------------------------------------------

const config = loadConfig();

const logger = createLogger({
  service: 'history-writer',
  component: 'history',
  namespace: config.projectNamespace,
});

const pool = createPostgresPool({
  connectionString: config.postgresUrl,
});

await ensureHistorySchema(pool);
await seedTradingConfig(pool);

const tradingConfigStore = createTradingConfigStore({
  pool,
  logger: logger.child({ component: 'trading-config' }),
});
await tradingConfigStore.forceRefresh();

const notificationLogger = logger.child({ component: 'notifications' });

const notificationClient = createNtfyNotificationClient({
  baseUrl: config.notificationNtfyBaseUrl,
  topic: config.notificationNtfyTopic,
  token: config.notificationNtfyToken,
  timeoutMs: config.notificationNtfyTimeoutMs,
  logger: notificationLogger,
});

// The dispatcher is given a Postgres-backed store: `claim` is called with the
// notification, `finish` with the notification and its delivery result.
const notificationDispatcher = createNotificationDispatcher({
  client: notificationClient,
  logger: notificationLogger,
  store: {
    claim: (notification) => claimNotificationDelivery(pool, {
      notificationKey: notification.notification_key,
      notificationType: notification.notification_type,
      sourceKind: notification.source_kind,
      sourceId: notification.source_id,
      payload: notification.data || {},
    }),
    finish: (notification, result) => finishNotificationDelivery(pool, {
      notificationKey: notification.notification_key,
      ok: result.ok,
      response: result.ok
        ? { status: result.status, response: result.response || null }
        : null,
      error: result.ok
        ? null
        : result.error || { status: result.status, response: result.response || null },
    }),
  },
});

// ---------------------------------------------------------------------------
// Kafka consumers: three groups (durable history, live evidence, raw quotes).
// ---------------------------------------------------------------------------

const durableHistoryConsumerCount = 3;
const durableConsumers = [];
for (let index = 0; index < durableHistoryConsumerCount; index += 1) {
  durableConsumers.push(await createConsumer({
    groupId: config.kafkaConsumerGroupHistory,
    brokers: config.kafkaBrokers,
    clientId: config.kafkaClientId,
    logger,
  }));
}

const liveEvidenceConsumerCount = 2;
const liveEvidenceConsumers = [];
for (let index = 0; index < liveEvidenceConsumerCount; index += 1) {
  liveEvidenceConsumers.push(await createConsumer({
    groupId: `${config.kafkaConsumerGroupHistory}-live`,
    brokers: config.kafkaBrokers,
    clientId: config.kafkaClientId,
    logger,
  }));
}

const rawQuoteConsumer = await createConsumer({
  groupId: `${config.kafkaConsumerGroupHistory}-raw`,
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
  logger,
});

// Topics consumed by the durable history group.
const topics = [
  config.kafkaTopicNormSwapDemand,
  config.kafkaTopicRefMarketPrice,
  config.kafkaTopicStateIntentInventory,
  config.kafkaTopicOpsLiquidityAction,
  config.kafkaTopicOpsFundingObservation,
  config.kafkaTopicOpsAlert,
  config.kafkaTopicOpsEnvironmentStatus,
  config.kafkaTopicDecisionTradeDecision,
  config.kafkaTopicCmdExecuteTrade,
  config.kafkaTopicExecTradeResult,
];

// Topics consumed by the dedicated raw-quote group.
const rawQuoteTopics = [
  config.kafkaTopicRawNearIntentsQuote,
];

// Raw quote pruning: attempted at most once per minute, retaining the most
// recent 30 minutes, in up to 20 batches of 500k rows per pass.
const rawQuoteHistoryPruneIntervalMs = 60 * 1000;
const rawQuoteHistoryRetainRecentMs = 30 * 60 * 1000;
const rawQuoteHistoryPruneBatchSize = 500_000;
const rawQuoteHistoryPruneMaxBatches = 20;

// Topics consumed by the live evidence group.
const liveEvidenceTopics = [
  config.kafkaTopicNormSwapDemand,
  config.kafkaTopicDecisionTradeDecision,
];

// Topics whose events trigger a portfolio metrics refresh.
const portfolioMetricTopics = new Set([
  config.kafkaTopicRefMarketPrice,
  config.kafkaTopicStateIntentInventory,
  config.kafkaTopicOpsLiquidityAction,
  config.kafkaTopicCmdExecuteTrade,
  config.kafkaTopicExecTradeResult,
]);

// Topics whose events trigger a quote outcome refresh.
const quoteOutcomeTopics = new Set([
  config.kafkaTopicStateIntentInventory,
  config.kafkaTopicCmdExecuteTrade,
  config.kafkaTopicExecTradeResult,
]);

// Topics whose events trigger an intent-request outcome refresh.
const intentRequestOutcomeTopics = new Set([
  config.kafkaTopicStateIntentInventory,
]);

for (const historyConsumer of durableConsumers) {
  for (const topic of topics) {
    await historyConsumer.subscribe({
      topic,
      fromBeginning: true,
    });
  }
}

for (const historyConsumer of liveEvidenceConsumers) {
  for (const topic of liveEvidenceTopics) {
    // The durable group keeps replaying retained history. This live group keeps
    // current quote/decision truth visible while that backlog drains.
    await historyConsumer.subscribe({
      topic,
      fromBeginning: false,
    });
  }
}

for (const topic of rawQuoteTopics) {
  // Raw quote volume is a live firehose. It gets a dedicated consumer group so
  // raw storage cannot starve durable strategy/execution topics.
  await rawQuoteConsumer.subscribe({
    topic,
    fromBeginning: false,
  });
}

// ---------------------------------------------------------------------------
// In-memory service state, exposed through the control API.
// ---------------------------------------------------------------------------

const state = {
  paused: false,
  draining: false,
  last_write_at: null,
  last_funding_observation_write_at: null,
  last_alert_write_at: null,
  last_environment_status_write_at: null,
  last_environment_status_seen_at: null,
  last_environment_status_duplicate_at: null,
  last_environment_status_fingerprint: null,
  last_metrics_at: null,
  last_error: null,
  error_count: 0,
  offsets: {},
  latest_portfolio_metrics: null,
  metrics_error: null,
  last_quote_outcomes_at: null,
  latest_quote_outcomes: null,
  quote_outcomes_error: null,
  last_intent_request_outcomes_at: null,
  latest_intent_request_outcomes: null,
  intent_request_outcomes_error: null,
  last_raw_quote_prune_at: null,
  raw_quote_prune_deleted_count: 0,
  raw_quote_prune_error: null,
  derived_refresh_skipped_count: 0,
  last_derived_refresh_skipped_at: null,
  last_derived_refresh_skipped_topic: null,
  notification_count: 0,
  notification_error_count: 0,
  last_notification_at: null,
  last_notification_error: null,
};

// Memoized promise of the single in-flight shutdown. Declared before any
// consumer starts so that every path that can reach `shutdown()` sees it.
let shutdownPromise = null;

// Prime the derived views once at startup. A failure here is recorded in state
// rather than aborting startup.
await refreshPortfolioMetrics().catch((error) => {
  state.metrics_error = serializeError(error);
});
await refreshQuoteOutcomeAttributions().catch((error) => {
  state.quote_outcomes_error = serializeError(error);
});
await refreshIntentRequestOutcomeAttributions().catch((error) => {
  state.intent_request_outcomes_error = serializeError(error);
});

for (const historyConsumer of durableConsumers) {
  await runHistoryConsumer(historyConsumer);
}
for (const historyConsumer of liveEvidenceConsumers) {
  await runHistoryConsumer(historyConsumer);
}
await runHistoryConsumer(rawQuoteConsumer);

/**
 * Starts batch processing on one consumer.
 *
 * Per batch: parse and route every message, write environment-status events
 * one by one, bulk-insert everything else, then run the per-event follow-up
 * (state bookkeeping, derived refreshes, notifications) with a heartbeat
 * after each event.
 *
 * @param {object} historyConsumer - Consumer returned by `createConsumer`.
 * @returns {Promise<void>} Resolves when `historyConsumer.run` resolves.
 */
async function runHistoryConsumer(historyConsumer) {
  await historyConsumer.run({
    eachBatch: async ({ batch, heartbeat }) => {
      // NOTE(review): returning here completes the batch without writing it.
      // If the consumer auto-resolves offsets when eachBatch returns (KafkaJS
      // does by default), messages fetched just before a pause would be
      // skipped - confirm against createConsumer's configuration.
      if (state.paused) return;

      const contexts = [];
      const batchEntries = [];

      for (const message of batch.messages) {
        if (!message.value) continue;
        try {
          const event = parseEventMessage(message.value.toString());
          const routed = routeHistoryRecord({ topic: batch.topic, event });
          const context = {
            topic: batch.topic,
            partition: batch.partition,
            message,
            event,
            routed,
            writeResult: null,
          };
          if (batch.topic === config.kafkaTopicOpsEnvironmentStatus) {
            context.writeResult = await insertEnvironmentStatusChange(pool, {
              topic: batch.topic,
              event,
              record: routed.record,
            });
          } else {
            batchEntries.push({
              table: routed.table,
              topic: batch.topic,
              event,
              record: routed.record,
            });
          }
          // Queue the follow-up only once the write (or batch queueing) above
          // has succeeded. Queuing earlier let a failed environment-status
          // insert be handled afterwards as a duplicate, which also cleared
          // the error that had just been recorded.
          contexts.push(context);
        } catch (error) {
          // A bad message must not block the rest of the batch.
          recordHistoryError(batch.topic, error);
        }
      }

      let insertedEventIds = new Set();
      try {
        ({ insertedEventIds } = await insertHistoryEvents(pool, batchEntries));
      } catch (error) {
        recordHistoryError(batch.topic, error);
        // Rethrow so the consumer sees the batch as failed.
        throw error;
      }

      for (const context of contexts) {
        if (!context.writeResult) {
          context.writeResult = {
            inserted: insertedEventIds.has(context.event.event_id),
          };
        }
        await handleWrittenHistoryEvent(context);
        await heartbeat();
      }

      if (batch.topic === config.kafkaTopicRawNearIntentsQuote) {
        await maybePruneRawQuoteHistory();
      }

      if (state.draining) {
        // Deferred so the current batch callback can return first.
        setTimeout(() => shutdown(), 0);
      }
    },
  });
}

/**
 * Prunes old raw quote history, rate-limited to one attempt per
 * `rawQuoteHistoryPruneIntervalMs` after the last successful prune.
 *
 * Failures are logged and stored in `state.raw_quote_prune_error`; they are
 * not thrown. Because the timestamp is only advanced on success, a failed
 * prune is retried on the next call.
 *
 * @param {object} [options]
 * @param {boolean} [options.force=false] - Skip the interval check.
 * @returns {Promise<object|null>} The prune result, or `null` when the prune
 *   was skipped or failed.
 */
async function maybePruneRawQuoteHistory({ force = false } = {}) {
  const nowMs = Date.now();
  const lastPruneMs = state.last_raw_quote_prune_at
    ? Date.parse(state.last_raw_quote_prune_at)
    : 0;
  if (
    !force
    && Number.isFinite(lastPruneMs)
    && nowMs - lastPruneMs < rawQuoteHistoryPruneIntervalMs
  ) {
    return null;
  }

  try {
    const result = await pruneRawNearIntentsQuoteHistory(pool, {
      now: new Date(nowMs).toISOString(),
      retainRecentMs: rawQuoteHistoryRetainRecentMs,
      batchSize: rawQuoteHistoryPruneBatchSize,
      maxBatches: rawQuoteHistoryPruneMaxBatches,
    });
    state.last_raw_quote_prune_at = new Date(nowMs).toISOString();
    state.raw_quote_prune_deleted_count += result.deletedCount;
    state.raw_quote_prune_error = null;
    if (result.deletedCount > 0) {
      logger.info('raw_near_intents_quote_history_pruned', {
        details: result,
      });
    }
    return result;
  } catch (error) {
    state.raw_quote_prune_error = serializeError(error);
    logger.error('raw_near_intents_quote_history_prune_failed', {
      details: {
        error: state.raw_quote_prune_error,
      },
    });
    return null;
  }
}

/**
 * Per-event follow-up after a batch has been written: updates the state
 * timestamps and offsets, then - when the refresh policy allows it for this
 * event - publishes the liquidity notification and refreshes the derived
 * views registered for the topic.
 *
 * Refresh failures are logged and stored in state, not thrown. The liquidity
 * notification is not wrapped, so an error thrown there propagates.
 *
 * @param {object} context
 * @param {string} context.topic - Topic the event was read from.
 * @param {number} context.partition - Partition of the batch.
 * @param {object} context.message - Consumed message; `offset` is recorded.
 * @param {object} context.event - Parsed event envelope.
 * @param {object} context.writeResult - Write outcome; `inserted` is read, and
 *   `status_fingerprint` for environment-status events.
 * @returns {Promise<void>}
 */
async function handleWrittenHistoryEvent({
  topic,
  partition,
  message,
  event,
  writeResult,
}) {
  const handledAt = new Date().toISOString();
  if (writeResult.inserted) {
    state.last_write_at = handledAt;
  }
  state.last_error = null;
  state.offsets[topic] = {
    partition,
    offset: message.offset,
  };

  const shouldRunDerivedRefresh = shouldRunDerivedRefreshForEvent({
    event,
    now: handledAt,
    maxEventAgeMs: config.historyWriterDerivedRefreshMaxEventAgeMs,
  });
  if (!shouldRunDerivedRefresh) {
    state.derived_refresh_skipped_count += 1;
    state.last_derived_refresh_skipped_at = handledAt;
    state.last_derived_refresh_skipped_topic = topic;
  }

  if (topic === config.kafkaTopicOpsFundingObservation && writeResult.inserted) {
    state.last_funding_observation_write_at = state.last_write_at;
  }
  if (topic === config.kafkaTopicOpsAlert && writeResult.inserted) {
    state.last_alert_write_at = state.last_write_at;
  }
  if (topic === config.kafkaTopicOpsEnvironmentStatus) {
    state.last_environment_status_seen_at = handledAt;
    state.last_environment_status_fingerprint = writeResult.status_fingerprint
      || event.payload.status_fingerprint
      || null;
    if (writeResult.inserted) {
      state.last_environment_status_write_at = handledAt;
    } else {
      state.last_environment_status_duplicate_at = handledAt;
    }
  }

  if (shouldRunDerivedRefresh) {
    await publishLiquidityNotification({ topic, event });
  }

  if (shouldRunDerivedRefresh && portfolioMetricTopics.has(topic)) {
    try {
      await refreshPortfolioMetrics();
    } catch (error) {
      state.metrics_error = serializeError(error);
      logger.error('portfolio_metrics_refresh_failed', {
        topic,
        details: {
          error: serializeError(error),
        },
      });
    }
  }

  if (shouldRunDerivedRefresh && quoteOutcomeTopics.has(topic)) {
    try {
      const records = await refreshQuoteOutcomeAttributions();
      await publishQuoteOutcomeNotifications(records, {
        minObservedAt: event.observed_at || event.ingested_at || state.last_write_at,
      });
    } catch (error) {
      state.quote_outcomes_error = serializeError(error);
      logger.error('quote_outcomes_refresh_failed', {
        topic,
        details: {
          error: serializeError(error),
        },
      });
    }
  }

  if (shouldRunDerivedRefresh && intentRequestOutcomeTopics.has(topic)) {
    try {
      const records = await refreshIntentRequestOutcomeAttributions();
      await publishIntentRequestOutcomeNotifications(records, {
        minObservedAt: event.observed_at || event.ingested_at || state.last_write_at,
      });
    } catch (error) {
      state.intent_request_outcomes_error = serializeError(error);
      logger.error('intent_request_outcomes_refresh_failed', {
        topic,
        details: {
          error: serializeError(error),
        },
      });
    }
  }
}

/**
 * Records a history write/parse failure in state and logs it.
 *
 * @param {string} topic - Topic the failing message or batch came from.
 * @param {unknown} error - The caught error.
 * @returns {void}
 */
function recordHistoryError(topic, error) {
  state.last_error = serializeError(error);
  state.error_count += 1;
  logger.error('history_write_failed', {
    topic,
    details: {
      error: serializeError(error),
    },
  });
}

/**
 * Probes the database with `SELECT 1`.
 *
 * @returns {Promise<boolean>} `true` when the query succeeds, `false` when it
 *   rejects. The failure reason is deliberately not surfaced.
 */
async function checkDatabaseConnectivity() {
  try {
    await pool.query('SELECT 1');
    return true;
  } catch {
    return false;
  }
}

// ---------------------------------------------------------------------------
// Control API: state, health, and pause/resume/drain routes.
// ---------------------------------------------------------------------------

const controlApi = startControlApi({
  host: config.historyWriterControlHost,
  port: config.historyWriterControlPort,
  logger: logger.child({ component: 'control-api' }),
  service: 'history-writer',
  namespace: config.projectNamespace,
  stateProvider: {
    async getState() {
      const connectivity = await checkDatabaseConnectivity();
      return {
        ...state,
        database_connectivity: connectivity,
        trading_config: tradingConfigStore.getState(),
      };
    },
  },
  healthProvider: {
    async getHealth() {
      const connectivity = await checkDatabaseConnectivity();
      const tradingConfig = tradingConfigStore.getState();
      const lastTruthAt = state.last_write_at || state.last_metrics_at || null;
      const freshnessAgeMs = lastTruthAt
        ? Date.now() - new Date(lastTruthAt).getTime()
        : null;
      return {
        // Healthy when the database answers, the trading config is ok, and the
        // last write/metric is either absent or within the staleness budget.
        ok: connectivity
          && tradingConfig.ok === true
          && (freshnessAgeMs == null || freshnessAgeMs <= config.opsSentinelHistoryWriterStaleMs),
        paused: state.paused,
        last_write_at: state.last_write_at,
        last_alert_write_at: state.last_alert_write_at,
        last_environment_status_write_at: state.last_environment_status_write_at,
        last_environment_status_seen_at: state.last_environment_status_seen_at,
        last_environment_status_duplicate_at: state.last_environment_status_duplicate_at,
        last_environment_status_fingerprint: state.last_environment_status_fingerprint,
        last_metrics_at: state.last_metrics_at,
        freshness_age_ms: Number.isFinite(freshnessAgeMs) ? Math.max(0, freshnessAgeMs) : null,
        database_connectivity: connectivity,
        trading_config_ok: tradingConfig.ok,
        trading_config_block_reason: tradingConfig.block_reason,
        last_error: state.last_error,
        reason: tradingConfig.ok === true
          ? null
          : tradingConfig.block_reason || 'trading config unavailable',
      };
    },
  },
  routes: [
    {
      method: 'GET',
      path: '/portfolio-metrics',
      readBody: false,
      handler: async () => {
        const latest = await loadLatestPortfolioMetric(pool);
        if (!latest) {
          return {
            statusCode: 404,
            payload: {
              error: 'portfolio_metrics_unavailable',
            },
          };
        }
        return latest;
      },
    },
    {
      method: 'POST',
      path: '/pause',
      handler: () => {
        state.paused = true;
        pauseConsumers();
        return { ok: true, paused: true };
      },
    },
    {
      method: 'POST',
      path: '/resume',
      handler: () => {
        state.paused = false;
        resumeConsumers();
        return { ok: true, paused: false };
      },
    },
    {
      method: 'POST',
      path: '/drain',
      handler: () => {
        state.draining = true;
        state.paused = true;
        pauseConsumers();
        // Deferred so the handler can return its response before shutdown starts.
        setTimeout(() => shutdown(), 0);
        return { ok: true, draining: true };
      },
    },
  ],
});

/**
 * Recomputes the portfolio metric from the current inputs, upserts it, and
 * stores a summary in `state.latest_portfolio_metrics`.
 *
 * @returns {Promise<object|null>} The stored summary, or `null` when
 *   `computePortfolioMetric` yields no payload.
 * @throws {Error} When the trading config is unavailable, or when loading or
 *   upserting fails.
 */
async function refreshPortfolioMetrics() {
  const tradingConfig = await requireTradingConfig();
  const inputs = await loadPortfolioMetricInputs(pool, {
    btcAsset: tradingConfig.tradingBtc,
    btcAssets: tradingConfig.tradingBtcAssets,
    eureAsset: tradingConfig.tradingEure,
    trackedAssets: tradingConfig.trackedAssets,
  });
  const payload = computePortfolioMetric({
    baseline: inputs.baseline,
    currentInventory: inputs.currentInventory?.payload,
    currentPrice: inputs.currentPrice?.payload,
    externalFlows: inputs.externalFlows || [],
    btcAsset: tradingConfig.tradingBtc,
    btcAssets: tradingConfig.tradingBtcAssets,
    eureAsset: tradingConfig.tradingEure,
    valuationAssets: inputs.valuationAssets || [],
    commandCount: inputs.commandCount,
    resultCount: inputs.resultCount,
  });
  if (!payload) return null;

  const computedAt = new Date().toISOString();
  const metricId = buildPortfolioMetricId({
    baselineInventoryId: inputs.baseline?.inventory?.inventory_id || null,
    currentInventoryId: inputs.currentInventory?.payload?.inventory_id || null,
    // Joins the current price id and every valuation asset price id with '+'.
    currentPriceId: [
      inputs.currentPrice?.payload?.price_id,
      ...(inputs.valuationAssets || []).map((asset) => asset.priceId),
    ].filter(Boolean).join('+') || null,
  });
  await upsertPortfolioMetric(pool, {
    metricId,
    computedAt,
    baselineAnchorAt: inputs.baseline?.command_at || null,
    baselineStatus: payload.baseline_status,
    payload,
  });

  state.last_metrics_at = computedAt;
  state.metrics_error = null;
  state.latest_portfolio_metrics = summarizePortfolioMetric({
    metric_id: metricId,
    computed_at: computedAt,
    baseline_anchor_at: inputs.baseline?.command_at || null,
    baseline_status: payload.baseline_status,
    payload,
  });
  return state.latest_portfolio_metrics;
}

/**
 * Refreshes intent-request outcomes and stores per-status counts in state.
 *
 * @returns {Promise<object[]>} The records returned by
 *   `refreshIntentRequestOutcomes`.
 * @throws {Error} When the trading config is unavailable or the refresh fails.
 */
async function refreshIntentRequestOutcomeAttributions() {
  const tradingConfig = await requireTradingConfig();
  const records = await refreshIntentRequestOutcomes(pool, {
    btcAsset: tradingConfig.tradingBtc,
    eureAsset: tradingConfig.tradingEure,
  });
  state.last_intent_request_outcomes_at = new Date().toISOString();
  state.latest_intent_request_outcomes = {
    refreshed_count: records.length,
    completed_count: records.filter((entry) => entry.outcome_status === 'completed').length,
    not_filled_count: records.filter((entry) => entry.outcome_status === 'not_filled').length,
    awaiting_settlement_count: records.filter((entry) => entry.outcome_status === 'awaiting_settlement').length,
  };
  state.intent_request_outcomes_error = null;
  return records;
}

/**
 * Refreshes quote outcomes and stores per-status counts in state.
 *
 * @returns {Promise<object[]>} The records returned by `refreshQuoteOutcomes`.
 * @throws {Error} When the trading config is unavailable or the refresh fails.
 */
async function refreshQuoteOutcomeAttributions() {
  const tradingConfig = await requireTradingConfig();
  const records = await refreshQuoteOutcomes(pool, {
    btcAsset: tradingConfig.tradingBtc,
    eureAsset: tradingConfig.tradingEure,
  });
  state.last_quote_outcomes_at = new Date().toISOString();
  state.quote_outcomes_error = null;
  state.latest_quote_outcomes = {
    count: records.length,
    completed_count: records.filter((entry) => entry.outcome_status === 'completed').length,
    not_filled_count: records.filter((entry) => entry.outcome_status === 'not_filled').length,
    submitted_count: records.filter((entry) => entry.outcome_status === 'submitted').length,
  };
  return records;
}

/**
 * Loads the trading config and insists that it is usable.
 *
 * @returns {Promise<object>} The trading config when its `ok` flag is truthy.
 * @throws {Error} When the `ok` flag is falsy.
 */
async function requireTradingConfig() {
  const tradingConfig = await tradingConfigStore.getConfig();
  if (!tradingConfig.ok) {
    // NOTE(review): the health endpoint reads `block_reason` from getState();
    // confirm that getConfig() really exposes the camelCase `blockReason`.
    throw new Error(`trading config unavailable: ${tradingConfig.blockReason || 'unavailable'}`);
  }
  return tradingConfig;
}

/**
 * Publishes a notification for an event on the liquidity-action topic; does
 * nothing for any other topic.
 *
 * @param {object} args
 * @param {string} args.topic - Topic the event was read from.
 * @param {object} args.event - Parsed event envelope.
 * @returns {Promise<void>}
 */
async function publishLiquidityNotification({ topic, event }) {
  if (topic !== config.kafkaTopicOpsLiquidityAction) return;
  const notification = buildLiquidityActionNotification({ event, config });
  await publishNotification(notification);
}

/**
 * Publishes one notification per quote outcome record that passes
 * `observedAtOrAfter(record.outcome_observed_at, minObservedAt)`.
 *
 * @param {object[]|null|undefined} records - Refreshed outcome records.
 * @param {object} [options]
 * @param {string|null} [options.minObservedAt=null] - Threshold handed to
 *   `observedAtOrAfter`.
 * @returns {Promise<void>}
 */
async function publishQuoteOutcomeNotifications(records, { minObservedAt = null } = {}) {
  for (const record of records || []) {
    if (!observedAtOrAfter(record.outcome_observed_at, minObservedAt)) continue;
    await publishNotification(buildQuoteOutcomeNotification({ record, config }));
  }
}

/**
 * Publishes one notification per intent-request outcome record that passes
 * `observedAtOrAfter(record.outcome_observed_at, minObservedAt)`.
 *
 * @param {object[]|null|undefined} records - Refreshed outcome records.
 * @param {object} [options]
 * @param {string|null} [options.minObservedAt=null] - Threshold handed to
 *   `observedAtOrAfter`.
 * @returns {Promise<void>}
 */
async function publishIntentRequestOutcomeNotifications(records, { minObservedAt = null } = {}) {
  for (const record of records || []) {
    if (!observedAtOrAfter(record.outcome_observed_at, minObservedAt)) continue;
    await publishNotification(buildIntentRequestOutcomeNotification({ record, config }));
  }
}

/**
 * Sends a notification through the dispatcher and updates the notification
 * counters in state.
 *
 * @param {object|null|undefined} notification - Built notification; a falsy
 *   value is ignored.
 * @returns {Promise<object|null>} The dispatcher result, or `null` when no
 *   notification was given.
 */
async function publishNotification(notification) {
  if (!notification) return null;
  const result = await notificationDispatcher.publishOnce(notification);
  if (result.delivered) {
    state.notification_count += 1;
    state.last_notification_at = new Date().toISOString();
    state.last_notification_error = null;
  } else if (result.ok === false) {
    state.notification_error_count += 1;
    state.last_notification_error = result.error
      || { status: result.status, response: result.response || null };
  }
  return result;
}

/**
 * Reduces a portfolio metric row to the subset of fields kept in state.
 *
 * @param {object|null|undefined} metric - Metric with snake_case fields and a
 *   `payload`.
 * @returns {object|null} The summary, or `null` when `metric` is falsy.
 */
function summarizePortfolioMetric(metric) {
  if (!metric) return null;
  return {
    metric_id: metric.metric_id,
    computed_at: metric.computed_at,
    baseline_anchor_at: metric.baseline_anchor_at,
    baseline_status: metric.baseline_status,
    current_portfolio_value_eure: metric.payload?.current_portfolio_value_eure ?? null,
    portfolio_vs_simple_hold_eure: metric.payload?.portfolio_vs_simple_hold_eure ?? null,
    mark_to_market_pnl_eure: metric.payload?.mark_to_market_pnl_eure ?? null,
    price_move_pnl_eure: metric.payload?.price_move_pnl_eure ?? null,
    command_count: metric.payload?.command_count ?? 0,
    result_count: metric.payload?.result_count ?? 0,
  };
}

/**
 * Wraps topic names in the `{ topic }` objects passed to pause/resume.
 *
 * @param {string[]} topicNames
 * @returns {{ topic: string }[]}
 */
function topicRefs(topicNames) {
  return topicNames.map((topic) => ({ topic }));
}

/**
 * Pauses every consumer on the topics it subscribed to.
 *
 * @returns {void}
 */
function pauseConsumers() {
  for (const historyConsumer of durableConsumers) {
    historyConsumer.pause(topicRefs(topics));
  }
  for (const historyConsumer of liveEvidenceConsumers) {
    historyConsumer.pause(topicRefs(liveEvidenceTopics));
  }
  rawQuoteConsumer.pause(topicRefs(rawQuoteTopics));
}

/**
 * Resumes every consumer on the topics it subscribed to.
 *
 * @returns {void}
 */
function resumeConsumers() {
  for (const historyConsumer of durableConsumers) {
    historyConsumer.resume(topicRefs(topics));
  }
  for (const historyConsumer of liveEvidenceConsumers) {
    historyConsumer.resume(topicRefs(liveEvidenceTopics));
  }
  rawQuoteConsumer.resume(topicRefs(rawQuoteTopics));
}

/**
 * Closes the control API, disconnects all consumers, ends the pool, and exits
 * the process with code 0.
 *
 * Shutdown is reachable from SIGINT, SIGTERM, the `/drain` route, and the end
 * of any batch processed while draining, so calls can overlap. The sequence
 * runs once; later calls receive the same promise instead of disconnecting
 * the consumers and ending the pool a second time.
 *
 * @returns {Promise<void>} The promise of the single shutdown sequence.
 */
async function shutdown() {
  if (!shutdownPromise) {
    shutdownPromise = (async () => {
      // Best effort: a control API that fails to close must not block shutdown.
      await controlApi.close().catch(() => {});
      await Promise.allSettled([
        ...durableConsumers.map((historyConsumer) => historyConsumer.disconnect()),
        ...liveEvidenceConsumers.map((historyConsumer) => historyConsumer.disconnect()),
        rawQuoteConsumer.disconnect(),
      ]);
      await pool.end();
      process.exit(0);
    })();
  }
  return shutdownPromise;
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);