unrip/src/apps/history-writer.mjs
philipp 8f109a7463
Some checks failed
deploy / deploy (push) Failing after 40s
Value tracked USDC in portfolio metrics
Proof: Dashboard portfolio metrics now include DB-tracked USDC balances valued from live BTC/EUR and BTC/USDC reference prices, with regression coverage for the observed USDC inventory case.

Assumptions: USDC is cash-equivalent for valuation when a fresh BTC/USDC reference event is available; live trading safety remains governed by pair config and price route checks.

Still fake: Portfolio valuation still does not provide fee-complete realized PnL or generalized valuation for every imported non-stable asset.
2026-05-18 17:51:31 +02:00

531 lines
18 KiB
JavaScript

import process from 'node:process';
import { createConsumer } from '../bus/kafka/consumer.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { routeHistoryRecord } from '../core/history-records.mjs';
import { shouldRunDerivedRefreshForEvent } from '../core/history-writer-refresh-policy.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import {
buildIntentRequestOutcomeNotification,
buildLiquidityActionNotification,
buildQuoteOutcomeNotification,
createNotificationDispatcher,
observedAtOrAfter,
} from '../core/notification-layer.mjs';
import { createNtfyNotificationClient } from '../core/notification-client.mjs';
import { buildPortfolioMetricId, computePortfolioMetric } from '../core/portfolio-metrics.mjs';
import { parseEventMessage } from '../core/event-envelope.mjs';
import { loadConfig } from '../lib/config.mjs';
import {
claimNotificationDelivery,
createTradingConfigStore,
createPostgresPool,
ensureHistorySchema,
insertEnvironmentStatusChange,
finishNotificationDelivery,
insertHistoryEvent,
loadLatestPortfolioMetric,
loadPortfolioMetricInputs,
refreshIntentRequestOutcomes,
refreshQuoteOutcomes,
seedTradingConfig,
upsertPortfolioMetric,
} from '../lib/postgres.mjs';
// Process bootstrap: configuration, logging, database pool and trading-config
// store. Everything below in this module reads these bindings.
const config = loadConfig();
// Root structured logger; component-specific child loggers are derived from it.
const logger = createLogger({
service: 'history-writer',
component: 'history',
namespace: config.projectNamespace,
});
// Postgres pool passed to the persistence helpers called in this file.
const pool = createPostgresPool({
connectionString: config.postgresUrl,
});
// Schema setup and trading-config seeding are awaited, in this order, before
// the trading-config store is created.
await ensureHistorySchema(pool);
await seedTradingConfig(pool);
const tradingConfigStore = createTradingConfigStore({
pool,
logger: logger.child({ component: 'trading-config' }),
});
// Refresh the store once up front, before the Kafka consumer is created.
await tradingConfigStore.forceRefresh();
// Notification delivery: an ntfy client wrapped by a dispatcher whose
// bookkeeping store is backed by the Postgres delivery helpers.
const notificationLogger = logger.child({ component: 'notifications' });
const notificationClient = createNtfyNotificationClient({
  baseUrl: config.notificationNtfyBaseUrl,
  topic: config.notificationNtfyTopic,
  token: config.notificationNtfyToken,
  timeoutMs: config.notificationNtfyTimeoutMs,
  logger: notificationLogger,
});
const notificationDispatcher = createNotificationDispatcher({
  client: notificationClient,
  logger: notificationLogger,
  store: {
    // Maps the notification's snake_case fields onto the claim helper's
    // camelCase options; a missing `data` is stored as an empty object.
    claim: (notification) => {
      const {
        notification_key: notificationKey,
        notification_type: notificationType,
        source_kind: sourceKind,
        source_id: sourceId,
        data,
      } = notification;
      return claimNotificationDelivery(pool, {
        notificationKey,
        notificationType,
        sourceKind,
        sourceId,
        payload: data || {},
      });
    },
    // Records the delivery result: the status/response summary goes into
    // `response` on success, and serves as the fallback `error` on failure
    // when the result carries no explicit error.
    finish: (notification, result) => {
      const summary = { status: result.status, response: result.response || null };
      return finishNotificationDelivery(pool, {
        notificationKey: notification.notification_key,
        ok: result.ok,
        response: result.ok ? summary : null,
        error: result.ok ? null : result.error || summary,
      });
    },
  },
});
// Kafka consumer for the history consumer group.
const consumer = await createConsumer({
groupId: config.kafkaConsumerGroupHistory,
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
});
// Every topic this writer subscribes to; also the set paused/resumed by the
// control API routes.
const topics = [
config.kafkaTopicRawNearIntentsQuote,
config.kafkaTopicNormSwapDemand,
config.kafkaTopicRefMarketPrice,
config.kafkaTopicStateIntentInventory,
config.kafkaTopicOpsLiquidityAction,
config.kafkaTopicOpsFundingObservation,
config.kafkaTopicOpsAlert,
config.kafkaTopicOpsEnvironmentStatus,
config.kafkaTopicDecisionTradeDecision,
config.kafkaTopicCmdExecuteTrade,
config.kafkaTopicExecTradeResult,
];
// Topics whose events trigger a portfolio-metrics refresh in the message handler.
const portfolioMetricTopics = new Set([
config.kafkaTopicRefMarketPrice,
config.kafkaTopicStateIntentInventory,
config.kafkaTopicOpsLiquidityAction,
config.kafkaTopicCmdExecuteTrade,
config.kafkaTopicExecTradeResult,
]);
// Topics whose events trigger a quote-outcome refresh in the message handler.
const quoteOutcomeTopics = new Set([
config.kafkaTopicStateIntentInventory,
config.kafkaTopicCmdExecuteTrade,
config.kafkaTopicExecTradeResult,
]);
// Topics whose events trigger an intent-request-outcome refresh in the message handler.
const intentRequestOutcomeTopics = new Set([
config.kafkaTopicStateIntentInventory,
]);
// Subscribe to every topic in order. Raw quote volume is a live firehose;
// replaying retained history can starve durable strategy/execution topics and
// exhaust the writer, so only that topic starts from the latest offset.
for (const topic of topics) {
  const isRawQuoteFirehose = topic === config.kafkaTopicRawNearIntentsQuote;
  await consumer.subscribe({ topic, fromBeginning: !isRawQuoteFirehose });
}
// Mutable in-process status for this writer. The message handler and the
// refresh/notification helpers update it; the control API's state provider
// spreads it into its response and the health provider reads selected fields.
const state = {
// Control flags toggled by the /pause, /resume and /drain routes.
paused: false,
draining: false,
// Timestamps (ISO strings) of the most recent writes, overall and per topic kind.
last_write_at: null,
last_funding_observation_write_at: null,
last_alert_write_at: null,
// Environment-status bookkeeping: last seen, last inserted, last duplicate.
last_environment_status_write_at: null,
last_environment_status_seen_at: null,
last_environment_status_duplicate_at: null,
last_environment_status_fingerprint: null,
last_metrics_at: null,
// Most recent message-handling failure and the running failure count.
last_error: null,
error_count: 0,
// Last handled { partition, offset } keyed by topic.
offsets: {},
// Latest derived results and the last error of each refresh.
latest_portfolio_metrics: null,
metrics_error: null,
last_quote_outcomes_at: null,
latest_quote_outcomes: null,
quote_outcomes_error: null,
last_intent_request_outcomes_at: null,
latest_intent_request_outcomes: null,
intent_request_outcomes_error: null,
// Events for which the refresh policy declined to run derived refreshes.
derived_refresh_skipped_count: 0,
last_derived_refresh_skipped_at: null,
last_derived_refresh_skipped_topic: null,
// Notification delivery counters.
notification_count: 0,
notification_error_count: 0,
last_notification_at: null,
last_notification_error: null,
};
// Run each derived refresh once before consuming, one after another. A failure
// is recorded on the matching state field and does not abort startup.
const startupRefreshes = [
  [refreshPortfolioMetrics, 'metrics_error'],
  [refreshQuoteOutcomeAttributions, 'quote_outcomes_error'],
  [refreshIntentRequestOutcomeAttributions, 'intent_request_outcomes_error'],
];
for (const [runRefresh, errorField] of startupRefreshes) {
  try {
    await runRefresh();
  } catch (error) {
    state[errorField] = serializeError(error);
  }
}
// Main consume loop. For each Kafka message: persist it to its history table,
// update the in-process status, then run the derived work that applies to the
// topic (liquidity notification, portfolio metrics, quote outcomes,
// intent-request outcomes). Failures are recorded and logged, never rethrown.
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
// Skip messages without a value, and anything that still arrives while paused.
// NOTE(review): a message skipped here is not written; whether its offset is
// still committed depends on the consumer wrapper — confirm it is redelivered
// after /resume.
if (!message.value || state.paused) return;
try {
const event = parseEventMessage(message.value.toString());
const routed = routeHistoryRecord({ topic, event });
// Environment-status events use a dedicated insert whose result is read below
// (`inserted`, `status_fingerprint`). Every other topic is inserted into the
// routed table and always reported as inserted.
const writeResult = topic === config.kafkaTopicOpsEnvironmentStatus
? await insertEnvironmentStatusChange(pool, {
topic,
event,
record: routed.record,
})
: await insertHistoryEvent(pool, {
table: routed.table,
topic,
event,
record: routed.record,
}).then(() => ({ inserted: true }));
const handledAt = new Date().toISOString();
if (writeResult.inserted) {
state.last_write_at = handledAt;
}
// A message handled without throwing clears the last recorded error.
state.last_error = null;
state.offsets[topic] = {
partition,
offset: message.offset,
};
// The refresh policy decides, from the event, the handling time and the
// configured maximum event age, whether derived work runs for this event.
const shouldRunDerivedRefresh = shouldRunDerivedRefreshForEvent({
event,
now: handledAt,
maxEventAgeMs: config.historyWriterDerivedRefreshMaxEventAgeMs,
});
if (!shouldRunDerivedRefresh) {
state.derived_refresh_skipped_count += 1;
state.last_derived_refresh_skipped_at = handledAt;
state.last_derived_refresh_skipped_topic = topic;
}
// Per-topic write timestamps are only advanced when a row was inserted.
if (topic === config.kafkaTopicOpsFundingObservation && writeResult.inserted) {
state.last_funding_observation_write_at = state.last_write_at;
}
if (topic === config.kafkaTopicOpsAlert && writeResult.inserted) {
state.last_alert_write_at = state.last_write_at;
}
if (topic === config.kafkaTopicOpsEnvironmentStatus) {
state.last_environment_status_seen_at = handledAt;
// Prefer the fingerprint from the insert result, then the event payload.
state.last_environment_status_fingerprint = writeResult.status_fingerprint || event.payload.status_fingerprint || null;
if (writeResult.inserted) {
state.last_environment_status_write_at = handledAt;
} else {
// Not inserted: tracked as a duplicate status.
state.last_environment_status_duplicate_at = handledAt;
}
}
// The helper itself ignores every topic except the liquidity-action topic.
if (shouldRunDerivedRefresh) {
await publishLiquidityNotification({ topic, event });
}
// Each derived refresh below has its own try/catch, so a failure in one is
// recorded and logged without preventing the others.
if (shouldRunDerivedRefresh && portfolioMetricTopics.has(topic)) {
try {
await refreshPortfolioMetrics();
} catch (error) {
state.metrics_error = serializeError(error);
logger.error('portfolio_metrics_refresh_failed', {
topic,
details: {
error: serializeError(error),
},
});
}
}
if (shouldRunDerivedRefresh && quoteOutcomeTopics.has(topic)) {
try {
const records = await refreshQuoteOutcomeAttributions();
// Only outcomes observed at or after this event's timestamp are announced.
await publishQuoteOutcomeNotifications(records, {
minObservedAt: event.observed_at || event.ingested_at || state.last_write_at,
});
} catch (error) {
state.quote_outcomes_error = serializeError(error);
logger.error('quote_outcomes_refresh_failed', {
topic,
details: {
error: serializeError(error),
},
});
}
}
if (shouldRunDerivedRefresh && intentRequestOutcomeTopics.has(topic)) {
try {
const records = await refreshIntentRequestOutcomeAttributions();
// Same cut-off rule as for quote outcomes.
await publishIntentRequestOutcomeNotifications(records, {
minObservedAt: event.observed_at || event.ingested_at || state.last_write_at,
});
} catch (error) {
state.intent_request_outcomes_error = serializeError(error);
logger.error('intent_request_outcomes_refresh_failed', {
topic,
details: {
error: serializeError(error),
},
});
}
}
} catch (error) {
// Parsing, routing, writing or status bookkeeping failed for this message.
state.last_error = serializeError(error);
state.error_count += 1;
logger.error('history_write_failed', {
topic,
details: {
error: serializeError(error),
},
});
} finally {
// When draining, schedule shutdown once the current message has been handled.
if (state.draining) {
setTimeout(() => shutdown(), 0);
}
}
},
});
/** Reports whether Postgres answers a trivial query; resolves to a boolean. */
function probeDatabaseConnectivity() {
  return pool.query('SELECT 1').then(() => true).catch(() => false);
}

