import process from 'node:process'; import { createConsumer } from '../bus/kafka/consumer.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { routeHistoryRecord } from '../core/history-records.mjs'; import { shouldRunDerivedRefreshForEvent } from '../core/history-writer-refresh-policy.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; import { buildIntentRequestOutcomeNotification, buildLiquidityActionNotification, buildQuoteOutcomeNotification, createNotificationDispatcher, observedAtOrAfter, } from '../core/notification-layer.mjs'; import { createNtfyNotificationClient } from '../core/notification-client.mjs'; import { buildPortfolioMetricId, computePortfolioMetric } from '../core/portfolio-metrics.mjs'; import { parseEventMessage } from '../core/event-envelope.mjs'; import { loadConfig } from '../lib/config.mjs'; import { claimNotificationDelivery, createTradingConfigStore, createPostgresPool, ensureHistorySchema, insertEnvironmentStatusChange, finishNotificationDelivery, insertHistoryEvent, loadLatestPortfolioMetric, loadPortfolioMetricInputs, refreshIntentRequestOutcomes, refreshQuoteOutcomes, seedTradingConfig, upsertPortfolioMetric, } from '../lib/postgres.mjs'; const config = loadConfig(); const logger = createLogger({ service: 'history-writer', component: 'history', namespace: config.projectNamespace, }); const pool = createPostgresPool({ connectionString: config.postgresUrl, }); await ensureHistorySchema(pool); await seedTradingConfig(pool); const tradingConfigStore = createTradingConfigStore({ pool, logger: logger.child({ component: 'trading-config' }), }); await tradingConfigStore.forceRefresh(); const notificationLogger = logger.child({ component: 'notifications' }); const notificationClient = createNtfyNotificationClient({ baseUrl: config.notificationNtfyBaseUrl, topic: config.notificationNtfyTopic, token: config.notificationNtfyToken, timeoutMs: config.notificationNtfyTimeoutMs, logger: notificationLogger, }); const notificationDispatcher = createNotificationDispatcher({ client: notificationClient, logger: notificationLogger, store: { claim: (notification) => claimNotificationDelivery(pool, { notificationKey: notification.notification_key, notificationType: notification.notification_type, sourceKind: notification.source_kind, sourceId: notification.source_id, payload: notification.data || {}, }), finish: (notification, result) => finishNotificationDelivery(pool, { notificationKey: notification.notification_key, ok: result.ok, response: result.ok ? { status: result.status, response: result.response || null } : null, error: result.ok ? null : result.error || { status: result.status, response: result.response || null }, }), }, }); const consumer = await createConsumer({ groupId: config.kafkaConsumerGroupHistory, brokers: config.kafkaBrokers, clientId: config.kafkaClientId, logger, }); const topics = [ config.kafkaTopicRawNearIntentsQuote, config.kafkaTopicNormSwapDemand, config.kafkaTopicRefMarketPrice, config.kafkaTopicStateIntentInventory, config.kafkaTopicOpsLiquidityAction, config.kafkaTopicOpsFundingObservation, config.kafkaTopicOpsAlert, config.kafkaTopicOpsEnvironmentStatus, config.kafkaTopicDecisionTradeDecision, config.kafkaTopicCmdExecuteTrade, config.kafkaTopicExecTradeResult, ]; const portfolioMetricTopics = new Set([ config.kafkaTopicRefMarketPrice, config.kafkaTopicStateIntentInventory, config.kafkaTopicOpsLiquidityAction, config.kafkaTopicCmdExecuteTrade, config.kafkaTopicExecTradeResult, ]); const quoteOutcomeTopics = new Set([ config.kafkaTopicStateIntentInventory, config.kafkaTopicCmdExecuteTrade, config.kafkaTopicExecTradeResult, ]); const intentRequestOutcomeTopics = new Set([ config.kafkaTopicStateIntentInventory, ]); for (const topic of topics) { // Raw quote volume is a live firehose; replaying retained history can starve // durable strategy/execution topics and exhaust the writer. await consumer.subscribe({ topic, fromBeginning: topic !== config.kafkaTopicRawNearIntentsQuote, }); } const state = { paused: false, draining: false, last_write_at: null, last_funding_observation_write_at: null, last_alert_write_at: null, last_environment_status_write_at: null, last_environment_status_seen_at: null, last_environment_status_duplicate_at: null, last_environment_status_fingerprint: null, last_metrics_at: null, last_error: null, error_count: 0, offsets: {}, latest_portfolio_metrics: null, metrics_error: null, last_quote_outcomes_at: null, latest_quote_outcomes: null, quote_outcomes_error: null, last_intent_request_outcomes_at: null, latest_intent_request_outcomes: null, intent_request_outcomes_error: null, derived_refresh_skipped_count: 0, last_derived_refresh_skipped_at: null, last_derived_refresh_skipped_topic: null, notification_count: 0, notification_error_count: 0, last_notification_at: null, last_notification_error: null, }; await refreshPortfolioMetrics().catch((error) => { state.metrics_error = serializeError(error); }); await refreshQuoteOutcomeAttributions().catch((error) => { state.quote_outcomes_error = serializeError(error); }); await refreshIntentRequestOutcomeAttributions().catch((error) => { state.intent_request_outcomes_error = serializeError(error); }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { if (!message.value || state.paused) return; try { const event = parseEventMessage(message.value.toString()); const routed = routeHistoryRecord({ topic, event }); const writeResult = topic === config.kafkaTopicOpsEnvironmentStatus ? await insertEnvironmentStatusChange(pool, { topic, event, record: routed.record, }) : await insertHistoryEvent(pool, { table: routed.table, topic, event, record: routed.record, }).then(() => ({ inserted: true })); const handledAt = new Date().toISOString(); if (writeResult.inserted) { state.last_write_at = handledAt; } state.last_error = null; state.offsets[topic] = { partition, offset: message.offset, }; const shouldRunDerivedRefresh = shouldRunDerivedRefreshForEvent({ event, now: handledAt, maxEventAgeMs: config.historyWriterDerivedRefreshMaxEventAgeMs, }); if (!shouldRunDerivedRefresh) { state.derived_refresh_skipped_count += 1; state.last_derived_refresh_skipped_at = handledAt; state.last_derived_refresh_skipped_topic = topic; } if (topic === config.kafkaTopicOpsFundingObservation && writeResult.inserted) { state.last_funding_observation_write_at = state.last_write_at; } if (topic === config.kafkaTopicOpsAlert && writeResult.inserted) { state.last_alert_write_at = state.last_write_at; } if (topic === config.kafkaTopicOpsEnvironmentStatus) { state.last_environment_status_seen_at = handledAt; state.last_environment_status_fingerprint = writeResult.status_fingerprint || event.payload.status_fingerprint || null; if (writeResult.inserted) { state.last_environment_status_write_at = handledAt; } else { state.last_environment_status_duplicate_at = handledAt; } } if (shouldRunDerivedRefresh) { await publishLiquidityNotification({ topic, event }); } if (shouldRunDerivedRefresh && portfolioMetricTopics.has(topic)) { try { await refreshPortfolioMetrics(); } catch (error) { state.metrics_error = serializeError(error); logger.error('portfolio_metrics_refresh_failed', { topic, details: { error: serializeError(error), }, }); } } if (shouldRunDerivedRefresh && quoteOutcomeTopics.has(topic)) { try { const records = await refreshQuoteOutcomeAttributions(); await publishQuoteOutcomeNotifications(records, { minObservedAt: event.observed_at || event.ingested_at || state.last_write_at, }); } catch (error) { state.quote_outcomes_error = serializeError(error); logger.error('quote_outcomes_refresh_failed', { topic, details: { error: serializeError(error), }, }); } } if (shouldRunDerivedRefresh && intentRequestOutcomeTopics.has(topic)) { try { const records = await refreshIntentRequestOutcomeAttributions(); await publishIntentRequestOutcomeNotifications(records, { minObservedAt: event.observed_at || event.ingested_at || state.last_write_at, }); } catch (error) { state.intent_request_outcomes_error = serializeError(error); logger.error('intent_request_outcomes_refresh_failed', { topic, details: { error: serializeError(error), }, }); } } } catch (error) { state.last_error = serializeError(error); state.error_count += 1; logger.error('history_write_failed', { topic, details: { error: serializeError(error), }, }); } finally { if (state.draining) { setTimeout(() => shutdown(), 0); } } }, }); const controlApi = startControlApi({ host: config.historyWriterControlHost, port: config.historyWriterControlPort, logger: logger.child({ component: 'control-api' }), service: 'history-writer', namespace: config.projectNamespace, stateProvider: { async getState() { const connectivity = await pool.query('SELECT 1').then(() => true).catch(() => false); return { ...state, database_connectivity: connectivity, trading_config: tradingConfigStore.getState(), }; }, }, healthProvider: { async getHealth() { const connectivity = await pool.query('SELECT 1').then(() => true).catch(() => false); const tradingConfig = tradingConfigStore.getState(); const lastTruthAt = state.last_write_at || state.last_metrics_at || null; const freshnessAgeMs = lastTruthAt ? Date.now() - new Date(lastTruthAt).getTime() : null; return { ok: connectivity && tradingConfig.ok === true && (freshnessAgeMs == null || freshnessAgeMs <= config.opsSentinelHistoryWriterStaleMs), paused: state.paused, last_write_at: state.last_write_at, last_alert_write_at: state.last_alert_write_at, last_environment_status_write_at: state.last_environment_status_write_at, last_environment_status_seen_at: state.last_environment_status_seen_at, last_environment_status_duplicate_at: state.last_environment_status_duplicate_at, last_environment_status_fingerprint: state.last_environment_status_fingerprint, last_metrics_at: state.last_metrics_at, freshness_age_ms: Number.isFinite(freshnessAgeMs) ? Math.max(0, freshnessAgeMs) : null, database_connectivity: connectivity, trading_config_ok: tradingConfig.ok, trading_config_block_reason: tradingConfig.block_reason, last_error: state.last_error, reason: tradingConfig.ok === true ? null : tradingConfig.block_reason || 'trading config unavailable', }; }, }, routes: [ { method: 'GET', path: '/portfolio-metrics', readBody: false, handler: async () => { const latest = await loadLatestPortfolioMetric(pool); if (!latest) { return { statusCode: 404, payload: { error: 'portfolio_metrics_unavailable', }, }; } return latest; }, }, { method: 'POST', path: '/pause', handler: () => { state.paused = true; consumer.pause(topics.map((topic) => ({ topic }))); return { ok: true, paused: true }; }, }, { method: 'POST', path: '/resume', handler: () => { state.paused = false; consumer.resume(topics.map((topic) => ({ topic }))); return { ok: true, paused: false }; }, }, { method: 'POST', path: '/drain', handler: () => { state.draining = true; state.paused = true; consumer.pause(topics.map((topic) => ({ topic }))); setTimeout(() => shutdown(), 0); return { ok: true, draining: true }; }, }, ], }); async function refreshPortfolioMetrics() { const tradingConfig = await requireTradingConfig(); const inputs = await loadPortfolioMetricInputs(pool, { btcAsset: tradingConfig.tradingBtc, btcAssets: tradingConfig.tradingBtcAssets, eureAsset: tradingConfig.tradingEure, trackedAssets: tradingConfig.trackedAssets, }); const payload = computePortfolioMetric({ baseline: inputs.baseline, currentInventory: inputs.currentInventory?.payload, currentPrice: inputs.currentPrice?.payload, externalFlows: inputs.externalFlows || [], btcAsset: tradingConfig.tradingBtc, btcAssets: tradingConfig.tradingBtcAssets, eureAsset: tradingConfig.tradingEure, valuationAssets: inputs.valuationAssets || [], commandCount: inputs.commandCount, resultCount: inputs.resultCount, }); if (!payload) return null; const computedAt = new Date().toISOString(); const metricId = buildPortfolioMetricId({ baselineInventoryId: inputs.baseline?.inventory?.inventory_id || null, currentInventoryId: inputs.currentInventory?.payload?.inventory_id || null, currentPriceId: [ inputs.currentPrice?.payload?.price_id, ...(inputs.valuationAssets || []).map((asset) => asset.priceId), ].filter(Boolean).join('+') || null, }); await upsertPortfolioMetric(pool, { metricId, computedAt, baselineAnchorAt: inputs.baseline?.command_at || null, baselineStatus: payload.baseline_status, payload, }); state.last_metrics_at = computedAt; state.metrics_error = null; state.latest_portfolio_metrics = summarizePortfolioMetric({ metric_id: metricId, computed_at: computedAt, baseline_anchor_at: inputs.baseline?.command_at || null, baseline_status: payload.baseline_status, payload, }); return state.latest_portfolio_metrics; } async function refreshIntentRequestOutcomeAttributions() { const tradingConfig = await requireTradingConfig(); const records = await refreshIntentRequestOutcomes(pool, { btcAsset: tradingConfig.tradingBtc, eureAsset: tradingConfig.tradingEure, }); state.last_intent_request_outcomes_at = new Date().toISOString(); state.latest_intent_request_outcomes = { refreshed_count: records.length, completed_count: records.filter((entry) => entry.outcome_status === 'completed').length, not_filled_count: records.filter((entry) => entry.outcome_status === 'not_filled').length, awaiting_settlement_count: records.filter((entry) => entry.outcome_status === 'awaiting_settlement').length, }; state.intent_request_outcomes_error = null; return records; } async function refreshQuoteOutcomeAttributions() { const tradingConfig = await requireTradingConfig(); const records = await refreshQuoteOutcomes(pool, { btcAsset: tradingConfig.tradingBtc, eureAsset: tradingConfig.tradingEure, }); state.last_quote_outcomes_at = new Date().toISOString(); state.quote_outcomes_error = null; state.latest_quote_outcomes = { count: records.length, completed_count: records.filter((entry) => entry.outcome_status === 'completed').length, not_filled_count: records.filter((entry) => entry.outcome_status === 'not_filled').length, submitted_count: records.filter((entry) => entry.outcome_status === 'submitted').length, }; return records; } async function requireTradingConfig() { const tradingConfig = await tradingConfigStore.getConfig(); if (!tradingConfig.ok || !tradingConfig.tradingBtc || !tradingConfig.tradingEure) { throw new Error(`trading config unavailable: ${tradingConfig.blockReason || 'missing current assets'}`); } return tradingConfig; } async function publishLiquidityNotification({ topic, event }) { if (topic !== config.kafkaTopicOpsLiquidityAction) return; const notification = buildLiquidityActionNotification({ event, config }); await publishNotification(notification); } async function publishQuoteOutcomeNotifications(records, { minObservedAt = null } = {}) { for (const record of records || []) { if (!observedAtOrAfter(record.outcome_observed_at, minObservedAt)) continue; await publishNotification(buildQuoteOutcomeNotification({ record, config })); } } async function publishIntentRequestOutcomeNotifications(records, { minObservedAt = null } = {}) { for (const record of records || []) { if (!observedAtOrAfter(record.outcome_observed_at, minObservedAt)) continue; await publishNotification(buildIntentRequestOutcomeNotification({ record, config })); } } async function publishNotification(notification) { if (!notification) return null; const result = await notificationDispatcher.publishOnce(notification); if (result.delivered) { state.notification_count += 1; state.last_notification_at = new Date().toISOString(); state.last_notification_error = null; } else if (result.ok === false) { state.notification_error_count += 1; state.last_notification_error = result.error || { status: result.status, response: result.response || null }; } return result; } function summarizePortfolioMetric(metric) { if (!metric) return null; return { metric_id: metric.metric_id, computed_at: metric.computed_at, baseline_anchor_at: metric.baseline_anchor_at, baseline_status: metric.baseline_status, current_portfolio_value_eure: metric.payload?.current_portfolio_value_eure ?? null, portfolio_vs_simple_hold_eure: metric.payload?.portfolio_vs_simple_hold_eure ?? null, mark_to_market_pnl_eure: metric.payload?.mark_to_market_pnl_eure ?? null, price_move_pnl_eure: metric.payload?.price_move_pnl_eure ?? null, command_count: metric.payload?.command_count ?? 0, result_count: metric.payload?.result_count ?? 0, }; } async function shutdown() { await controlApi.close().catch(() => {}); await consumer.disconnect(); await pool.end(); process.exit(0); } process.on('SIGINT', shutdown); process.on('SIGTERM', shutdown);