import process from 'node:process';
import { createConsumer } from '../bus/kafka/consumer.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { routeHistoryRecord } from '../core/history-records.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import { buildPortfolioMetricId, computePortfolioMetric } from '../core/portfolio-metrics.mjs';
import { parseEventMessage } from '../core/event-envelope.mjs';
import { loadConfig } from '../lib/config.mjs';
import {
  createPostgresPool,
  ensureHistorySchema,
  insertHistoryEvent,
  loadLatestPortfolioMetric,
  loadPortfolioMetricInputs,
  upsertPortfolioMetric,
} from '../lib/postgres.mjs';

// ---------------------------------------------------------------------------
// Bootstrap: configuration, logging, static topic lists and runtime status.
// ---------------------------------------------------------------------------

const config = loadConfig();

const logger = createLogger({
  service: 'history-writer',
  component: 'history',
  namespace: config.projectNamespace,
});

// Every topic this process subscribes to, in subscription order.
const topics = [
  config.kafkaTopicRawNearIntentsQuote,
  config.kafkaTopicNormSwapDemand,
  config.kafkaTopicRefMarketPrice,
  config.kafkaTopicStateIntentInventory,
  config.kafkaTopicOpsLiquidityAction,
  config.kafkaTopicOpsFundingObservation,
  config.kafkaTopicOpsAlert,
  config.kafkaTopicDecisionTradeDecision,
  config.kafkaTopicCmdExecuteTrade,
  config.kafkaTopicExecTradeResult,
];

// Subset of `topics` used for membership checks (`Set#has`) on a message's topic.
const portfolioMetricTopics = new Set([
  config.kafkaTopicRefMarketPrice,
  config.kafkaTopicStateIntentInventory,
  config.kafkaTopicCmdExecuteTrade,
  config.kafkaTopicExecTradeResult,
]);

// Mutable runtime status for this process. Key order is kept stable on purpose
// because the object is a plain literal that other code may spread or serialize.
const state = {
  paused: false,
  draining: false,
  last_write_at: null,
  last_funding_observation_write_at: null,
  last_alert_write_at: null,
  last_metrics_at: null,
  last_error: null,
  error_count: 0,
  offsets: {},
  latest_portfolio_metrics: null,
  metrics_error: null,
};

// ---------------------------------------------------------------------------
// External resources: Postgres first (schema must exist), then the consumer.
// ---------------------------------------------------------------------------

const pool = createPostgresPool({ connectionString: config.postgresUrl });
await ensureHistorySchema(pool);

const consumer = await createConsumer({
  groupId: config.kafkaConsumerGroupHistory,
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
  logger,
});

// Subscriptions are issued one at a time, in list order, without replaying
// from the beginning of each topic.
for (const topic of topics) {
  await consumer.subscribe({ topic, fromBeginning: false });
}
// Memoized promise for the one-and-only shutdown sequence. Declared before the
// consumer starts so every path that can reach shutdown() sees an initialized binding.
let shutdownPromise = null;

// Prime the in-memory metrics summary before consuming. A failure is recorded
// in `state.metrics_error` and does not abort startup.
await refreshPortfolioMetrics().catch((error) => {
  state.metrics_error = serializeError(error);
});

await consumer.run({
  /**
   * Persists one consumed message as a history record and updates runtime status.
   *
   * Errors from parsing, routing or inserting are caught, recorded in `state`
   * and logged; they are not rethrown to the consumer.
   */
  eachMessage: async ({ topic, partition, message }) => {
    // NOTE(review): returning while paused skips the message without writing it.
    // If the consumer still delivers already-fetched messages after pause(), those
    // would be treated as handled — confirm against the consumer implementation.
    if (!message.value || state.paused) return;

    try {
      const event = parseEventMessage(message.value.toString());
      const routed = routeHistoryRecord({ topic, event });

      await insertHistoryEvent(pool, {
        table: routed.table,
        topic,
        event,
        record: routed.record,
      });

      state.last_write_at = new Date().toISOString();
      state.last_error = null;
      // Only the most recent partition/offset is kept per topic.
      state.offsets[topic] = {
        partition,
        offset: message.offset,
      };

      if (topic === config.kafkaTopicOpsFundingObservation) {
        state.last_funding_observation_write_at = state.last_write_at;
      }
      if (topic === config.kafkaTopicOpsAlert) {
        state.last_alert_write_at = state.last_write_at;
      }

      if (portfolioMetricTopics.has(topic)) {
        // A metrics failure must not count as a history write failure, so it
        // has its own try/catch and its own state field.
        try {
          await refreshPortfolioMetrics();
        } catch (error) {
          state.metrics_error = serializeError(error);
          logger.error('portfolio_metrics_refresh_failed', {
            topic,
            details: {
              error: serializeError(error),
            },
          });
        }
      }
    } catch (error) {
      state.last_error = serializeError(error);
      state.error_count += 1;
      logger.error('history_write_failed', {
        topic,
        details: {
          error: serializeError(error),
        },
      });
    } finally {
      // While draining, each finished message re-requests shutdown. shutdown()
      // is idempotent, so repeated requests collapse into a single sequence.
      if (state.draining) {
        setTimeout(() => {
          void shutdown();
        }, 0);
      }
    }
  },
});