/** Builds the pause/resume argument covering every subscribed topic. */
function subscribedTopicSelectors() {
  return topics.map((topic) => ({ topic }));
}

// Control API: exposes state and health, the latest stored portfolio metric,
// and the pause / resume / drain controls for the consumer.
const controlApi = startControlApi({
  host: config.historyWriterControlHost,
  port: config.historyWriterControlPort,
  logger: logger.child({ component: 'control-api' }),
  service: 'history-writer',
  namespace: config.projectNamespace,
  stateProvider: {
    /** Full in-process state plus live database and trading-config status. */
    async getState() {
      const databaseConnectivity = await probeDatabaseConnectivity();
      return {
        ...state,
        database_connectivity: databaseConnectivity,
        trading_config: tradingConfigStore.getState(),
      };
    },
  },
  healthProvider: {
    /**
     * Healthy when the database answers, the trading config is ok, and the
     * most recent write (or, failing that, metrics refresh) is not older than
     * the configured staleness limit. No timestamp at all counts as fresh.
     */
    async getHealth() {
      const databaseConnectivity = await probeDatabaseConnectivity();
      const tradingConfig = tradingConfigStore.getState();
      const tradingConfigOk = tradingConfig.ok === true;

      const lastTruthAt = state.last_write_at || state.last_metrics_at || null;
      let freshnessAgeMs = null;
      if (lastTruthAt) {
        freshnessAgeMs = Date.now() - new Date(lastTruthAt).getTime();
      }
      const withinFreshnessLimit = freshnessAgeMs == null
        || freshnessAgeMs <= config.opsSentinelHistoryWriterStaleMs;

      let reason = null;
      if (!tradingConfigOk) {
        reason = tradingConfig.block_reason || 'trading config unavailable';
      }

      return {
        ok: databaseConnectivity && tradingConfigOk && withinFreshnessLimit,
        paused: state.paused,
        last_write_at: state.last_write_at,
        last_alert_write_at: state.last_alert_write_at,
        last_environment_status_write_at: state.last_environment_status_write_at,
        last_environment_status_seen_at: state.last_environment_status_seen_at,
        last_environment_status_duplicate_at: state.last_environment_status_duplicate_at,
        last_environment_status_fingerprint: state.last_environment_status_fingerprint,
        last_metrics_at: state.last_metrics_at,
        // Negative ages are clamped to zero; unparseable ones are reported as null.
        freshness_age_ms: Number.isFinite(freshnessAgeMs) ? Math.max(0, freshnessAgeMs) : null,
        database_connectivity: databaseConnectivity,
        trading_config_ok: tradingConfig.ok,
        trading_config_block_reason: tradingConfig.block_reason,
        last_error: state.last_error,
        reason,
      };
    },
  },
  routes: [
    {
      // Latest stored portfolio metric, or a 404 payload when none exists.
      method: 'GET',
      path: '/portfolio-metrics',
      readBody: false,
      handler: async () => {
        const latest = await loadLatestPortfolioMetric(pool);
        return latest || {
          statusCode: 404,
          payload: {
            error: 'portfolio_metrics_unavailable',
          },
        };
      },
    },
    {
      method: 'POST',
      path: '/pause',
      handler: () => {
        state.paused = true;
        consumer.pause(subscribedTopicSelectors());
        return { ok: true, paused: true };
      },
    },
    {
      method: 'POST',
      path: '/resume',
      handler: () => {
        state.paused = false;
        consumer.resume(subscribedTopicSelectors());
        return { ok: true, paused: false };
      },
    },
    {
      // Pauses consumption, marks the writer as draining and schedules shutdown.
      method: 'POST',
      path: '/drain',
      handler: () => {
        state.draining = true;
        state.paused = true;
        consumer.pause(subscribedTopicSelectors());
        setTimeout(() => shutdown(), 0);
        return { ok: true, draining: true };
      },
    },
  ],
});
/**
 * Recomputes the portfolio metric from the current database inputs, stores it,
 * and updates the in-process summary.
 *
 * @returns {Promise<object|null>} The summarized metric, or null when
 *   computePortfolioMetric produced no payload (nothing is stored then).
 * @throws When the trading config is unavailable or a load/upsert fails.
 */
async function refreshPortfolioMetrics() {
  const {
    tradingBtc: btcAsset,
    tradingBtcAssets: btcAssets,
    tradingEure: eureAsset,
    trackedAssets,
  } = await requireTradingConfig();

  const inputs = await loadPortfolioMetricInputs(pool, {
    btcAsset,
    btcAssets,
    eureAsset,
    trackedAssets,
  });

  const payload = computePortfolioMetric({
    baseline: inputs.baseline,
    currentInventory: inputs.currentInventory?.payload,
    currentPrice: inputs.currentPrice?.payload,
    externalFlows: inputs.externalFlows || [],
    btcAsset,
    btcAssets,
    eureAsset,
    valuationAssets: inputs.valuationAssets || [],
    commandCount: inputs.commandCount,
    resultCount: inputs.resultCount,
  });
  if (!payload) {
    return null;
  }

  const computedAt = new Date().toISOString();

  // The price component of the id joins the primary price id with every
  // valuation asset's price id, skipping missing ones.
  const valuationPriceIds = (inputs.valuationAssets || []).map((asset) => asset.priceId);
  const priceIds = [inputs.currentPrice?.payload?.price_id, ...valuationPriceIds];
  const metricId = buildPortfolioMetricId({
    baselineInventoryId: inputs.baseline?.inventory?.inventory_id || null,
    currentInventoryId: inputs.currentInventory?.payload?.inventory_id || null,
    currentPriceId: priceIds.filter(Boolean).join('+') || null,
  });

  const baselineAnchorAt = inputs.baseline?.command_at || null;
  const baselineStatus = payload.baseline_status;
  await upsertPortfolioMetric(pool, {
    metricId,
    computedAt,
    baselineAnchorAt,
    baselineStatus,
    payload,
  });

  // State is only touched after the metric was stored successfully.
  state.last_metrics_at = computedAt;
  state.metrics_error = null;
  state.latest_portfolio_metrics = summarizePortfolioMetric({
    metric_id: metricId,
    computed_at: computedAt,
    baseline_anchor_at: baselineAnchorAt,
    baseline_status: baselineStatus,
    payload,
  });
  return state.latest_portfolio_metrics;
}
/**
 * Refreshes intent-request outcomes for the configured BTC/EURe assets and
 * records a per-status tally in the in-process state.
 *
 * @returns {Promise<Array>} The records returned by refreshIntentRequestOutcomes.
 * @throws When the trading config is unavailable or the refresh fails.
 */