const controlApi = startControlApi({
  host: config.historyWriterControlHost,
  port: config.historyWriterControlPort,
  logger: logger.child({ component: 'control-api' }),
  service: 'history-writer',
  namespace: config.projectNamespace,
  stateProvider: {
    // Returns a snapshot of `state` plus a live database probe (`SELECT 1`).
    async getState() {
      const connectivity = await pool
        .query('SELECT 1')
        .then(() => true)
        .catch(() => false);
      return {
        ...state,
        database_connectivity: connectivity,
      };
    },
  },
  routes: [
    {
      method: 'GET',
      path: '/portfolio-metrics',
      readBody: false,
      // Serves the latest stored metric, or a 404 payload when none exists.
      handler: async () => {
        const latest = await loadLatestPortfolioMetric(pool);
        if (!latest) {
          return {
            statusCode: 404,
            payload: {
              error: 'portfolio_metrics_unavailable',
            },
          };
        }
        return latest;
      },
    },
    {
      method: 'POST',
      path: '/pause',
      handler: () => {
        state.paused = true;
        consumer.pause(topics.map((topic) => ({ topic })));
        return { ok: true, paused: true };
      },
    },
    {
      method: 'POST',
      path: '/resume',
      handler: () => {
        state.paused = false;
        consumer.resume(topics.map((topic) => ({ topic })));
        return { ok: true, paused: false };
      },
    },
    {
      method: 'POST',
      path: '/drain',
      handler: () => {
        state.draining = true;
        state.paused = true;
        consumer.pause(topics.map((topic) => ({ topic })));
        // Deferred so this handler returns its result before shutdown begins.
        setTimeout(() => {
          void shutdown();
        }, 0);
        return { ok: true, draining: true };
      },
    },
  ],
});

/**
 * Recomputes the portfolio metric from the current database inputs, stores it,
 * and refreshes the summary kept in `state`.
 *
 * @returns {Promise<object|null>} The summarized metric, or `null` when
 *   `computePortfolioMetric` produced no payload (nothing is stored then).
 * @throws Propagates errors from loading inputs, computing, or upserting.
 */
async function refreshPortfolioMetrics() {
  const inputs = await loadPortfolioMetricInputs(pool);
  const payload = computePortfolioMetric({
    baseline: inputs.baseline,
    currentInventory: inputs.currentInventory?.payload,
    currentPrice: inputs.currentPrice?.payload,
    btcAsset: config.tradingBtc,
    eureAsset: config.tradingEure,
    commandCount: inputs.commandCount,
    resultCount: inputs.resultCount,
  });
  if (!payload) return null;

  const computedAt = new Date().toISOString();
  const metricId = buildPortfolioMetricId({
    baselineInventoryId: inputs.baseline?.inventory?.inventory_id || null,
    currentInventoryId: inputs.currentInventory?.payload?.inventory_id || null,
    currentPriceId: inputs.currentPrice?.payload?.price_id || null,
  });

  await upsertPortfolioMetric(pool, {
    metricId,
    computedAt,
    baselineAnchorAt: inputs.baseline?.command_at || null,
    baselineStatus: payload.baseline_status,
    payload,
  });

  // State is only touched after the upsert succeeded.
  state.last_metrics_at = computedAt;
  state.metrics_error = null;
  state.latest_portfolio_metrics = summarizePortfolioMetric({
    metric_id: metricId,
    computed_at: computedAt,
    baseline_anchor_at: inputs.baseline?.command_at || null,
    baseline_status: payload.baseline_status,
    payload,
  });
  return state.latest_portfolio_metrics;
}

/**
 * Projects a stored metric row onto the compact summary kept in `state`.
 *
 * @param {object|null|undefined} metric - Row-shaped metric with a `payload` object.
 * @returns {object|null} `null` for a falsy input; otherwise the summary, with
 *   missing payload values defaulting to `null` (amounts) or `0` (counts).
 */
function summarizePortfolioMetric(metric) {
  if (!metric) return null;
  return {
    metric_id: metric.metric_id,
    computed_at: metric.computed_at,
    baseline_anchor_at: metric.baseline_anchor_at,
    baseline_status: metric.baseline_status,
    current_portfolio_value_eure: metric.payload?.current_portfolio_value_eure ?? null,
    trade_pnl_eure: metric.payload?.trade_pnl_eure ?? null,
    mark_to_market_pnl_eure: metric.payload?.mark_to_market_pnl_eure ?? null,
    price_move_pnl_eure: metric.payload?.price_move_pnl_eure ?? null,
    command_count: metric.payload?.command_count ?? 0,
    result_count: metric.payload?.result_count ?? 0,
  };
}

/**
 * Stops the service and exits the process.
 *
 * Idempotent: shutdown can be requested by the drain route, by every message
 * finishing while draining, and by SIGINT/SIGTERM. All callers share one
 * shutdown sequence so the consumer and pool are closed exactly once.
 *
 * @returns {Promise<void>} The shared shutdown promise; it never rejects.
 */
function shutdown() {
  if (shutdownPromise === null) {
    shutdownPromise = closeResourcesAndExit();
  }
  return shutdownPromise;
}

/**
 * Closes the control API, the consumer and the database pool in that order,
 * then exits. A failing step is logged and does not prevent later steps from
 * running; the exit code is 1 if the consumer or the pool failed to close.
 */
async function closeResourcesAndExit() {
  let exitCode = 0;

  // Best-effort: errors while closing the control API are intentionally ignored.
  await controlApi.close().catch(() => {});

  const steps = [
    ['consumer_disconnect', () => consumer.disconnect()],
    ['postgres_pool_end', () => pool.end()],
  ];
  for (const [step, close] of steps) {
    try {
      await close();
    } catch (error) {
      exitCode = 1;
      logger.error('shutdown_step_failed', {
        details: {
          step,
          error: serializeError(error),
        },
      });
    }
  }

  process.exit(exitCode);
}

// Wrapped so the signal name is not passed to shutdown() and the returned
// promise is explicitly discarded.
process.on('SIGINT', () => {
  void shutdown();
});
process.on('SIGTERM', () => {
  void shutdown();
});