async function refreshIntentRequestOutcomeAttributions() {
  const { tradingBtc: btcAsset, tradingEure: eureAsset } = await requireTradingConfig();
  const records = await refreshIntentRequestOutcomes(pool, { btcAsset, eureAsset });
  const countWithStatus = (status) => records
    .filter((entry) => entry.outcome_status === status)
    .length;

  state.last_intent_request_outcomes_at = new Date().toISOString();
  state.latest_intent_request_outcomes = {
    refreshed_count: records.length,
    completed_count: countWithStatus('completed'),
    not_filled_count: countWithStatus('not_filled'),
    awaiting_settlement_count: countWithStatus('awaiting_settlement'),
  };
  state.intent_request_outcomes_error = null;

  return records;
}
/**
 * Refreshes quote outcomes for the configured BTC/EURe assets and records a
 * per-status tally in the in-process state.
 *
 * @returns {Promise<Array>} The records returned by refreshQuoteOutcomes.
 * @throws When the trading config is unavailable or the refresh fails.
 */
async function refreshQuoteOutcomeAttributions() {
  const { tradingBtc: btcAsset, tradingEure: eureAsset } = await requireTradingConfig();
  const records = await refreshQuoteOutcomes(pool, { btcAsset, eureAsset });
  const countWithStatus = (status) => records
    .filter((entry) => entry.outcome_status === status)
    .length;

  state.last_quote_outcomes_at = new Date().toISOString();
  state.quote_outcomes_error = null;
  state.latest_quote_outcomes = {
    count: records.length,
    completed_count: countWithStatus('completed'),
    not_filled_count: countWithStatus('not_filled'),
    submitted_count: countWithStatus('submitted'),
  };

  return records;
}
/**
 * Loads the current trading config and insists that it is usable.
 *
 * @returns {Promise<object>} The config object from the trading-config store,
 *   guaranteed to have `ok`, `tradingBtc` and `tradingEure` set.
 * @throws {Error} "trading config unavailable: <reason>" when the store
 *   returns nothing, reports the config as not ok, or an asset is missing.
 */
async function requireTradingConfig() {
  const tradingConfig = await tradingConfigStore.getConfig();
  // Optional chaining on the first check turns a missing config into the
  // descriptive error below instead of a bare TypeError.
  if (!tradingConfig?.ok || !tradingConfig.tradingBtc || !tradingConfig.tradingEure) {
    // The store's getState() reports the reason as `block_reason`; accept both
    // spellings so the reason is not lost here.
    // NOTE(review): confirm which spelling getConfig() actually returns.
    const reason = tradingConfig?.blockReason
      || tradingConfig?.block_reason
      || 'missing current assets';
    throw new Error(`trading config unavailable: ${reason}`);
  }
  return tradingConfig;
}
/**
 * Publishes a notification for a liquidity-action event; events from any
 * other topic are ignored.
 *
 * @param {{ topic: string, event: object }} message Topic and parsed event.
 * @returns {Promise<void>}
 */
async function publishLiquidityNotification({ topic, event }) {
  const isLiquidityAction = topic === config.kafkaTopicOpsLiquidityAction;
  if (isLiquidityAction) {
    await publishNotification(buildLiquidityActionNotification({ event, config }));
  }
}
/**
 * Publishes one notification per quote-outcome record that passes the
 * observedAtOrAfter check against `minObservedAt`, one record at a time.
 *
 * @param {Array|null|undefined} records Outcome records; a falsy value is treated as empty.
 * @param {{ minObservedAt?: string|null }} [options] Cut-off passed to observedAtOrAfter.
 * @returns {Promise<void>}
 */
async function publishQuoteOutcomeNotifications(records, { minObservedAt = null } = {}) {
  const candidates = records || [];
  for (const record of candidates) {
    const passesCutoff = observedAtOrAfter(record.outcome_observed_at, minObservedAt);
    if (passesCutoff) {
      const notification = buildQuoteOutcomeNotification({ record, config });
      await publishNotification(notification);
    }
  }
}
/**
 * Publishes one notification per intent-request-outcome record that passes the
 * observedAtOrAfter check against `minObservedAt`, one record at a time.
 *
 * @param {Array|null|undefined} records Outcome records; a falsy value is treated as empty.
 * @param {{ minObservedAt?: string|null }} [options] Cut-off passed to observedAtOrAfter.
 * @returns {Promise<void>}
 */
async function publishIntentRequestOutcomeNotifications(records, { minObservedAt = null } = {}) {
  const candidates = records || [];
  for (const record of candidates) {
    const passesCutoff = observedAtOrAfter(record.outcome_observed_at, minObservedAt);
    if (passesCutoff) {
      const notification = buildIntentRequestOutcomeNotification({ record, config });
      await publishNotification(notification);
    }
  }
}
/**
 * Sends a notification through the dispatcher's publishOnce and updates the
 * notification counters in the in-process state.
 *
 * @param {object|null|undefined} notification Notification to send; falsy values are skipped.
 * @returns {Promise<object|null>} The dispatcher result, or null when nothing was sent.
 */
async function publishNotification(notification) {
  if (!notification) {
    return null;
  }

  const result = await notificationDispatcher.publishOnce(notification);

  if (result.delivered) {
    state.notification_count += 1;
    state.last_notification_at = new Date().toISOString();
    state.last_notification_error = null;
    return result;
  }

  // Only an explicit `ok === false` counts as a failure; any other
  // undelivered result leaves the counters untouched.
  if (result.ok === false) {
    const fallbackError = { status: result.status, response: result.response || null };
    state.notification_error_count += 1;
    state.last_notification_error = result.error || fallbackError;
  }
  return result;
}
/**
 * Reduces a stored portfolio metric to the compact summary kept in state.
 *
 * @param {object|null|undefined} metric Metric row with identifying fields and a `payload`.
 * @returns {object|null} The summary, or null when no metric was given. Missing
 *   payload values become null; missing counts become 0.
 */
function summarizePortfolioMetric(metric) {
  if (!metric) {
    return null;
  }
  const fromPayload = (field, fallback) => metric.payload?.[field] ?? fallback;
  return {
    metric_id: metric.metric_id,
    computed_at: metric.computed_at,
    baseline_anchor_at: metric.baseline_anchor_at,
    baseline_status: metric.baseline_status,
    current_portfolio_value_eure: fromPayload('current_portfolio_value_eure', null),
    portfolio_vs_simple_hold_eure: fromPayload('portfolio_vs_simple_hold_eure', null),
    mark_to_market_pnl_eure: fromPayload('mark_to_market_pnl_eure', null),
    price_move_pnl_eure: fromPayload('price_move_pnl_eure', null),
    command_count: fromPayload('command_count', 0),
    result_count: fromPayload('result_count', 0),
  };
}
// The single in-flight shutdown, shared by every trigger (signals, /drain,
// and the message handler's drain check).
let shutdownPromise = null;

/**
 * Gracefully stops the service: closes the control API, disconnects the Kafka
 * consumer, ends the Postgres pool, then exits the process.
 *
 * Idempotent: repeated calls return the same promise, so the teardown steps
 * and `process.exit` run once even when several triggers fire.
 *
 * @returns {Promise<void>} Settles after `process.exit` has been requested; never rejects.
 */
function shutdown() {
  if (!shutdownPromise) {
    shutdownPromise = runShutdown();
  }
  return shutdownPromise;
}

/** Performs the teardown steps once, attempting every step even if one fails. */
async function runShutdown() {
  let exitCode = 0;

  try {
    await controlApi.close();
  } catch {
    // Control API close failures are deliberately ignored (best effort).
  }

  const steps = [
    ['consumer_disconnect', () => consumer.disconnect()],
    ['postgres_pool_end', () => pool.end()],
  ];
  for (const [step, run] of steps) {
    try {
      await run();
    } catch (error) {
      // Keep going so the remaining resources are still released, but report
      // the failure through a non-zero exit code.
      exitCode = 1;
      logger.error('history_writer_shutdown_failed', {
        details: {
          step,
          error: serializeError(error),
        },
      });
    }
  }

  process.exit(exitCode);
}
// Both termination signals trigger the same graceful shutdown.
for (const signal of ['SIGINT', 'SIGTERM']) {
  process.on(signal, shutdown);
}