diff --git a/.env.example b/.env.example index de24d66..284b221 100644 --- a/.env.example +++ b/.env.example @@ -95,3 +95,5 @@ OPS_SENTINEL_PRICE_STALE_MS=30000 OPS_SENTINEL_INVENTORY_STALE_MS=30000 OPS_SENTINEL_FUNDING_CREDIT_PENDING_MS=300000 OPS_SENTINEL_FUNDING_STUCK_MS=3600000 +OPS_SENTINEL_ALERT_WEBHOOK_URL= +OPS_SENTINEL_ALERT_WEBHOOK_TIMEOUT_MS=5000 diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 5c35d35..b51e2c6 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -50,6 +50,7 @@ const topics = [ const portfolioMetricTopics = new Set([ config.kafkaTopicRefMarketPrice, config.kafkaTopicStateIntentInventory, + config.kafkaTopicOpsLiquidityAction, config.kafkaTopicCmdExecuteTrade, config.kafkaTopicExecTradeResult, ]); @@ -141,11 +142,28 @@ const controlApi = startControlApi({ stateProvider: { async getState() { const connectivity = await pool.query('SELECT 1').then(() => true).catch(() => false); - return { - ...state, - database_connectivity: connectivity, - }; - }, + return { + ...state, + database_connectivity: connectivity, + }; + }, + }, + healthProvider: { + async getHealth() { + const connectivity = await pool.query('SELECT 1').then(() => true).catch(() => false); + const lastTruthAt = state.last_write_at || state.last_metrics_at || null; + const freshnessAgeMs = lastTruthAt ? Date.now() - new Date(lastTruthAt).getTime() : null; + return { + ok: connectivity && (freshnessAgeMs == null || freshnessAgeMs <= config.opsSentinelHistoryWriterStaleMs), + paused: state.paused, + last_write_at: state.last_write_at, + last_alert_write_at: state.last_alert_write_at, + last_metrics_at: state.last_metrics_at, + freshness_age_ms: Number.isFinite(freshnessAgeMs) ? Math.max(0, freshnessAgeMs) : null, + database_connectivity: connectivity, + last_error: state.last_error, + }; + }, }, routes: [ { @@ -198,11 +216,15 @@ const controlApi = startControlApi({ }); async function refreshPortfolioMetrics() { - const inputs = await loadPortfolioMetricInputs(pool); + const inputs = await loadPortfolioMetricInputs(pool, { + btcAsset: config.tradingBtc, + eureAsset: config.tradingEure, + }); const payload = computePortfolioMetric({ baseline: inputs.baseline, currentInventory: inputs.currentInventory?.payload, currentPrice: inputs.currentPrice?.payload, + externalFlows: inputs.externalFlows || [], btcAsset: config.tradingBtc, eureAsset: config.tradingEure, commandCount: inputs.commandCount, diff --git a/src/apps/near-intents-ingest.mjs b/src/apps/near-intents-ingest.mjs index def2a53..a9adc4d 100644 --- a/src/apps/near-intents-ingest.mjs +++ b/src/apps/near-intents-ingest.mjs @@ -6,6 +6,7 @@ import { createLogger } from '../core/log.mjs'; import { createPairFilterController } from '../core/pair-filter.mjs'; import { loadConfig } from '../lib/config.mjs'; import { startNearIntentsWs } from '../venues/near-intents/ws.mjs'; +import { ageMs } from '../core/runtime-health.mjs'; const config = loadConfig(); const logger = createLogger({ @@ -73,7 +74,38 @@ const controlApi = config.nearIntentsControlApiEnabled }; }, }, + healthProvider: { + getHealth() { + const ingest = wsRuntime.getState(); + const lastTruthAt = ingest.last_published_at || ingest.last_matching_quote_at || ingest.last_message_at; + const freshnessAgeMs = ageMs(lastTruthAt); + const staleAfterMs = config.opsSentinelIngestQuoteStaleMs; + return { + ok: Boolean(ingest.connected) && (freshnessAgeMs == null || freshnessAgeMs <= staleAfterMs), + connected: ingest.connected, + last_message_at: ingest.last_message_at, + last_matching_quote_at: ingest.last_matching_quote_at, + last_published_at: ingest.last_published_at, + freshness_age_ms: freshnessAgeMs, + stale_after_ms: staleAfterMs, + reason: + ingest.connected + ? freshnessAgeMs != null && freshnessAgeMs > staleAfterMs + ? 'quote truth stale' + : null + : 'websocket disconnected', + }; + }, + }, routes: [ + { + method: 'POST', + path: '/reconnect', + handler: () => { + wsRuntime.reconnect(); + return { ok: true, reconnecting: true }; + }, + }, { method: 'GET', path: '/pair-filter', diff --git a/src/apps/operator-dashboard.mjs b/src/apps/operator-dashboard.mjs new file mode 100644 index 0000000..9ef886b --- /dev/null +++ b/src/apps/operator-dashboard.mjs @@ -0,0 +1,617 @@ +import http from 'node:http'; +import process from 'node:process'; +import { readdir, readFile } from 'node:fs/promises'; +import path from 'node:path'; + +import { WebSocketServer } from 'ws'; + +import { createConsumer } from '../bus/kafka/consumer.mjs'; +import { parseEventMessage } from '../core/event-envelope.mjs'; +import { + applyDashboardLiveEvent, + buildDashboardBootstrap, + buildLiveStatusBar, + createDashboardLiveState, + listDashboardServices, + resolveDashboardControl, +} from '../core/operator-dashboard.mjs'; +import { + buildDashboardAuthChallengeHeader, + buildDashboardSessionCookie, + resolveDashboardRequestAuth, +} from '../core/operator-dashboard-auth.mjs'; +import { createLogger, serializeError } from '../core/log.mjs'; +import { readJsonBody, sendJson } from '../core/control-api.mjs'; +import { loadConfig } from '../lib/config.mjs'; +import { fetchJson } from '../lib/http.mjs'; +import { + createPostgresPool, + loadCurrentFundingObservations, + loadLatestInventorySnapshot, + loadLatestMarketPrice, + loadLatestPortfolioMetric, + loadRecentAlertTransitions, + loadRecentDepositStatuses, + loadRecentTradeDecisions, + loadRecentQuotes, + loadSuccessfulTradeSummary, + loadSuccessfulTradesPage, +} from '../lib/postgres.mjs'; + +const config = loadConfig(); +const logger = createLogger({ + service: 'operator-dashboard', + component: 'dashboard', + namespace: config.projectNamespace, +}); + +const dashboardRuntimeState = { + last_bootstrap_at: null, + last_bootstrap_error: null, + source_errors: {}, + last_source_error_at: null, + last_live_event_error: null, + websocket_clients: 0, +}; + +if ( + config.operatorDashboardAuthMode === 'basic' + && (!config.operatorDashboardAuthUsername || !config.operatorDashboardAuthPassword) +) { + logger.error('dashboard_basic_auth_config_missing', { + details: { + auth_mode: config.operatorDashboardAuthMode, + }, + }); + process.exit(1); +} + +const pool = createPostgresPool({ + connectionString: config.postgresUrl, +}); + +const staticAssets = await loadStaticAssets(); +const initialServiceSnapshots = await loadServiceSnapshots(); +const initialRecentQuotes = await safeSourceLoad( + 'recent_quotes', + () => loadRecentQuotes(pool, { + limit: config.operatorDashboardQuoteLimit, + }), + [], +); +const initialSuccessfulTradeSummary = await safeSourceLoad( + 'successful_trade_summary', + () => loadSuccessfulTradeSummary(pool), + { total: 0, last_successful_trade_at: null }, +); +const initialMarketPrice = await safeSourceLoad( + 'latest_market_price', + () => loadLatestMarketPrice(pool), + null, +); +const initialInventory = await safeSourceLoad( + 'latest_inventory', + () => loadLatestInventorySnapshot(pool), + null, +); + +const liveState = createDashboardLiveState({ + config, + recentQuotes: initialRecentQuotes, + latestMarketPrice: initialMarketPrice, + latestInventory: initialInventory, + successfulTradeCount: initialSuccessfulTradeSummary.total, + lastSuccessfulTradeAt: initialSuccessfulTradeSummary.last_successful_trade_at, + activeAlerts: + initialServiceSnapshots.find((snapshot) => snapshot.service === 'ops-sentinel')?.state?.active_alerts + || [], +}); + +const liveConsumer = await createConsumer({ + groupId: config.kafkaConsumerGroupOperatorDashboard, + brokers: config.kafkaBrokers, + clientId: config.kafkaClientId, + logger, +}); + +const liveTopics = [ + config.kafkaTopicNormSwapDemand, + config.kafkaTopicRefMarketPrice, + config.kafkaTopicStateIntentInventory, + config.kafkaTopicOpsAlert, + config.kafkaTopicExecTradeResult, +]; + +for (const topic of liveTopics) { + await liveConsumer.subscribe({ topic, fromBeginning: false }); +} + +await liveConsumer.run({ + eachMessage: async ({ topic, message }) => { + if (!message.value) return; + + try { + const event = parseEventMessage(message.value.toString()); + const updates = applyDashboardLiveEvent(liveState, { topic, event }); + for (const update of updates) { + broadcast(update); + } + } catch (error) { + dashboardRuntimeState.last_live_event_error = serializeError(error); + logger.error('dashboard_live_event_failed', { + topic, + details: { + error: serializeError(error), + }, + }); + } + }, +}); + +const webSockets = new Set(); +const webSocketServer = new WebSocketServer({ + noServer: true, +}); + +webSocketServer.on('connection', (socket, _req, authContext) => { + webSockets.add(socket); + dashboardRuntimeState.websocket_clients = webSockets.size; + socket.send(JSON.stringify({ + type: 'session.ready', + session: authContext, + live: { + recent_quotes: liveState.recent_quotes, + status_bar: buildLiveStatusBar(liveState), + }, + })); + + socket.on('close', () => { + webSockets.delete(socket); + dashboardRuntimeState.websocket_clients = webSockets.size; + }); +}); + +const server = http.createServer(async (req, res) => { + try { + const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`); + + if (req.method === 'GET' && url.pathname === '/healthz') { + return sendJson(res, 200, { + ok: Object.keys(dashboardRuntimeState.source_errors).length === 0 && !dashboardRuntimeState.last_bootstrap_error, + service: 'operator-dashboard', + websocket_clients: webSockets.size, + source_error_count: Object.keys(dashboardRuntimeState.source_errors).length, + last_source_error_at: dashboardRuntimeState.last_source_error_at, + last_bootstrap_at: dashboardRuntimeState.last_bootstrap_at, + last_bootstrap_error: dashboardRuntimeState.last_bootstrap_error, + last_live_event_error: dashboardRuntimeState.last_live_event_error, + }); + } + + if (req.method === 'GET' && url.pathname === '/state') { + return sendJson(res, 200, { + service: 'operator-dashboard', + namespace: config.projectNamespace, + websocket_clients: webSockets.size, + last_bootstrap_at: dashboardRuntimeState.last_bootstrap_at, + last_bootstrap_error: dashboardRuntimeState.last_bootstrap_error, + source_errors: Object.values(dashboardRuntimeState.source_errors), + source_error_count: Object.keys(dashboardRuntimeState.source_errors).length, + last_source_error_at: dashboardRuntimeState.last_source_error_at, + last_live_event_error: dashboardRuntimeState.last_live_event_error, + }); + } + + const auth = authenticateHttpRequest(req, res); + if (!auth) return; + + if (url.pathname.startsWith('/api/')) { + return handleApiRequest({ req, res, url, auth }); + } + + if (req.method === 'GET' && staticAssets.has(url.pathname)) { + const asset = staticAssets.get(url.pathname); + res.statusCode = 200; + res.setHeader('content-type', asset.contentType); + res.end(asset.body); + return; + } + + return sendJson(res, 404, { error: 'not_found' }); + } catch (error) { + logger.error('dashboard_request_failed', { + details: { + path: req.url, + error: serializeError(error), + }, + }); + return sendJson(res, 500, { + error: error.message, + }); + } +}); + +server.on('upgrade', (req, socket, head) => { + const url = new URL(req.url || '/', `http://${req.headers.host || 'localhost'}`); + if (url.pathname !== '/ws') { + socket.destroy(); + return; + } + + const auth = resolveDashboardRequestAuth({ + mode: config.operatorDashboardAuthMode, + authorizationHeader: req.headers.authorization || '', + cookieHeader: req.headers.cookie || '', + username: config.operatorDashboardAuthUsername, + password: config.operatorDashboardAuthPassword, + }); + if (!auth.authenticated) { + socket.write( + `HTTP/1.1 401 Unauthorized\r\nWWW-Authenticate: ${buildDashboardAuthChallengeHeader({ + realm: config.operatorDashboardAuthRealm, + })}\r\n\r\n`, + ); + socket.destroy(); + return; + } + + webSocketServer.handleUpgrade(req, socket, head, (ws) => { + webSocketServer.emit('connection', ws, req, auth); + }); +}); + +server.listen(config.operatorDashboardControlPort, config.operatorDashboardControlHost, () => { + logger.info('operator_dashboard_started', { + details: { + host: config.operatorDashboardControlHost, + port: config.operatorDashboardControlPort, + }, + }); +}); + +async function handleApiRequest({ req, res, url, auth }) { + if (req.method === 'GET' && url.pathname === '/api/session') { + return sendJson(res, 200, auth); + } + + if (req.method === 'GET' && url.pathname === '/api/bootstrap') { + const page = Number(url.searchParams.get('page') || 1); + const pageSize = Number( + url.searchParams.get('page_size') || config.operatorDashboardTradePageSize, + ); + const payload = await loadBootstrapPayload({ + auth, + page, + pageSize, + }); + return sendJson(res, 200, payload); + } + + if (req.method === 'GET' && url.pathname === '/api/trades') { + const page = Number(url.searchParams.get('page') || 1); + const pageSize = Number( + url.searchParams.get('page_size') || config.operatorDashboardTradePageSize, + ); + const successfulTrades = await loadSuccessfulTradesPage(pool, { + page, + pageSize, + }); + return sendJson(res, 200, successfulTrades); + } + + const controlMatch = req.method === 'POST' + ? url.pathname.match(/^\/api\/control\/([^/]+)\/([^/]+)$/) + : null; + if (controlMatch) { + const [, service, action] = controlMatch; + const body = await readJsonBody(req); + const control = resolveDashboardControl({ service, action }); + if (!control) { + return sendJson(res, 404, { + error: 'unknown_control', + }); + } + + const result = await invokeControl(control, body || {}); + const serviceSnapshot = await loadServiceSnapshot( + listDashboardServices(config).find((definition) => definition.service === control.service), + ); + return sendJson(res, 200, { + ok: true, + control, + result, + service_snapshot: serviceSnapshot, + }); + } + + return sendJson(res, 404, { error: 'not_found' }); +} + +async function loadBootstrapPayload({ auth, page, pageSize }) { + const sourceErrors = []; + const [ + portfolioMetric, + inventorySnapshot, + marketPrice, + recentQuotes, + successfulTradeSummary, + successfulTrades, + fundingObservations, + recentDepositStatuses, + recentTradeDecisions, + recentAlertTransitions, + serviceSnapshots, + ] = await Promise.all([ + safeSourceLoad('portfolio_metric', () => loadLatestPortfolioMetric(pool), null, sourceErrors), + safeSourceLoad('latest_inventory', () => loadLatestInventorySnapshot(pool), null, sourceErrors), + safeSourceLoad('latest_market_price', () => loadLatestMarketPrice(pool), null, sourceErrors), + safeSourceLoad( + 'recent_quotes', + () => loadRecentQuotes(pool, { + limit: config.operatorDashboardQuoteLimit, + }), + [], + sourceErrors, + ), + safeSourceLoad( + 'successful_trade_summary', + () => loadSuccessfulTradeSummary(pool), + { total: 0, last_successful_trade_at: null }, + sourceErrors, + ), + safeSourceLoad( + 'successful_trades', + () => loadSuccessfulTradesPage(pool, { + page, + pageSize, + }), + { + page, + page_size: pageSize, + total: 0, + total_pages: 1, + items: [], + }, + sourceErrors, + ), + safeSourceLoad('funding_observations', () => loadCurrentFundingObservations(pool), [], sourceErrors), + safeSourceLoad( + 'recent_deposit_statuses', + () => loadRecentDepositStatuses(pool, { limit: 20 }), + [], + sourceErrors, + ), + safeSourceLoad( + 'recent_trade_decisions', + () => loadRecentTradeDecisions(pool, { limit: 20 }), + [], + sourceErrors, + ), + safeSourceLoad( + 'recent_alert_transitions', + () => loadRecentAlertTransitions(pool, { limit: 20 }), + [], + sourceErrors, + ), + loadServiceSnapshots(), + ]); + + const payload = buildDashboardBootstrap({ + config, + auth, + portfolioMetric, + inventorySnapshot, + marketPrice, + recentQuotes, + successfulTrades, + successfulTradeSummary, + fundingObservations, + recentDepositStatuses, + recentTradeDecisions, + recentAlertTransitions, + serviceSnapshots, + sourceErrors, + }); + dashboardRuntimeState.last_bootstrap_at = new Date().toISOString(); + dashboardRuntimeState.last_bootstrap_error = null; + return payload; +} + +async function loadServiceSnapshots() { + const services = listDashboardServices(config); + return Promise.all(services.map((service) => loadServiceSnapshot(service))); +} + +async function loadServiceSnapshot(service) { + const [stateResult, healthResult] = await Promise.allSettled([ + fetchUpstreamJson(`${service.base_url}/state`), + fetchUpstreamJson(`${service.base_url}/healthz`), + ]); + + const state = stateResult.status === 'fulfilled' ? stateResult.value : null; + const health = healthResult.status === 'fulfilled' ? healthResult.value : null; + const error = stateResult.status === 'rejected' + ? serializeError(stateResult.reason) + : healthResult.status === 'rejected' + ? serializeError(healthResult.reason) + : null; + + return { + ...service, + reachable: Boolean(state || health), + state, + health, + error, + }; +} + +async function fetchUpstreamJson(url) { + return fetchJson(url, { + signal: AbortSignal.timeout(config.operatorDashboardUpstreamTimeoutMs), + }); +} + +async function invokeControl(control, body) { + const response = await fetchJson( + `${lookupServiceBaseUrl(control.service)}${control.path}`, + { + method: control.method, + headers: { + 'content-type': 'application/json', + }, + body: JSON.stringify(body || {}), + signal: AbortSignal.timeout(config.operatorDashboardUpstreamTimeoutMs), + }, + ); + + return response; +} + +function lookupServiceBaseUrl(serviceName) { + const service = listDashboardServices(config).find((entry) => entry.service === serviceName); + if (!service) { + throw new Error(`unknown service: ${serviceName}`); + } + return service.base_url; +} + +function broadcast(payload) { + const encoded = JSON.stringify(payload); + for (const socket of webSockets) { + if (socket.readyState !== 1) continue; + socket.send(encoded); + } +} + +function authenticateHttpRequest(req, res) { + const auth = resolveDashboardRequestAuth({ + mode: config.operatorDashboardAuthMode, + authorizationHeader: req.headers.authorization || '', + cookieHeader: req.headers.cookie || '', + username: config.operatorDashboardAuthUsername, + password: config.operatorDashboardAuthPassword, + }); + + if (!auth.authenticated) { + res.statusCode = 401; + res.setHeader('WWW-Authenticate', buildDashboardAuthChallengeHeader({ + realm: config.operatorDashboardAuthRealm, + })); + res.end('authentication required\n'); + return null; + } + + if (auth.setSessionCookie) { + res.setHeader('Set-Cookie', buildDashboardSessionCookie({ + sessionCookieName: auth.sessionCookieName, + sessionToken: auth.sessionToken, + })); + } + + return auth; +} + +async function loadStaticAssets() { + const distDirectory = new URL('../operator-dashboard/dist/', import.meta.url); + const assets = new Map(); + + await loadStaticAssetDirectory(distDirectory, '', assets); + + const indexAsset = assets.get('/index.html'); + if (!indexAsset) { + throw new Error('operator dashboard frontend is missing /index.html; run the dashboard build'); + } + + assets.set('/', indexAsset); + return assets; +} + +async function loadStaticAssetDirectory(directoryUrl, relativeDirectory, assets) { + const entries = await readdir(directoryUrl, { withFileTypes: true }); + + for (const entry of entries) { + if (entry.isDirectory()) { + await loadStaticAssetDirectory( + new URL(`${entry.name}/`, directoryUrl), + path.posix.join(relativeDirectory, entry.name), + assets, + ); + continue; + } + + const relativePath = path.posix.join(relativeDirectory, entry.name); + const requestPath = `/${relativePath}`; + const body = await readFile(new URL(entry.name, directoryUrl)); + assets.set(requestPath, { + contentType: resolveStaticContentType(entry.name), + body, + }); + } +} + +function resolveStaticContentType(filename) { + switch (path.extname(filename)) { + case '.html': + return 'text/html; charset=utf-8'; + case '.js': + return 'text/javascript; charset=utf-8'; + case '.css': + return 'text/css; charset=utf-8'; + case '.json': + return 'application/json; charset=utf-8'; + case '.svg': + return 'image/svg+xml'; + case '.png': + return 'image/png'; + case '.jpg': + case '.jpeg': + return 'image/jpeg'; + case '.webp': + return 'image/webp'; + case '.ico': + return 'image/x-icon'; + default: + return 'application/octet-stream'; + } +} + +async function safeSourceLoad(name, loader, fallback, sourceErrors = null) { + try { + const result = await loader(); + delete dashboardRuntimeState.source_errors[name]; + return result; + } catch (error) { + const serialized = serializeError(error); + dashboardRuntimeState.source_errors[name] = { + source: name, + error: serialized, + }; + dashboardRuntimeState.last_source_error_at = new Date().toISOString(); + logger.error('dashboard_source_load_failed', { + details: { + source: name, + error: serialized, + }, + }); + sourceErrors?.push({ + source: name, + error: serialized, + }); + dashboardRuntimeState.last_bootstrap_error = serialized; + return fallback; + } +} + +async function shutdown() { + server.close(() => {}); + for (const socket of webSockets) { + socket.close(); + } + await liveConsumer.stop().catch(() => {}); + await liveConsumer.disconnect().catch(() => {}); + await pool.end().catch(() => {}); + process.exit(0); +} + +process.on('SIGINT', shutdown); +process.on('SIGTERM', shutdown); diff --git a/src/apps/ops-sentinel.mjs b/src/apps/ops-sentinel.mjs index 029aaeb..783eb94 100644 --- a/src/apps/ops-sentinel.mjs +++ b/src/apps/ops-sentinel.mjs @@ -4,8 +4,16 @@ import { createConsumer } from '../bus/kafka/consumer.mjs'; import { createProducer } from '../bus/kafka/producer.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { createAlertEngine } from '../core/alert-engine.mjs'; +import { createAlertNotifier } from '../core/alert-notifier.mjs'; import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; +import { listDashboardServices } from '../core/operator-dashboard.mjs'; +import { + ageMs, + buildRuntimeAlert, + createRuntimeHealthThresholds, + evaluateRuntimeHealth, +} from '../core/runtime-health.mjs'; import { assertFundingObservationEvent, assertInventorySnapshotEvent, @@ -15,8 +23,10 @@ import { assertTradeResult, } from '../core/schemas.mjs'; import { loadConfig } from '../lib/config.mjs'; +import { fetchJson } from '../lib/http.mjs'; const config = loadConfig(); +const thresholds = createRuntimeHealthThresholds(config); const logger = createLogger({ service: 'ops-sentinel', component: 'alerts', @@ -48,6 +58,17 @@ const state = { last_error: null, last_event_at: null, publish_count: 0, + last_runtime_eval_at: null, + service_snapshots: [], + service_health: [], + latest_runtime_alerts: [], + containment: { + executor_auto_disarmed: false, + last_action_at: null, + last_action_reason: null, + last_action_result: null, + }, + anomaly_samples: [], }; const alertEngine = createAlertEngine({ @@ -59,6 +80,14 @@ const alertEngine = createAlertEngine({ evaluationIntervalMs: config.opsSentinelEvaluationMs, }); +const notifier = createAlertNotifier({ + webhookUrl: config.opsSentinelAlertWebhookUrl, + webhookTimeoutMs: config.opsSentinelAlertWebhookTimeoutMs, + logger: logger.child({ component: 'webhook-notifier' }), +}); + +const monitoredServices = listDashboardServices(config); + for (const topic of topics) { await consumer.subscribe({ topic, fromBeginning: true }); } @@ -89,10 +118,9 @@ await consumer.run({ const timer = setInterval(() => { if (state.paused) return; - const transitions = alertEngine.evaluate(); - publishTransitions(transitions).catch((error) => { + evaluateRuntimeHealthLoop().catch((error) => { state.last_error = serializeError(error); - logger.error('ops_sentinel_evaluate_failed', { + logger.error('ops_sentinel_runtime_evaluate_failed', { topic: config.kafkaTopicOpsAlert, details: { error: serializeError(error), @@ -115,16 +143,28 @@ const controlApi = startControlApi({ publish_count: state.publish_count, last_error: state.last_error, last_event_at: state.last_event_at, + last_runtime_eval_at: state.last_runtime_eval_at, + service_snapshots: state.service_snapshots, + service_health: state.service_health, + latest_runtime_alerts: state.latest_runtime_alerts, + containment: state.containment, + notifier: notifier.getState(), + anomaly_samples: state.anomaly_samples.slice(-thresholds.anomalyWindowSize), ...alertEngine.getState(), }; }, }, healthProvider: { getHealth() { + const staleMs = ageMs(state.last_runtime_eval_at); return { + ok: !state.paused && (staleMs == null || staleMs <= thresholds.sentinelStaleMs), paused: state.paused, last_event_at: state.last_event_at, + last_runtime_eval_at: state.last_runtime_eval_at, last_error: state.last_error, + stale: staleMs != null && staleMs > thresholds.sentinelStaleMs, + stale_after_ms: thresholds.sentinelStaleMs, }; }, }, @@ -147,18 +187,438 @@ const controlApi = startControlApi({ return { ok: true, paused: false }; }, }, + { + method: 'POST', + path: '/evaluate', + handler: async () => { + await evaluateRuntimeHealthLoop(); + return { ok: true, evaluated_at: state.last_runtime_eval_at }; + }, + }, ], }); +async function evaluateRuntimeHealthLoop() { + const now = new Date().toISOString(); + const previousRuntimeEvalAt = state.last_runtime_eval_at; + const serviceSnapshots = await Promise.all(monitoredServices.map(loadServiceSnapshot)); + state.service_snapshots = serviceSnapshots; + state.last_runtime_eval_at = now; + + const servicesByName = Object.fromEntries(serviceSnapshots.map((snapshot) => [snapshot.service, snapshot])); + const anomalyAlerts = buildAnomalyAlerts({ servicesByName, now }); + const runtimeAlerts = buildDeterministicRuntimeAlerts({ servicesByName, now, previousRuntimeEvalAt }); + const desiredRuntimeAlerts = [...runtimeAlerts, ...anomalyAlerts]; + const transitions = alertEngine.applyRuntimeAlerts(desiredRuntimeAlerts, now); + const activeAlerts = alertEngine.getState(now).active_alerts; + state.service_health = [...evaluateRuntimeHealth({ + servicesByName, + activePair: config.activePair, + activeAlerts, + now, + }).values()]; + state.latest_runtime_alerts = desiredRuntimeAlerts; + + await publishTransitions(transitions); + await maybeContainRisk({ servicesByName, desiredRuntimeAlerts, now }); +} + +async function loadServiceSnapshot(service) { + const [stateResult, healthResult] = await Promise.allSettled([ + fetchUpstreamJson(`${service.base_url}/state`), + fetchUpstreamJson(`${service.base_url}/healthz`), + ]); + + const statePayload = stateResult.status === 'fulfilled' ? stateResult.value : null; + const healthPayload = healthResult.status === 'fulfilled' ? healthResult.value : null; + const error = stateResult.status === 'rejected' + ? serializeError(stateResult.reason) + : healthResult.status === 'rejected' + ? serializeError(healthResult.reason) + : null; + + return { + ...service, + reachable: Boolean(statePayload || healthPayload), + state: statePayload, + health: healthPayload, + error, + }; +} + +async function fetchUpstreamJson(url) { + return fetchJson(url, { + signal: AbortSignal.timeout(config.operatorDashboardUpstreamTimeoutMs), + }); +} + +function buildDeterministicRuntimeAlerts({ servicesByName, now, previousRuntimeEvalAt = null }) { + const alerts = []; + const ingest = servicesByName['near-intents-ingest']; + const ingestState = ingest?.state?.ingest || {}; + const ingestHealth = ingest?.health || {}; + const matchingQuoteAgeMs = ageMs(ingestState.last_matching_quote_at, now); + const publishedAgeMs = ageMs(ingestState.last_published_at, now); + const messageAgeMs = ageMs(ingestState.last_message_at, now); + + if (!ingest?.reachable || ingestState.connected === false || ingestHealth.connected === false) { + alerts.push(buildRuntimeAlert({ + alert_code: 'near_intents_ingest_disconnected', + severity: 'critical', + reason: 'near-intents-ingest websocket is disconnected or unreachable', + service_scope: 'near-intents-ingest', + pair: config.activePair, + details: { + reachable: ingest?.reachable ?? false, + connected: ingestState.connected ?? ingestHealth.connected ?? null, + last_message_at: ingestState.last_message_at || null, + last_connected_at: ingestState.last_connected_at || null, + last_disconnected_at: ingestState.last_disconnected_at || null, + }, + })); + } + + if (matchingQuoteAgeMs == null || matchingQuoteAgeMs > thresholds.ingestQuoteStaleMs) { + alerts.push(buildRuntimeAlert({ + alert_code: 'near_intents_quotes_stale', + severity: 'critical', + reason: matchingQuoteAgeMs == null + ? 'near-intents-ingest has not observed a matching quote' + : `matching quote freshness ${matchingQuoteAgeMs}ms exceeds ${thresholds.ingestQuoteStaleMs}ms`, + service_scope: 'near-intents-ingest', + pair: config.activePair, + details: { + last_matching_quote_at: ingestState.last_matching_quote_at || null, + age_ms: matchingQuoteAgeMs, + stale_after_ms: thresholds.ingestQuoteStaleMs, + last_message_at: ingestState.last_message_at || null, + message_age_ms: messageAgeMs, + }, + })); + } + + if ( + (publishedAgeMs == null || publishedAgeMs > thresholds.ingestPublishStaleMs) + || ( + matchingQuoteAgeMs != null + && matchingQuoteAgeMs <= thresholds.ingestQuoteStaleMs + && (publishedAgeMs == null || publishedAgeMs > thresholds.ingestPublishStaleMs) + ) + ) { + alerts.push(buildRuntimeAlert({ + alert_code: 'near_intents_publish_stale', + severity: 'critical', + reason: publishedAgeMs == null + ? 'near-intents-ingest has not published a matching quote' + : `published quote freshness ${publishedAgeMs}ms exceeds ${thresholds.ingestPublishStaleMs}ms`, + service_scope: 'near-intents-ingest', + pair: config.activePair, + details: { + last_matching_quote_at: ingestState.last_matching_quote_at || null, + last_published_at: ingestState.last_published_at || null, + quote_age_ms: matchingQuoteAgeMs, + publish_age_ms: publishedAgeMs, + stale_after_ms: thresholds.ingestPublishStaleMs, + }, + })); + } + + const executor = servicesByName['trade-executor']; + const relay = executor?.state?.relay || {}; + const relayAgeMs = ageMs(relay.last_message_at, now); + if (!executor?.reachable || relay.connected === false || (relayAgeMs != null && relayAgeMs > thresholds.executorRelayStaleMs)) { + alerts.push(buildRuntimeAlert({ + alert_code: 'trade_executor_relay_disconnected', + severity: 'critical', + reason: !executor?.reachable || relay.connected === false + ? 'trade-executor solver relay is disconnected or unreachable' + : `trade-executor relay freshness ${relayAgeMs}ms exceeds ${thresholds.executorRelayStaleMs}ms`, + service_scope: 'trade-executor', + pair: config.activePair, + details: { + reachable: executor?.reachable ?? false, + connected: relay.connected ?? null, + last_message_at: relay.last_message_at || null, + age_ms: relayAgeMs, + stale_after_ms: thresholds.executorRelayStaleMs, + }, + })); + } + + const writer = servicesByName['history-writer']; + const writerState = writer?.state || {}; + const writerAgeMs = ageMs(writerState.last_write_at, now); + const rawOffset = parseOffset(writerState.offsets?.[config.kafkaTopicRawNearIntentsQuote]?.offset); + const normOffset = parseOffset(writerState.offsets?.[config.kafkaTopicNormSwapDemand]?.offset); + const ingestPublishedCount = Number(ingestState.published_count || 0); + const lastSample = state.anomaly_samples.at(-1) || null; + const writerProgressed = lastSample + ? rawOffset > lastSample.raw_offset || normOffset > lastSample.norm_offset + : true; + + if ( + !writer?.reachable + || writerState.database_connectivity === false + || writerAgeMs == null + || writerAgeMs > thresholds.historyWriterStaleMs + || (lastSample && ingestPublishedCount > lastSample.ingest_published_count && !writerProgressed) + ) { + alerts.push(buildRuntimeAlert({ + alert_code: 'history_writer_stalled', + severity: 'critical', + reason: !writer?.reachable + ? 'history-writer is unreachable' + : writerState.database_connectivity === false + ? 'history-writer lost database connectivity' + : lastSample && ingestPublishedCount > lastSample.ingest_published_count && !writerProgressed + ? 'ingest published quotes but durable history offsets stopped advancing' + : `history-writer freshness ${writerAgeMs}ms exceeds ${thresholds.historyWriterStaleMs}ms`, + service_scope: 'history-writer', + pair: config.activePair, + details: { + last_write_at: writerState.last_write_at || null, + age_ms: writerAgeMs, + stale_after_ms: thresholds.historyWriterStaleMs, + raw_offset: rawOffset, + normalized_offset: normOffset, + ingest_published_count: ingestPublishedCount, + }, + })); + } + + const dashboard = servicesByName['operator-dashboard']; + const dashboardState = dashboard?.state || {}; + const dashboardSourceErrorCount = Number( + dashboardState.source_error_count + || dashboard?.health?.source_error_count + || 0, + ); + const dashboardBootstrapAgeMs = ageMs(dashboardState.last_bootstrap_at, now); + if ( + !dashboard?.reachable + || dashboardSourceErrorCount > 0 + || (dashboardBootstrapAgeMs != null && dashboardBootstrapAgeMs > thresholds.dashboardSourceDegradedMs) + ) { + alerts.push(buildRuntimeAlert({ + alert_code: 'operator_dashboard_source_degraded', + severity: 'warning', + reason: !dashboard?.reachable + ? 'operator-dashboard is unreachable' + : dashboardSourceErrorCount > 0 + ? 'operator-dashboard has upstream source errors' + : `operator-dashboard bootstrap freshness ${dashboardBootstrapAgeMs}ms exceeds ${thresholds.dashboardSourceDegradedMs}ms`, + service_scope: 'operator-dashboard', + pair: config.activePair, + details: { + source_error_count: dashboardSourceErrorCount, + last_source_error_at: dashboardState.last_source_error_at || null, + last_bootstrap_at: dashboardState.last_bootstrap_at || null, + bootstrap_age_ms: dashboardBootstrapAgeMs, + }, + })); + } + + const selfAgeMs = ageMs(previousRuntimeEvalAt, now); + if (selfAgeMs != null && selfAgeMs > thresholds.sentinelStaleMs) { + alerts.push(buildRuntimeAlert({ + alert_code: 'sentinel_stale', + severity: 'critical', + reason: `ops-sentinel evaluation freshness ${selfAgeMs}ms exceeds ${thresholds.sentinelStaleMs}ms`, + service_scope: 'ops-sentinel', + pair: config.activePair, + details: { + last_runtime_eval_at: state.last_runtime_eval_at, + previous_runtime_eval_at: previousRuntimeEvalAt, + age_ms: selfAgeMs, + stale_after_ms: thresholds.sentinelStaleMs, + }, + })); + } + + if (notifier.getState().last_delivery_status === 'failed') { + alerts.push(buildRuntimeAlert({ + alert_code: 'sentinel_alert_delivery_failed', + severity: 'warning', + reason: 'external alert delivery failed', + service_scope: 'ops-sentinel', + pair: config.activePair, + details: notifier.getState(), + })); + } + + const executorArmed = executor?.state?.armed === true; + const criticalTruthFailure = alerts.some((alert) => ( + alert.severity === 'critical' + && ['near_intents_ingest_disconnected', 'near_intents_quotes_stale', 'near_intents_publish_stale', 'history_writer_stalled'].includes(alert.alert_code) + )); + if (executorArmed && criticalTruthFailure) { + alerts.push(buildRuntimeAlert({ + alert_code: 'executor_armed_with_stale_truth', + severity: 'critical', + reason: 'trade-executor remains armed while upstream quote truth is critically stale', + service_scope: 'trade-executor', + pair: config.activePair, + details: { + armed: true, + containment_available: true, + recommended_action: 'disarm', + }, + })); + } + + return alerts; +} + +function buildAnomalyAlerts({ servicesByName, now }) { + const ingestState = servicesByName['near-intents-ingest']?.state?.ingest || {}; + const writerState = servicesByName['history-writer']?.state || {}; + const nextSample = { + at: now, + ingest_published_count: Number(ingestState.published_count || 0), + ingest_reconnect_count: Number(ingestState.reconnect_count || 0), + raw_offset: parseOffset(writerState.offsets?.[config.kafkaTopicRawNearIntentsQuote]?.offset), + norm_offset: parseOffset(writerState.offsets?.[config.kafkaTopicNormSwapDemand]?.offset), + }; + + state.anomaly_samples.push(nextSample); + state.anomaly_samples = state.anomaly_samples.slice(-(thresholds.anomalyWindowSize + 1)); + + if (state.anomaly_samples.length < thresholds.anomalyWindowSize) { + return []; + } + + const windows = []; + for (let index = 1; index < state.anomaly_samples.length; index += 1) { + const previous = state.anomaly_samples[index - 1]; + const current = state.anomaly_samples[index]; + windows.push({ + quote_delta: Math.max(0, current.ingest_published_count - previous.ingest_published_count), + reconnect_delta: Math.max(0, current.ingest_reconnect_count - previous.ingest_reconnect_count), + durable_delta: Math.max(0, current.norm_offset - previous.norm_offset), + }); + } + + const currentWindow = windows.at(-1); + const baseline = windows.slice(0, -1); + const averageQuoteDelta = average(baseline.map((entry) => entry.quote_delta)); + const averageReconnectDelta = average(baseline.map((entry) => entry.reconnect_delta)); + const averageDurableDelta = average(baseline.map((entry) => entry.durable_delta)); + const alerts = []; + + if ( + averageQuoteDelta > 0 + && currentWindow.quote_delta <= averageQuoteDelta * thresholds.anomalyQuoteRateCollapseRatio + ) { + alerts.push(buildRuntimeAlert({ + alert_code: 'near_intents_quote_rate_collapse', + severity: 'warning', + reason: 'quote publish rate collapsed versus recent baseline', + service_scope: 'near-intents-ingest', + pair: config.activePair, + details: { + current_window_quote_delta: currentWindow.quote_delta, + baseline_average_quote_delta: averageQuoteDelta, + collapse_ratio: thresholds.anomalyQuoteRateCollapseRatio, + }, + })); + } + + if ( + averageReconnectDelta >= 0 + && currentWindow.reconnect_delta > 0 + && currentWindow.reconnect_delta >= Math.max(2, averageReconnectDelta * thresholds.anomalyReconnectSpikeMultiplier) + ) { + alerts.push(buildRuntimeAlert({ + alert_code: 'near_intents_reconnect_spike', + severity: 'warning', + reason: 'near-intents reconnect frequency spiked versus recent baseline', + service_scope: 'near-intents-ingest', + pair: config.activePair, + details: { + current_window_reconnect_delta: currentWindow.reconnect_delta, + baseline_average_reconnect_delta: averageReconnectDelta, + spike_multiplier: thresholds.anomalyReconnectSpikeMultiplier, + }, + })); + } + + if ( + currentWindow.quote_delta > 0 + && currentWindow.durable_delta === 0 + && averageDurableDelta >= 0 + ) { + alerts.push(buildRuntimeAlert({ + alert_code: 'near_intents_pipeline_flow_mismatch', + severity: 'warning', + reason: 'ingest quote flow advanced while durable writer progress stalled', + service_scope: 'history-writer', + pair: config.activePair, + details: { + current_window_quote_delta: currentWindow.quote_delta, + current_window_durable_delta: currentWindow.durable_delta, + baseline_average_durable_delta: averageDurableDelta, + }, + })); + } + + return alerts; +} + +async function maybeContainRisk({ servicesByName, desiredRuntimeAlerts, now }) { + const executor = servicesByName['trade-executor']; + const criticalTruthFailure = desiredRuntimeAlerts.some((alert) => ( + alert.severity === 'critical' + && ['near_intents_ingest_disconnected', 'near_intents_quotes_stale', 'near_intents_publish_stale', 'history_writer_stalled'].includes(alert.alert_code) + )); + const executorArmed = executor?.state?.armed === true; + + if (!criticalTruthFailure) { + state.containment.executor_auto_disarmed = false; + return; + } + + const sinceLastActionMs = ageMs(state.containment.last_action_at, now); + if ( + !executorArmed + || state.containment.executor_auto_disarmed + || (sinceLastActionMs != null && sinceLastActionMs < thresholds.containmentCooldownMs) + ) { + return; + } + + try { + const result = await fetchJson(`${config.tradeExecutorControlBaseUrl}/disarm`, { + method: 'POST', + headers: { + 'content-type': 'application/json', + }, + body: JSON.stringify({ reason: 'critical_quote_truth_stale' }), + signal: AbortSignal.timeout(config.operatorDashboardUpstreamTimeoutMs), + }); + state.containment.executor_auto_disarmed = true; + state.containment.last_action_at = now; + state.containment.last_action_reason = 'critical_quote_truth_stale'; + state.containment.last_action_result = result; + } catch (error) { + state.containment.last_action_at = now; + state.containment.last_action_reason = 'critical_quote_truth_stale'; + state.containment.last_action_result = { + ok: false, + error: serializeError(error), + }; + } +} + async function publishTransitions(transitions) { for (const transition of transitions) { + const alertEventId = `${transition.alert_code}-${transition.status}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`; const event = buildEventEnvelope({ source: 'ops-sentinel', venue: 'unrip', eventType: 'ops_alert', observedAt: transition.last_evaluated_at, payload: { - alert_event_id: `${transition.alert_code}-${transition.status}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`, + alert_event_id: alertEventId, ...transition, }, }); @@ -167,6 +627,10 @@ async function publishTransitions(transitions) { key: `${transition.alert_code}:${transition.service_scope}:${transition.tx_hash || transition.pair || 'global'}`, }); state.publish_count += 1; + await notifier.notify({ + ...transition, + alert_event_id: alertEventId, + }); } } @@ -208,6 +672,17 @@ function normalizePayloadForAlert(topic, event) { } } +function parseOffset(value) { + if (value == null) return 0; + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : 0; +} + +function average(values) { + if (!values.length) return 0; + return values.reduce((sum, value) => sum + value, 0) / values.length; +} + async function shutdown() { clearInterval(timer); await controlApi.close().catch(() => {}); diff --git a/src/apps/trade-executor.mjs b/src/apps/trade-executor.mjs index 2fff22c..d9458b5 100644 --- a/src/apps/trade-executor.mjs +++ b/src/apps/trade-executor.mjs @@ -12,6 +12,7 @@ import { loadConfig } from '../lib/config.mjs'; import { buildQuoteResponseSubmission } from '../venues/near-intents/signing.mjs'; import { startSolverRelayWs } from '../venues/near-intents/solver-relay-ws.mjs'; import { createVerifierClient } from '../venues/near-intents/verifier-client.mjs'; +import { ageMs } from '../core/runtime-health.mjs'; const config = loadConfig(); const logger = createLogger({ @@ -215,13 +216,42 @@ const controlApi = startControlApi({ account_id: config.nearIntentsAccountId, signer_public_key: signer.getPublicKey().toString(), signer_registered: signerRegistered, + relay: relayClient.getState(), ...state, durable_control_state: armedStateStore.getState(), durable_state: stateStore.getState(), }; }, }, + healthProvider: { + getHealth() { + const relay = relayClient.getState(); + const freshnessAgeMs = ageMs(relay.last_message_at); + return { + ok: relay.connected && (freshnessAgeMs == null || freshnessAgeMs <= config.opsSentinelExecutorRelayStaleMs), + connected: relay.connected, + relay_last_message_at: relay.last_message_at, + relay_freshness_age_ms: freshnessAgeMs, + paused: state.paused, + armed: state.armed, + reason: + relay.connected + ? freshnessAgeMs != null && freshnessAgeMs > config.opsSentinelExecutorRelayStaleMs + ? 'solver relay stale' + : null + : 'solver relay disconnected', + }; + }, + }, routes: [ + { + method: 'POST', + path: '/reconnect', + handler: () => { + relayClient.reconnect(); + return { ok: true, reconnecting: true }; + }, + }, { method: 'POST', path: '/arm', diff --git a/src/core/alert-engine.mjs b/src/core/alert-engine.mjs index 2712b52..089ead5 100644 --- a/src/core/alert-engine.mjs +++ b/src/core/alert-engine.mjs @@ -16,6 +16,7 @@ export function createAlertEngine({ latest_trade_result: null, funding_observations: {}, active_alerts: {}, + runtime_alert_keys: new Set(), recent_transitions: [], last_evaluated_at: null, }; @@ -67,6 +68,14 @@ export function createAlertEngine({ now, }); }, + applyRuntimeAlerts(desiredAlerts = [], now = new Date().toISOString()) { + return reconcileRuntimeAlertState({ + state, + desiredAlerts, + now, + recentTransitionLimit, + }); + }, getState(now = new Date().toISOString()) { return summarizeState({ state, @@ -329,6 +338,79 @@ function reconcileAlertState({ state, desired, now, recentTransitionLimit }) { return transitions; } +function reconcileRuntimeAlertState({ + state, + desiredAlerts, + now, + recentTransitionLimit, +}) { + const transitions = []; + const desired = new Map(); + const nextRuntimeKeys = new Set(); + + for (const alert of desiredAlerts || []) { + const key = buildAlertKey({ + alertCode: alert.alert_code, + serviceScope: alert.service_scope, + pair: alert.pair, + assetId: alert.asset_id, + txHash: alert.tx_hash, + }); + desired.set(key, alert); + nextRuntimeKeys.add(key); + } + + for (const key of state.runtime_alert_keys) { + if (desired.has(key)) continue; + const existing = state.active_alerts[key]; + if (!existing) continue; + const cleared = { + ...existing, + status: 'cleared', + raised_at: existing.raised_at || existing.first_raised_at || now, + cleared_at: now, + last_evaluated_at: now, + }; + delete state.active_alerts[key]; + transitions.push(cleared); + } + + for (const [key, alert] of desired.entries()) { + const existing = state.active_alerts[key]; + if (!existing) { + const raised = { + ...alert, + status: 'raised', + first_raised_at: now, + raised_at: now, + cleared_at: null, + last_evaluated_at: now, + }; + state.active_alerts[key] = raised; + transitions.push(raised); + continue; + } + + state.active_alerts[key] = { + ...existing, + ...alert, + status: 'raised', + raised_at: existing.raised_at || existing.first_raised_at || now, + first_raised_at: existing.first_raised_at || existing.raised_at || now, + cleared_at: null, + last_evaluated_at: now, + }; + } + + state.runtime_alert_keys = nextRuntimeKeys; + if (transitions.length > 0) { + state.recent_transitions.unshift(...transitions); + state.recent_transitions = state.recent_transitions.slice(0, recentTransitionLimit); + } + state.last_evaluated_at = now; + return transitions; +} + function summarizeState({ state, evaluationIntervalMs, now }) { const activeAlerts = Object.values(state.active_alerts) .sort((left, right) => timestampValue(right.first_raised_at) - timestampValue(left.first_raised_at)); diff --git a/src/core/alert-notifier.mjs b/src/core/alert-notifier.mjs new file mode 100644 index 0000000..8cdf2d3 --- /dev/null +++ b/src/core/alert-notifier.mjs @@ -0,0 +1,96 @@ +import { postJson } from '../lib/http.mjs'; +import { serializeError } from './log.mjs'; + +export function createAlertNotifier({ + webhookUrl = '', + webhookTimeoutMs = 5_000, + logger = null, +} = {}) { + const delivered = new Set(); + const state = { + enabled: Boolean(webhookUrl), + webhook_url_configured: Boolean(webhookUrl), + last_delivery_at: null, + last_delivery_error: null, + last_delivery_status: null, + last_delivery_key: null, + sent_count: 0, + deduped_count: 0, + }; + + return { + async notify(transition) { + if (!state.enabled) return { ok: false, skipped: true, reason: 'webhook_disabled' }; + + const deliveryKey = buildDeliveryKey(transition); + if (delivered.has(deliveryKey)) { + state.deduped_count += 1; + state.last_delivery_status = 'deduped'; + state.last_delivery_key = deliveryKey; + return { ok: true, deduped: true }; + } + + const payload = { + source: 'unrip', + alert: { + alert_code: transition.alert_code, + status: transition.status, + severity: transition.severity, + service_scope: transition.service_scope, + reason: transition.reason, + pair: transition.pair || null, + raised_at: transition.raised_at || transition.first_raised_at || null, + cleared_at: transition.cleared_at || null, + last_evaluated_at: transition.last_evaluated_at || null, + details: transition.details || {}, + }, + }; + + try { + await postJson(webhookUrl, payload, { + signal: AbortSignal.timeout(webhookTimeoutMs), + }); + delivered.add(deliveryKey); + state.sent_count += 1; + state.last_delivery_at = new Date().toISOString(); + state.last_delivery_error = null; + state.last_delivery_status = 'sent'; + state.last_delivery_key = deliveryKey; + return { ok: true, deduped: false }; + } catch (error) { + const serialized = serializeError(error); + state.last_delivery_at = new Date().toISOString(); + state.last_delivery_error = serialized; + state.last_delivery_status = 'failed'; + state.last_delivery_key = deliveryKey; + logger?.error('alert_webhook_delivery_failed', { + details: { + error: serialized, + delivery_key: deliveryKey, + alert_code: transition.alert_code, + service_scope: transition.service_scope, + }, + }); + return { ok: false, error: serialized }; + } + }, + getState() { + return { + ...state, + }; + }, + }; +} + +function buildDeliveryKey(transition) { + return [ + transition.alert_code, + transition.service_scope, + transition.pair || '', + transition.asset_id || '', + transition.tx_hash || '', + transition.status, + transition.raised_at || transition.first_raised_at || '', + transition.cleared_at || '', + ].join('|'); +} diff --git a/src/core/operator-dashboard.mjs b/src/core/operator-dashboard.mjs new file mode 100644 index 0000000..04ceb7e --- /dev/null +++ b/src/core/operator-dashboard.mjs @@ -0,0 +1,1200 @@ +import { unitsToNumber } from './assets.mjs'; +import { summarizeFundingObservations } from './funding-observations.mjs'; +import { resolveDashboardRequestAuth } from './operator-dashboard-auth.mjs'; +import { deriveServiceHealth, inferServiceFreshnessTimestamp as inferRuntimeFreshnessTimestamp } from './runtime-health.mjs'; + +export const DASHBOARD_LIVE_QUOTE_LIMIT = 10; + +const DECIMAL_SCALE = 18; +const DECIMAL_FACTOR = 10n ** BigInt(DECIMAL_SCALE); +const ALERT_SEVERITY_ORDER = { + critical: 3, + warning: 2, + info: 1, +}; +const CREDITED_FUNDING_STATUSES = new Set(['CREDITED', 'COMPLETED', 'FINALIZED', 'SETTLED']); + +const CONTROL_DEFINITIONS = [ + { + service: 'near-intents-ingest', + action: 'reconnect', + method: 'POST', + path: '/reconnect', + label: 'Reconnect Ingest', + description: 'Reconnect the ingest websocket without restarting the deployment.', + page: 'system', + risk_class: 'safe', + }, + { + service: 'market-reference-ingest', + action: 'refresh', + method: 'POST', + path: '/refresh', + label: 'Refresh Price', + description: 'Fetch the latest BTC/EUR reference price.', + page: 'funds', + risk_class: 'safe', + }, + { + service: 'inventory-sync', + action: 'refresh', + method: 'POST', + path: '/refresh', + label: 'Refresh Inventory', + description: 'Sync verifier balances into the latest inventory snapshot.', + page: 'funds', + risk_class: 'safe', + }, + { + service: 'liquidity-manager', + action: 'refresh', + method: 'POST', + path: '/refresh', + label: 'Refresh Liquidity', + description: 'Refresh bridge deposit handles, deposits, and tracked withdrawals.', + page: 'funds', + risk_class: 'safe', + }, + { + service: 'liquidity-manager', + action: 'pause', + method: 'POST', + path: '/pause', + label: 'Pause Liquidity', + description: 'Pause non-fund-moving liquidity state refreshes.', + page: 'system', + risk_class: 'safe', + }, + { + service: 'liquidity-manager', + action: 'resume', + method: 'POST', + path: '/resume', + label: 'Resume Liquidity', + description: 'Resume liquidity state refreshes.', + page: 'system', + risk_class: 'safe', + }, + { + service: 'liquidity-manager', + action: 'pause-funding-observer', + method: 'POST', + path: '/pause-funding-observer', + label: 'Pause Funding Observer', + description: 'Pause pre-credit funding observations without touching spendable truth.', + page: 'funds', + risk_class: 'safe', + }, + { + service: 'liquidity-manager', + action: 'resume-funding-observer', + method: 'POST', + path: '/resume-funding-observer', + label: 'Resume Funding Observer', + description: 'Resume pre-credit funding observations.', + page: 'funds', + risk_class: 'safe', + }, + { + service: 'liquidity-manager', + action: 'freeze-withdrawals', + method: 'POST', + path: '/freeze-withdrawals', + label: 'Update Withdrawal Freeze', + description: 'Toggle withdrawal freeze without submitting live withdrawals.', + page: 'funds', + risk_class: 'safe', + }, + { + service: 'liquidity-manager', + action: 'withdrawal-estimate', + method: 'POST', + path: '/withdrawal-estimate', + label: 'Estimate Withdrawal', + description: 'Estimate a bridge withdrawal without moving funds.', + page: 'funds', + risk_class: 'safe', + }, + { + service: 'trade-executor', + action: 'reconnect', + method: 'POST', + path: '/reconnect', + label: 'Reconnect Relay', + description: 'Reconnect the trade-executor solver relay websocket.', + page: 'system', + risk_class: 'safe', + }, + { + service: 'trade-executor', + action: 'pause', + method: 'POST', + path: '/pause', + label: 'Pause Executor', + description: 'Pause trade-executor command consumption without moving funds.', + page: 'system', + risk_class: 'safe', + }, + { + service: 'trade-executor', + action: 'resume', + method: 'POST', + path: '/resume', + label: 'Resume Executor', + description: 'Resume trade-executor command consumption.', + page: 'system', + risk_class: 'safe', + }, + { + service: 'trade-executor', + action: 'disarm', + method: 'POST', + path: '/disarm', + label: 'Disarm Executor', + description: 'Force the executor into a non-fund-moving safe state.', + page: 'system', + risk_class: 'safe', + }, + { + service: 'ops-sentinel', + action: 'pause', + method: 'POST', + path: '/pause', + label: 'Pause Alerts', + description: 'Pause alert evaluation without changing trade arming state.', + page: 'system', + risk_class: 'safe', + }, + { + service: 'ops-sentinel', + action: 'resume', + method: 'POST', + path: '/resume', + label: 'Resume Alerts', + description: 'Resume alert evaluation.', + page: 'system', + risk_class: 'safe', + }, +]; + +const SERVICE_DEFINITIONS = [ + ['near-intents-ingest', 'Intents Ingest', 'nearIntentsControlBaseUrl'], + ['market-reference-ingest', 'Reference Price', 'marketReferenceControlBaseUrl'], + ['inventory-sync', 'Inventory Sync', 'inventorySyncControlBaseUrl'], + ['liquidity-manager', 'Liquidity Manager', 'liquidityManagerControlBaseUrl'], + ['history-writer', 'History Writer', 'historyWriterControlBaseUrl'], + ['ops-sentinel', 'Ops Sentinel', 'opsSentinelControlBaseUrl'], + ['strategy-engine', 'Strategy Engine', 'strategyEngineControlBaseUrl'], + ['trade-executor', 'Trade Executor', 'tradeExecutorControlBaseUrl'], +]; + +export function resolveDashboardAuth({ mode = 'stub' } = {}) { + return resolveDashboardRequestAuth({ mode }); +} + +export function listDashboardControls({ page = null } = {}) { + const controls = CONTROL_DEFINITIONS.map((definition) => ({ ...definition })); + if (!page) return controls; + return controls.filter((definition) => definition.page === page); +} + +export function resolveDashboardControl({ service, action }) { + return CONTROL_DEFINITIONS.find((definition) => ( + definition.service === service && definition.action === action + )) || null; +} + +export function listDashboardServices(config) { + return SERVICE_DEFINITIONS.map(([service, label, configKey]) => ({ + service, + label, + base_url: config[configKey], + })); +} + +export function createDashboardLiveState({ + config, + recentQuotes = [], + latestMarketPrice = null, + latestInventory = null, + successfulTradeCount = 0, + lastSuccessfulTradeAt = null, + activeAlerts = [], +} = {}) { + const state = { + active_pair: config.activePair, + btc_asset: config.tradingBtc, + eure_asset: config.tradingEure, + quote_limit: config.operatorDashboardQuoteLimit || DASHBOARD_LIVE_QUOTE_LIMIT, + recent_quotes: recentQuotes.slice(0, config.operatorDashboardQuoteLimit || DASHBOARD_LIVE_QUOTE_LIMIT), + latest_market_price: latestMarketPrice?.payload || latestMarketPrice || null, + latest_inventory: latestInventory?.payload || latestInventory || null, + successful_trade_count: Number(successfulTradeCount || 0), + last_successful_trade_at: lastSuccessfulTradeAt || null, + active_alerts: new Map(), + }; + + for (const alert of activeAlerts) { + if (!alert) continue; + state.active_alerts.set(buildAlertKey(alert), alert); + } + + return state; +} + +export function applyDashboardLiveEvent(state, { topic, event }) { + if (!event?.payload) return []; + + switch (topic) { + case 'norm.swap_demand': { + const quote = normalizeLiveQuote(event.payload, event); + if (!quote) return []; + state.recent_quotes = appendUniqueRecentQuote(state.recent_quotes, quote, state.quote_limit); + return [{ + type: 'quotes.recent', + recent_quotes: state.recent_quotes, + }]; + } + case 'ref.market_price': + state.latest_market_price = { + ...event.payload, + observed_at: event.observed_at || event.payload.observed_at || null, + ingested_at: event.ingested_at || null, + }; + return [{ + type: 'status_bar.updated', + status_bar: buildLiveStatusBar(state), + }]; + case 'state.intent_inventory': + state.latest_inventory = { + ...event.payload, + observed_at: event.observed_at || event.payload.synced_at || null, + ingested_at: event.ingested_at || null, + }; + return [{ + type: 'status_bar.updated', + status_bar: buildLiveStatusBar(state), + }]; + case 'ops.alert': { + const alert = normalizeAlert(event.payload); + const key = buildAlertKey(alert); + if (alert.status === 'raised') { + state.active_alerts.set(key, alert); + } else if (alert.status === 'cleared') { + state.active_alerts.delete(key); + } + return [{ + type: 'alerts.updated', + alerts: { + active_alert_count: state.active_alerts.size, + highest_alert_severity: highestAlertSeverity([...state.active_alerts.values()]), + }, + }, { + type: 'status_bar.updated', + status_bar: buildLiveStatusBar(state), + }]; + } + case 'exec.trade_result': + if (event.payload.status !== 'submitted') return []; + state.successful_trade_count += 1; + state.last_successful_trade_at = event.observed_at || event.ingested_at || new Date().toISOString(); + return [{ + type: 'status_bar.updated', + status_bar: buildLiveStatusBar(state), + }]; + default: + return []; + } +} + +export function buildDashboardBootstrap({ + config, + auth, + portfolioMetric, + inventorySnapshot, + marketPrice, + recentQuotes, + successfulTrades, + successfulTradeSummary, + fundingObservations, + recentDepositStatuses, + recentTradeDecisions, + recentAlertTransitions, + serviceSnapshots, + sourceErrors = [], +} = {}) { + const servicesByName = Object.fromEntries( + (serviceSnapshots || []).map((snapshot) => [snapshot.service, snapshot]), + ); + const activeAlerts = normalizeAlertList( + servicesByName['ops-sentinel']?.state?.active_alerts || [], + ); + const recentAlerts = normalizeAlertList( + servicesByName['ops-sentinel']?.state?.recent_transitions + || recentAlertTransitions?.map((entry) => entry.payload) + || [], + ); + const profitability = buildProfitabilitySummary({ + metric: portfolioMetric, + successfulTradeSummary, + }); + const balances = buildBalanceSummary({ + inventorySnapshot, + marketPrice, + config, + }); + const funding = buildFundingSummary({ + config, + fundingObservations, + recentDepositStatuses, + liquidityState: servicesByName['liquidity-manager']?.state || {}, + }); + const tradesPage = normalizeSuccessfulTradesPage({ + config, + successfulTrades, + }); + + return { + session: auth, + source_errors: sourceErrors, + default_page: 'funds', + status_bar: buildStatusBar({ + config, + profitability, + inventorySnapshot, + marketPrice, + activeAlerts, + servicesByName, + }), + funds: { + profitability, + balances, + funding, + recent_deposits: funding.credited_deposits, + recent_withdrawals: buildRecentWithdrawals({ + config, + liquidityState: servicesByName['liquidity-manager']?.state || {}, + }), + trade_asset_changes: buildTradeAssetChanges({ + config, + trades: tradesPage.items, + }), + recent_quotes: (recentQuotes || []).slice(0, config.operatorDashboardQuoteLimit || DASHBOARD_LIVE_QUOTE_LIMIT), + successful_trades: tradesPage, + controls: listDashboardControls({ page: 'funds' }), + caveats: profitability.caveats, + }, + strategy: buildStrategySummary({ + servicesByName, + activeAlerts, + recentTradeDecisions, + }), + system: buildSystemSummary({ + servicesByName, + activeAlerts, + recentAlerts, + }), + }; +} + +export function buildProfitabilitySummary({ metric, successfulTradeSummary } = {}) { + const externalCashFlows = metric?.payload?.external_cash_flows || {}; + const externalFlowCount = Number(externalCashFlows.flow_count || 0); + const externalFlowAdjusted = externalFlowCount > 0; + const summary = { + computed_at: metric?.computed_at || null, + current_total_portfolio_value_eure: metric?.payload?.current_portfolio_value_eure || null, + deposit_baseline_value_eure: metric?.payload?.baseline_portfolio_value_eure_at_baseline_price || null, + simple_hold_value_eure: metric?.payload?.baseline_portfolio_value_eure_at_current_price || null, + pnl_vs_deposit_baseline_eure: null, + pnl_vs_simple_hold_eure: null, + market_move_contribution_eure: null, + trading_contribution_eure: null, + baseline_anchor_at: metric?.baseline_anchor_at || null, + baseline_status: metric?.baseline_status || metric?.payload?.baseline_status || 'unavailable', + external_flow_adjusted: externalFlowAdjusted, + external_flow_count: externalFlowCount, + external_deposit_count: Number(externalCashFlows.deposit_count || 0), + external_withdrawal_count: Number(externalCashFlows.withdrawal_count || 0), + latest_external_flow_at: externalCashFlows.latest_effective_at || null, + net_external_flow_value_eure: externalCashFlows.net_value_eure_at_flow_time || '0', + recent_trade_count: successfulTradeSummary?.total ?? metric?.payload?.result_count ?? 0, + last_successful_trade_at: successfulTradeSummary?.last_successful_trade_at || null, + caveats: [ + 'Portfolio PnL is truthful to the current durable inventory and reference price path.', + 'Fees and per-trade realized net settlement deltas are not fully tracked yet.', + ], + }; + + if (summary.current_total_portfolio_value_eure == null) { + summary.caveats.unshift('Profitability is unavailable until durable portfolio metrics exist.'); + return summary; + } + + if (summary.deposit_baseline_value_eure) { + summary.pnl_vs_deposit_baseline_eure = formatDecimalDifference( + summary.current_total_portfolio_value_eure, + summary.deposit_baseline_value_eure, + ); + } + + if (summary.simple_hold_value_eure) { + summary.pnl_vs_simple_hold_eure = formatDecimalDifference( + summary.current_total_portfolio_value_eure, + summary.simple_hold_value_eure, + ); + } + + if (summary.deposit_baseline_value_eure && summary.simple_hold_value_eure) { + summary.market_move_contribution_eure = formatDecimalDifference( + summary.simple_hold_value_eure, + summary.deposit_baseline_value_eure, + ); + } + + if (summary.simple_hold_value_eure) { + summary.trading_contribution_eure = formatDecimalDifference( + summary.current_total_portfolio_value_eure, + summary.simple_hold_value_eure, + ); + } + + if (summary.external_flow_adjusted) { + summary.caveats.unshift( + `Later credited deposits and completed withdrawals (${summary.external_flow_count}) are treated as external cash flows, not trading PnL.`, + ); + } + + return summary; +} + +export function buildLiveStatusBar(state) { + return { + latest_reference_price_eure_per_btc: state.latest_market_price?.eure_per_btc || null, + market_observed_at: + state.latest_market_price?.observed_at + || state.latest_market_price?.ingested_at + || null, + market_freshness_ms: ageMs( + state.latest_market_price?.observed_at || state.latest_market_price?.ingested_at, + ), + inventory_observed_at: + state.latest_inventory?.synced_at + || state.latest_inventory?.observed_at + || state.latest_inventory?.ingested_at + || null, + inventory_freshness_ms: ageMs( + state.latest_inventory?.synced_at + || state.latest_inventory?.observed_at + || state.latest_inventory?.ingested_at, + ), + current_total_portfolio_value_eure: computeCurrentPortfolioValue({ + inventory: state.latest_inventory, + marketPrice: state.latest_market_price, + btcAsset: state.btc_asset, + eureAsset: state.eure_asset, + }), + active_alert_count: state.active_alerts.size, + highest_alert_severity: highestAlertSeverity([...state.active_alerts.values()]), + recent_trade_count: state.successful_trade_count, + last_successful_trade_at: state.last_successful_trade_at, + }; +} + +function buildStatusBar({ + config, + profitability, + inventorySnapshot, + marketPrice, + activeAlerts, + servicesByName, +}) { + return { + active_pair: config.activePair, + latest_reference_price_eure_per_btc: marketPrice?.payload?.eure_per_btc || null, + market_observed_at: marketPrice?.payload?.observed_at || marketPrice?.ingested_at || null, + market_freshness_ms: ageMs(marketPrice?.payload?.observed_at || marketPrice?.ingested_at), + inventory_observed_at: + inventorySnapshot?.payload?.synced_at || inventorySnapshot?.ingested_at || null, + inventory_freshness_ms: ageMs( + inventorySnapshot?.payload?.synced_at || inventorySnapshot?.ingested_at, + ), + active_alert_count: activeAlerts.length, + highest_alert_severity: highestAlertSeverity(activeAlerts), + strategy_armed: servicesByName['strategy-engine']?.state?.armed ?? null, + executor_armed: servicesByName['trade-executor']?.state?.armed ?? null, + current_total_portfolio_value_eure: profitability.current_total_portfolio_value_eure, + recent_trade_count: profitability.recent_trade_count, + last_successful_trade_at: profitability.last_successful_trade_at, + }; +} + +function buildBalanceSummary({ inventorySnapshot, marketPrice, config }) { + const inventory = inventorySnapshot?.payload || {}; + const spendable = inventory.spendable || {}; + const pendingInbound = inventory.pending_inbound || {}; + const pendingOutbound = inventory.pending_outbound || {}; + + return { + synced_at: inventory.synced_at || inventorySnapshot?.ingested_at || null, + reconciliation_status: inventory.reconciliation_status || null, + items: [...config.assetRegistry.values()].map((asset) => { + const spendableUnits = String(spendable[asset.assetId] || '0'); + const pendingInboundUnits = String(pendingInbound[asset.assetId] || '0'); + const pendingOutboundUnits = String(pendingOutbound[asset.assetId] || '0'); + return { + asset_id: asset.assetId, + symbol: asset.symbol, + chain: asset.chain, + spendable_units: spendableUnits, + spendable: formatUnits(spendableUnits, asset.decimals), + pending_inbound_units: pendingInboundUnits, + pending_inbound: formatUnits(pendingInboundUnits, asset.decimals), + pending_outbound_units: pendingOutboundUnits, + pending_outbound: formatUnits(pendingOutboundUnits, asset.decimals), + eur_value_eure: valueAssetInEur({ + asset, + units: spendableUnits, + marketPrice: marketPrice?.payload || marketPrice || null, + }), + }; + }), + }; +} + +function buildFundingSummary({ config, fundingObservations, recentDepositStatuses, liquidityState }) { + const observations = (fundingObservations || []).map((entry) => entry.payload || entry); + const summary = summarizeFundingObservations(observations, { + now: new Date().toISOString(), + }); + const observerItems = observations.map((observation) => normalizeFundingObservationForUi({ + config, + observation, + })); + const depositItems = buildRecentDepositItems({ + config, + recentDepositStatuses, + liquidityState, + }); + const recentFundingActivity = mergeFundingActivityItems({ + observerItems, + depositItems, + }); + + return { + latest_observed_at: latestFundingActivityAt(recentFundingActivity, summary.latest_funding_observation_at), + control_state: { + paused: liquidityState?.paused ?? null, + funding_observer_paused: liquidityState?.funding_observer_paused ?? null, + withdrawals_frozen: liquidityState?.withdrawals_frozen ?? null, + withdrawal_defaults: liquidityState?.withdrawal_defaults || {}, + }, + handles: Object.entries(liquidityState?.deposit_addresses || {}).map(([chain, details]) => ({ + chain, + asset_id: config.tradingBtc.chain === chain ? config.tradingBtc.assetId : config.tradingEure.assetId, + symbol: config.tradingBtc.chain === chain ? config.tradingBtc.symbol : config.tradingEure.symbol, + address: details?.address || null, + memo: details?.memo || null, + refreshed_at: details?.refreshed_at || null, + })), + credited_deposits: recentFundingActivity + .filter((observation) => CREDITED_FUNDING_STATUSES.has(String(observation?.status || '').toUpperCase())) + .sort((left, right) => sortTimestamps( + fundingActivityTimestamp(right), + fundingActivityTimestamp(left), + )) + .slice(0, 10) + .map((observation) => ({ ...observation })), + pre_credit_by_asset: Object.values(summary.funding_visibility_by_asset || {}).map((entry) => { + const asset = config.assetRegistry.get(entry.asset_id); + return { + asset_id: entry.asset_id, + symbol: asset?.symbol || entry.asset_id, + pre_credit_total_units: entry.pre_credit_total || '0', + pre_credit_total: formatUnits(entry.pre_credit_total || '0', asset?.decimals || 0), + latest_status: entry.latest_status, + latest_observation_at: entry.latest_observation_at, + }; + }), + pre_credit_by_handle: Object.values(summary.funding_observations_by_handle || {}).map((entry) => { + const asset = config.assetRegistry.get(entry.asset_id); + return { + funding_handle: entry.funding_handle, + chain: entry.chain, + asset_id: entry.asset_id, + symbol: asset?.symbol || entry.asset_id, + pre_credit_total_units: entry.pre_credit_total || '0', + pre_credit_total: formatUnits(entry.pre_credit_total || '0', asset?.decimals || 0), + latest_status: entry.latest_status, + latest_observation_at: entry.latest_observation_at, + observation_count: entry.observations?.length || 0, + }; + }), + recent_observations: recentFundingActivity + .sort((left, right) => sortTimestamps( + fundingActivityTimestamp(right), + fundingActivityTimestamp(left), + )) + .slice(0, 10) + .map((observation) => ({ ...observation })), + }; +} + +function buildRecentWithdrawals({ config, liquidityState }) { + return Object.values(liquidityState?.tracked_withdrawals || {}) + .sort((left, right) => sortTimestamps( + right.last_checked_at || right.submitted_at || right.noted_at, + left.last_checked_at || left.submitted_at || left.noted_at, + )) + .slice(0, 10) + .map((withdrawal) => { + const asset = config.assetRegistry.get(withdrawal.asset_id); + return { + withdrawal_hash: withdrawal.withdrawal_hash, + asset_id: withdrawal.asset_id, + symbol: asset?.symbol || withdrawal.asset_id, + chain: withdrawal.chain || null, + amount_units: String(withdrawal.amount || '0'), + amount: formatUnits(withdrawal.amount || '0', asset?.decimals || 0), + status: withdrawal.status || null, + address: withdrawal.address || null, + submitted_at: withdrawal.submitted_at || null, + last_checked_at: withdrawal.last_checked_at || null, + noted_at: withdrawal.noted_at || null, + }; + }); +} + +function buildTradeAssetChanges({ config, trades }) { + return (trades || []).slice(0, 10).map((trade) => { + const assetIn = config.assetRegistry.get(trade.asset_in); + const assetOut = config.assetRegistry.get(trade.asset_out); + return { + observed_at: trade.observed_at, + quote_id: trade.quote_id, + request_kind: trade.request_kind, + asset_in: trade.asset_in, + asset_in_symbol: assetIn?.symbol || trade.asset_in, + amount_in_units: trade.amount_in, + amount_in: formatUnits(trade.amount_in || '0', assetIn?.decimals || 0), + asset_out: trade.asset_out, + asset_out_symbol: assetOut?.symbol || trade.asset_out, + amount_out_units: trade.amount_out, + amount_out: formatUnits(trade.amount_out || '0', assetOut?.decimals || 0), + }; + }); +} + +function normalizeSuccessfulTradesPage({ config, successfulTrades }) { + return { + ...successfulTrades, + items: (successfulTrades?.items || []).map((trade) => normalizeTradeForUi({ + config, + trade, + })), + }; +} + +function buildStrategySummary({ servicesByName, activeAlerts, recentTradeDecisions = [] }) { + const strategyState = servicesByName['strategy-engine']?.state || {}; + const executorState = servicesByName['trade-executor']?.state || {}; + const durableDecisionsById = new Map( + (recentTradeDecisions || []) + .map((entry) => normalizeDecision({ + ...(entry.payload || {}), + decision_at: + entry?.payload?.decision_at + || entry?.observed_at + || entry?.ingested_at + || null, + })) + .filter((entry) => entry?.decision_id) + .map((entry) => [entry.decision_id, entry]), + ); + const recentDecisions = (strategyState.recent_decisions || []).slice(0, 20).map((decision) => { + const durableDecision = durableDecisionsById.get(decision?.decision_id) || null; + return normalizeDecision({ + ...(durableDecision || {}), + ...(decision || {}), + decision_at: + decision?.decision_at + || durableDecision?.decision_at + || null, + }); + }); + const latestDecision = normalizeDecision({ + ...(durableDecisionsById.get(strategyState.latest_decision?.decision_id) || {}), + ...(strategyState.latest_decision || {}), + decision_at: + strategyState.latest_decision?.decision_at + || durableDecisionsById.get(strategyState.latest_decision?.decision_id)?.decision_at + || null, + }); + + return { + strategy_state: { + armed: strategyState.armed ?? null, + paused: strategyState.paused ?? null, + threshold_pct: strategyState.threshold_pct ?? null, + max_notional_eure: strategyState.max_notional_eure ?? null, + latest_decision: latestDecision?.decision_id ? latestDecision : null, + recent_decisions: recentDecisions.length + ? recentDecisions + : [...durableDecisionsById.values()].slice(0, 20), + skipped_counts: strategyState.skipped_counts || {}, + durable_control_state: strategyState.durable_control_state || null, + }, + executor_state: { + armed: executorState.armed ?? null, + paused: executorState.paused ?? null, + draining: executorState.draining ?? null, + in_flight_count: executorState.in_flight_count ?? 0, + completed_count: executorState.completed_count ?? 0, + last_command: executorState.last_command || null, + last_venue_response: executorState.last_venue_response || null, + last_error: executorState.last_error || null, + signer_registered: executorState.signer_registered ?? null, + account_id: executorState.account_id || null, + signer_public_key: executorState.signer_public_key || null, + }, + relevant_alerts: activeAlerts.filter((alert) => ( + ['strategy-engine', 'trade-executor', 'liquidity-manager'].includes(alert.service_scope) + )), + omitted_controls: [ + 'Strategy arm and disarm are intentionally absent in this turn.', + 'Executor arm and drain are intentionally absent in this turn.', + 'Live withdrawal submission is intentionally absent in this turn.', + ], + }; +} + +function buildSystemSummary({ servicesByName, activeAlerts, recentAlerts }) { + const historyWriterState = servicesByName['history-writer']?.state || {}; + const sentinelServiceHealth = new Map( + (servicesByName['ops-sentinel']?.state?.service_health || []).map((entry) => [entry.service, entry]), + ); + + return { + service_health: Object.values(servicesByName).map((snapshot) => ( + summarizeServiceSnapshot(snapshot, { + authoritativeHealth: sentinelServiceHealth.get(snapshot.service) || null, + activeAlerts, + }) + )), + alerts: { + active: activeAlerts, + recent: recentAlerts, + }, + persistence: { + database_connectivity: historyWriterState.database_connectivity ?? null, + last_write_at: historyWriterState.last_write_at || null, + last_alert_write_at: historyWriterState.last_alert_write_at || null, + last_funding_observation_write_at: historyWriterState.last_funding_observation_write_at || null, + last_metrics_at: historyWriterState.last_metrics_at || null, + latest_portfolio_metrics: historyWriterState.latest_portfolio_metrics || null, + offsets: historyWriterState.offsets || {}, + metrics_error: historyWriterState.metrics_error || null, + }, + controls: listDashboardControls({ page: 'system' }), + }; +} + +function summarizeServiceSnapshot(snapshot, { authoritativeHealth = null, activeAlerts = [] } = {}) { + const state = snapshot.state || {}; + const health = snapshot.health || {}; + const derived = authoritativeHealth || deriveServiceHealth({ + service: snapshot.service, + snapshot, + activeAlerts: activeAlerts.filter((alert) => alert.service_scope === snapshot.service), + }); + const freshnessAt = derived.freshness_at || inferServiceFreshnessTimestamp(snapshot.service, state, health); + + return { + service: snapshot.service, + label: snapshot.label, + base_url: snapshot.base_url, + reachable: snapshot.reachable, + health_ok: derived.health_ok, + health_status: derived.status, + health_reasons: derived.reasons || [], + highest_alert_severity: derived.highest_alert_severity || null, + paused: derived.paused ?? state.paused ?? health.paused ?? null, + armed: derived.armed ?? state.armed ?? null, + draining: state.draining ?? null, + freshness_at: freshnessAt, + freshness_age_ms: derived.freshness_age_ms ?? ageMs(freshnessAt), + last_error: state.last_error || health.last_error || null, + summary: buildServiceSummary(snapshot.service, state), + }; +} + +function buildServiceSummary(service, state) { + switch (service) { + case 'near-intents-ingest': + return { + connected: state.ingest?.connected ?? null, + reconnect_count: state.ingest?.reconnect_count ?? null, + pair_filter: state.pair_filter?.pair_filter || null, + last_message_at: state.ingest?.last_message_at || null, + last_matching_quote_at: state.ingest?.last_matching_quote_at || null, + last_published_at: state.ingest?.last_published_at || null, + }; + case 'market-reference-ingest': + return { + last_published_at: state.last_published_at || null, + source_used: state.kraken?.healthy ? 'kraken' : state.coingecko?.healthy ? 'coingecko' : null, + }; + case 'inventory-sync': + return { + last_sync_at: state.last_sync_at || null, + reconciliation_status: state.last_snapshot?.reconciliation_status || null, + }; + case 'liquidity-manager': + return { + last_refresh_at: state.last_refresh_at || null, + funding_observer_paused: state.funding_observer_paused ?? null, + withdrawals_frozen: state.withdrawals_frozen ?? null, + }; + case 'history-writer': + return { + last_write_at: state.last_write_at || null, + last_alert_write_at: state.last_alert_write_at || null, + database_connectivity: state.database_connectivity ?? null, + }; + case 'ops-sentinel': + return { + last_event_at: state.last_event_at || null, + last_runtime_eval_at: state.last_runtime_eval_at || null, + active_alert_count: state.active_alerts?.length || 0, + stale: state.stale ?? null, + }; + case 'strategy-engine': + return { + latest_decision_id: state.latest_decision?.decision_id || null, + threshold_pct: state.threshold_pct ?? null, + max_notional_eure: state.max_notional_eure ?? null, + }; + case 'trade-executor': + return { + in_flight_count: state.in_flight_count ?? 0, + completed_count: state.completed_count ?? 0, + signer_registered: state.signer_registered ?? null, + relay_connected: state.relay?.connected ?? null, + relay_last_message_at: state.relay?.last_message_at || null, + }; + case 'operator-dashboard': + return { + source_error_count: state.source_error_count ?? 0, + last_source_error_at: state.last_source_error_at || null, + last_bootstrap_at: state.last_bootstrap_at || null, + }; + default: + return {}; + } +} + +function inferServiceFreshnessTimestamp(service, state, health) { + return inferRuntimeFreshnessTimestamp(service, state, health); +} + +function normalizeTradeForUi({ config, trade }) { + const assetIn = config.assetRegistry.get(trade.asset_in); + const assetOut = config.assetRegistry.get(trade.asset_out); + + return { + ...trade, + asset_in_symbol: assetIn?.symbol || trade.asset_in, + asset_out_symbol: assetOut?.symbol || trade.asset_out, + amount_in_display: formatUnits(trade.amount_in || '0', assetIn?.decimals || 0), + amount_out_display: formatUnits(trade.amount_out || '0', assetOut?.decimals || 0), + }; +} + +function buildRecentDepositItems({ config, recentDepositStatuses, liquidityState }) { + const recentItems = (recentDepositStatuses || []).map((entry) => normalizeDepositStatusForUi({ + config, + depositStatus: entry, + })); + const itemsByKey = new Map( + recentItems.map((item) => [buildFundingActivityKey(item), item]), + ); + + for (const deposit of Object.values(liquidityState?.deposits || {})) { + const fallbackItem = normalizeLiquidityDepositForUi({ + config, + deposit, + observedAt: liquidityState?.last_refresh_at || null, + }); + const key = buildFundingActivityKey(fallbackItem); + if (!itemsByKey.has(key)) { + itemsByKey.set(key, fallbackItem); + } + } + + return [...itemsByKey.values()]; +} + +function mergeFundingActivityItems({ observerItems, depositItems }) { + const merged = new Map(); + + for (const item of depositItems || []) { + merged.set(buildFundingActivityKey(item), item); + } + for (const item of observerItems || []) { + merged.set(buildFundingActivityKey(item), item); + } + + return [...merged.values()]; +} + +function latestFundingActivityAt(items, fallback = null) { + let latest = fallback; + for (const item of items || []) { + if (sortTimestamps(fundingActivityTimestamp(item), latest) > 0) { + latest = fundingActivityTimestamp(item); + } + } + return latest; +} + +function fundingActivityTimestamp(item) { + return item?.last_seen_at || item?.credited_at || item?.first_seen_at || null; +} + +function buildFundingActivityKey(item) { + return [ + item?.tx_hash || 'no-tx', + item?.chain || 'no-chain', + item?.asset_id || 'no-asset', + item?.funding_handle || 'no-handle', + item?.amount_units || 'no-amount', + ].join('|'); +} + +function normalizeFundingObservationForUi({ config, observation }) { + const asset = config.assetRegistry.get(observation.asset_id); + return { + funding_observation_id: observation.funding_observation_id, + asset_id: observation.asset_id, + symbol: asset?.symbol || observation.asset_id, + chain: observation.chain, + funding_handle: observation.funding_handle, + tx_hash: observation.tx_hash, + status: observation.status, + amount_units: observation.amount, + amount: formatUnits(observation.amount || '0', asset?.decimals || 0), + confirmations: observation.confirmations, + first_seen_at: observation.first_seen_at, + last_seen_at: observation.last_seen_at, + credited_at: observation.credited_at || null, + }; +} + +function normalizeDepositStatusForUi({ config, depositStatus }) { + const payload = depositStatus?.payload || {}; + const details = payload.details || {}; + return normalizeLiquidityDepositForUi({ + config, + deposit: { + tx_hash: details.tx_hash || null, + chain: payload.chain || details.chain || null, + asset_id: payload.asset_id || details.asset_id || null, + amount: details.amount || '0', + address: details.address || details.deposit_address || null, + status: payload.status || details.status || null, + }, + observedAt: depositStatus?.observed_at || depositStatus?.ingested_at || null, + }); +} + +function normalizeLiquidityDepositForUi({ config, deposit, observedAt }) { + const asset = config.assetRegistry.get(deposit?.asset_id); + const status = deposit?.status || null; + const timestamp = observedAt || null; + + return { + funding_observation_id: null, + asset_id: deposit?.asset_id || null, + symbol: asset?.symbol || deposit?.asset_id || null, + chain: deposit?.chain || null, + funding_handle: deposit?.address || null, + tx_hash: deposit?.tx_hash || null, + status, + amount_units: String(deposit?.amount || '0'), + amount: formatUnits(deposit?.amount || '0', asset?.decimals || 0), + confirmations: null, + first_seen_at: timestamp, + last_seen_at: timestamp, + credited_at: CREDITED_FUNDING_STATUSES.has(String(status || '').toUpperCase()) ? timestamp : null, + }; +} + +function normalizeDecision(decision) { + if (!decision) return null; + return { + decision_id: decision.decision_id || null, + decision_at: decision.decision_at || null, + quote_id: decision.quote_id || null, + pair: decision.pair || null, + direction: decision.direction || null, + request_kind: decision.request_kind || null, + decision: decision.decision || null, + decision_reason: decision.decision_reason || null, + gross_edge_pct: decision.gross_edge_pct || null, + threshold_pct: decision.threshold_pct || null, + max_notional_eure: decision.max_notional_eure || null, + strategy_armed: decision.strategy_armed ?? null, + inventory_asset: decision.inventory_asset || null, + eure_notional: decision.eure_notional || null, + }; +} + +function normalizeAlertList(alerts) { + return (alerts || []).map(normalizeAlert).sort((left, right) => sortTimestamps( + right.raised_at || right.first_raised_at || right.cleared_at, + left.raised_at || left.first_raised_at || left.cleared_at, + )); +} + +function normalizeAlert(alert) { + return { + alert_code: alert.alert_code, + status: alert.status, + severity: alert.severity, + reason: alert.reason, + service_scope: alert.service_scope, + pair: alert.pair || null, + asset_id: alert.asset_id || null, + tx_hash: alert.tx_hash || null, + raised_at: alert.raised_at || null, + first_raised_at: alert.first_raised_at || null, + cleared_at: alert.cleared_at || null, + last_evaluated_at: alert.last_evaluated_at || null, + details: alert.details || {}, + }; +} + +function appendUniqueRecentQuote(quotes, nextQuote, limit) { + const deduped = [nextQuote, ...quotes.filter((quote) => quote.quote_id !== nextQuote.quote_id)]; + return deduped.slice(0, limit); +} + +function normalizeLiveQuote(payload, event) { + if (!payload?.quote_id) return null; + return { + quote_id: payload.quote_id, + pair: payload.pair || `${payload.asset_in}->${payload.asset_out}`, + asset_in: payload.asset_in || null, + asset_out: payload.asset_out || null, + request_kind: payload.request_kind || null, + amount_in: payload.amount_in ?? null, + amount_out: payload.amount_out ?? null, + observed_at: event.observed_at || null, + ingested_at: event.ingested_at || null, + }; +} + +function buildAlertKey(alert) { + return [ + alert.alert_code, + alert.service_scope, + alert.pair || '', + alert.asset_id || '', + alert.tx_hash || '', + ].join('|'); +} + +function highestAlertSeverity(alerts) { + return (alerts || []).reduce((highest, alert) => { + const currentRank = ALERT_SEVERITY_ORDER[highest] || 0; + const nextRank = ALERT_SEVERITY_ORDER[alert.severity] || 0; + return nextRank > currentRank ? alert.severity : highest; + }, null); +} + +function computeCurrentPortfolioValue({ inventory, marketPrice, btcAsset, eureAsset }) { + if (!inventory || !marketPrice || !btcAsset || !eureAsset) return null; + + const btcUnits = String(inventory.spendable?.[btcAsset.assetId] || '0'); + const eureUnits = String(inventory.spendable?.[eureAsset.assetId] || '0'); + const btcScaled = unitsToScaledDecimal(btcUnits, btcAsset.decimals); + const eureScaled = unitsToScaledDecimal(eureUnits, eureAsset.decimals); + const priceScaled = parseScaledDecimal(marketPrice.eure_per_btc || marketPrice.eur_per_btc || '0'); + const total = eureScaled + multiplyScaled(btcScaled, priceScaled); + + return formatScaledDecimal(total); +} + +function valueAssetInEur({ asset, units, marketPrice }) { + if (!asset) return null; + if (asset.symbol === 'EURe') { + return formatUnits(units || '0', asset.decimals); + } + if (!marketPrice || asset.symbol !== 'BTC') return null; + + const scaledUnits = unitsToScaledDecimal(units || '0', asset.decimals); + const priceScaled = parseScaledDecimal(marketPrice.eure_per_btc || marketPrice.eur_per_btc || '0'); + return formatScaledDecimal(multiplyScaled(scaledUnits, priceScaled)); +} + +function formatDecimalDifference(left, right) { + return formatScaledDecimal(parseScaledDecimal(left) - parseScaledDecimal(right)); +} + +function unitsToScaledDecimal(units, decimals) { + return BigInt(String(units || '0')) * 10n ** BigInt(DECIMAL_SCALE - decimals); +} + +function parseScaledDecimal(value) { + const normalized = String(value ?? '0').trim(); + const negative = normalized.startsWith('-'); + const unsigned = normalized.replace(/^[+-]/, ''); + const [wholePart, fractionalPart = ''] = unsigned.split('.'); + const whole = BigInt(wholePart || '0'); + const fractional = BigInt( + (fractionalPart.padEnd(DECIMAL_SCALE, '0')).slice(0, DECIMAL_SCALE) || '0', + ); + const scaled = (whole * DECIMAL_FACTOR) + fractional; + return negative ? -scaled : scaled; +} + +function multiplyScaled(left, right) { + return (left * right) / DECIMAL_FACTOR; +} + +function formatScaledDecimal(value) { + const negative = value < 0n; + const absolute = negative ? -value : value; + const whole = absolute / DECIMAL_FACTOR; + const fractional = absolute % DECIMAL_FACTOR; + if (fractional === 0n) { + return `${negative ? '-' : ''}${whole}`; + } + const fractionalText = fractional + .toString() + .padStart(DECIMAL_SCALE, '0') + .replace(/0+$/, ''); + return `${negative ? '-' : ''}${whole}.${fractionalText}`; +} + +function formatUnits(units, decimals) { + const numeric = unitsToNumber(units, decimals); + if (!Number.isFinite(numeric)) return '0'; + if (Math.abs(numeric) >= 1000) return numeric.toLocaleString('en-US', { maximumFractionDigits: 8 }); + return numeric.toLocaleString('en-US', { maximumFractionDigits: 8 }); +} + +function ageMs(value) { + const parsed = Date.parse(value || ''); + if (!Number.isFinite(parsed)) return null; + return Math.max(0, Date.now() - parsed); +} + +function sortTimestamps(left, right) { + return timestampValue(left) - timestampValue(right); +} + +function timestampValue(value) { + const parsed = Date.parse(value || ''); + return Number.isFinite(parsed) ? parsed : -Infinity; +} diff --git a/src/core/runtime-health.mjs b/src/core/runtime-health.mjs new file mode 100644 index 0000000..4fdc475 --- /dev/null +++ b/src/core/runtime-health.mjs @@ -0,0 +1,249 @@ +export const SERVICE_HEALTH_LEVELS = ['healthy', 'warning', 'critical', 'offline', 'paused']; + +const HEALTH_RANK = { + healthy: 0, + warning: 1, + critical: 2, + offline: 3, + paused: 1, +}; + +const ALERT_RANK = { + info: 0, + warning: 1, + critical: 2, +}; + +export function createRuntimeHealthThresholds(config = {}) { + return { + ingestMessageStaleMs: Number(config.opsSentinelIngestMessageStaleMs || 30_000), + ingestQuoteStaleMs: Number(config.opsSentinelIngestQuoteStaleMs || 30_000), + ingestPublishStaleMs: Number(config.opsSentinelIngestPublishStaleMs || 30_000), + executorRelayStaleMs: Number(config.opsSentinelExecutorRelayStaleMs || 30_000), + historyWriterStaleMs: Number(config.opsSentinelHistoryWriterStaleMs || 45_000), + dashboardSourceDegradedMs: Number(config.opsSentinelDashboardSourceDegradedMs || 30_000), + sentinelStaleMs: Number(config.opsSentinelSelfStaleMs || 20_000), + anomalyWindowSize: Number(config.opsSentinelAnomalyWindowSize || 6), + anomalyQuoteRateCollapseRatio: Number(config.opsSentinelAnomalyQuoteRateCollapseRatio || 0.25), + anomalyReconnectSpikeMultiplier: Number(config.opsSentinelAnomalyReconnectSpikeMultiplier || 2), + containmentCooldownMs: Number(config.opsSentinelContainmentCooldownMs || 60_000), + }; +} + +export function evaluateRuntimeHealth({ + servicesByName, + activePair, + activeAlerts = [], + now = new Date().toISOString(), +} = {}) { + const serviceHealth = new Map(); + const alertIndex = indexAlertsByService(activeAlerts); + + for (const [service, snapshot] of Object.entries(servicesByName || {})) { + const alerts = alertIndex.get(service) || []; + serviceHealth.set(service, deriveServiceHealth({ + service, + snapshot, + activePair, + activeAlerts: alerts, + now, + })); + } + + return serviceHealth; +} + +export function deriveServiceHealth({ + service, + snapshot, + activePair = null, + activeAlerts = [], + now = new Date().toISOString(), +} = {}) { + const state = snapshot?.state || {}; + const health = snapshot?.health || {}; + const reachable = snapshot?.reachable !== false; + const paused = state.paused ?? health.paused ?? false; + const highestAlertSeverity = highestAlertSeverityForService(activeAlerts); + const freshnessAt = inferServiceFreshnessTimestamp(service, state, health); + const freshnessAgeMs = ageMs(freshnessAt, now); + const reasons = []; + let status = paused ? 'paused' : reachable ? 'healthy' : 'offline'; + + if (!reachable) { + reasons.push('service unreachable'); + } + + if (health.ok === false && reachable) { + status = escalateHealth(status, 'critical'); + reasons.push(health.reason || 'service health check failed'); + } + + if (highestAlertSeverity === 'critical') { + status = escalateHealth(status, 'critical'); + reasons.push(`critical alert active (${activeAlerts[0]?.alert_code || 'runtime'})`); + } else if (highestAlertSeverity === 'warning') { + status = escalateHealth(status, 'warning'); + reasons.push(`warning alert active (${activeAlerts[0]?.alert_code || 'runtime'})`); + } + + if (service === 'near-intents-ingest') { + if (state.ingest?.connected === false) { + status = escalateHealth(status, 'critical'); + reasons.push('websocket disconnected'); + } + if (state.ingest?.last_matching_quote_at && state.ingest?.last_published_at) { + const matchingAgeMs = ageMs(state.ingest.last_matching_quote_at, now); + const publishedAgeMs = ageMs(state.ingest.last_published_at, now); + if (matchingAgeMs != null && publishedAgeMs != null && publishedAgeMs > matchingAgeMs + 5_000) { + status = escalateHealth(status, 'critical'); + reasons.push('quote publish path stalled'); + } + } + } + + if (service === 'trade-executor' && state.relay?.connected === false) { + status = escalateHealth(status, 'critical'); + reasons.push('solver relay disconnected'); + } + + if (service === 'history-writer') { + if (state.database_connectivity === false) { + status = escalateHealth(status, 'critical'); + reasons.push('database connectivity failed'); + } else if (freshnessAgeMs != null && freshnessAgeMs > 45_000) { + status = escalateHealth(status, 'warning'); + reasons.push('writer freshness degraded'); + } + } + + if (service === 'operator-dashboard') { + if ((state.source_error_count || 0) > 0 || (health.source_error_count || 0) > 0) { + status = escalateHealth(status, 'warning'); + reasons.push('dashboard source degraded'); + } + } + + if ( + ['strategy-engine', 'trade-executor'].includes(service) + && (state.armed ?? false) + && hasCriticalTruthAlert(activeAlerts, activePair) + ) { + status = escalateHealth(status, 'critical'); + reasons.push('armed while critical upstream truth is stale'); + } + + return { + service, + status, + reachable, + paused, + armed: state.armed ?? null, + health_ok: status === 'healthy' || status === 'paused', + highest_alert_severity: highestAlertSeverity, + reasons, + freshness_at: freshnessAt, + freshness_age_ms: freshnessAgeMs, + }; +} + +export function inferServiceFreshnessTimestamp(service, state = {}, health = {}) { + switch (service) { + case 'near-intents-ingest': + return ( + state.ingest?.last_published_at + || state.ingest?.last_matching_quote_at + || state.ingest?.last_message_at + || null + ); + case 'market-reference-ingest': + return state.last_published_at || null; + case 'inventory-sync': + return state.last_sync_at || null; + case 'liquidity-manager': + return state.last_refresh_at || null; + case 'history-writer': + return state.last_write_at || state.last_metrics_at || null; + case 'ops-sentinel': + return state.last_runtime_eval_at || state.last_evaluated_at || health.last_event_at || null; + case 'strategy-engine': + return state.latest_decision?.decision_at || state.latest_inventory_event?.ingested_at || null; + case 'trade-executor': + return state.relay?.last_message_at || state.last_quote_status?.created_at || null; + case 'operator-dashboard': + return state.last_bootstrap_at || state.last_source_error_at || null; + default: + return null; + } +} + +export function buildRuntimeAlert({ + alert_code, + severity, + reason, + service_scope, + pair = null, + asset_id = null, + tx_hash = null, + details = {}, +}) { + return { + alert_code, + severity, + reason, + service_scope, + pair, + asset_id, + tx_hash, + details, + }; +} + +export function ageMs(value, now = new Date().toISOString()) { + if (!value) return null; + const left = new Date(value).getTime(); + const right = new Date(now).getTime(); + if (!Number.isFinite(left) || !Number.isFinite(right)) return null; + return Math.max(0, right - left); +} + +function highestAlertSeverityForService(alerts) { + let highest = null; + let highestRank = -1; + for (const alert of alerts || []) { + const rank = ALERT_RANK[alert.severity] ?? -1; + if (rank > highestRank) { + highest = alert.severity; + highestRank = rank; + } + } + return highest; +} + +function indexAlertsByService(activeAlerts) { + const index = new Map(); + for (const alert of activeAlerts || []) { + const list = index.get(alert.service_scope) || []; + list.push(alert); + index.set(alert.service_scope, list); + } + return index; +} + +function hasCriticalTruthAlert(alerts, activePair) { + return (alerts || []).some((alert) => ( + alert.severity === 'critical' + && ( + alert.pair == null + || alert.pair === activePair + || alert.alert_code.includes('stale') + || alert.alert_code.includes('disconnected') + ) + )); +} + +function escalateHealth(current, next) { + const currentRank = HEALTH_RANK[current] ?? 0; + const nextRank = HEALTH_RANK[next] ?? 0; + return nextRank > currentRank ? next : current; +} diff --git a/src/lib/config.mjs b/src/lib/config.mjs index 6f79d0d..57051df 100644 --- a/src/lib/config.mjs +++ b/src/lib/config.mjs @@ -19,6 +19,7 @@ const DEFAULTS = { strategyEngineControlPort: 8086, tradeExecutorControlPort: 8087, opsSentinelControlPort: 8088, + operatorDashboardControlPort: 8090, kafkaBrokers: ['127.0.0.1:9092'], kafkaClientId: 'unrip', kafkaTopicRawNearIntentsQuote: 'raw.near_intents.quote', @@ -36,6 +37,7 @@ const DEFAULTS = { kafkaConsumerGroupStrategy: 'strategy-engine-v1', kafkaConsumerGroupExecutor: 'trade-executor-v1', kafkaConsumerGroupOpsSentinel: 'ops-sentinel-v1', + kafkaConsumerGroupOperatorDashboard: 'operator-dashboard-v1', strategyStateDir: './var/strategy-state', executorStateDir: './var/executor-state', liquidityStateDir: './var/liquidity-state', @@ -73,6 +75,26 @@ const DEFAULTS = { fundingObservationStuckMs: 60 * 60 * 1000, opsSentinelEvaluationMs: 5_000, opsSentinelFundingCreditPendingMs: 5 * 60 * 1000, + opsSentinelIngestMessageStaleMs: 30_000, + opsSentinelIngestQuoteStaleMs: 30_000, + opsSentinelIngestPublishStaleMs: 30_000, + opsSentinelExecutorRelayStaleMs: 30_000, + opsSentinelHistoryWriterStaleMs: 45_000, + opsSentinelDashboardSourceDegradedMs: 30_000, + opsSentinelSelfStaleMs: 20_000, + opsSentinelAnomalyWindowSize: 6, + opsSentinelAnomalyQuoteRateCollapseRatio: 0.25, + opsSentinelAnomalyReconnectSpikeMultiplier: 2, + opsSentinelContainmentCooldownMs: 60_000, + opsSentinelAlertWebhookUrl: '', + opsSentinelAlertWebhookTimeoutMs: 5_000, + operatorDashboardAuthMode: 'stub', + operatorDashboardAuthUsername: 'admin', + operatorDashboardAuthPassword: '', + operatorDashboardAuthRealm: 'unrip operator dashboard', + operatorDashboardQuoteLimit: 10, + operatorDashboardTradePageSize: 20, + operatorDashboardUpstreamTimeoutMs: 3_000, }; function splitCsv(value) { @@ -106,6 +128,13 @@ function buildAsset({ assetId, symbol, decimals, chain, withdrawAddress = '' }) }; } +function defaultControlBaseUrl({ serviceName, port, namespace }) { + if (process.env.KUBERNETES_SERVICE_HOST) { + return `http://${serviceName}.${namespace}.svc.cluster.local:${port}`; + } + return `http://127.0.0.1:${port}`; +} + export function loadConfig({ envPath = '.env' } = {}) { loadDotenv(envPath); @@ -157,48 +186,134 @@ export function loadConfig({ envPath = '.env' } = {}) { process.env.NEAR_INTENTS_CONTROL_PORT, DEFAULTS.nearIntentsControlPort, ), + nearIntentsControlBaseUrl: + process.env.NEAR_INTENTS_CONTROL_BASE_URL + || defaultControlBaseUrl({ + serviceName: 'near-intents-ingest', + port: parseNumber( + process.env.NEAR_INTENTS_CONTROL_PORT, + DEFAULTS.nearIntentsControlPort, + ), + namespace: projectNamespace, + }), marketReferenceControlHost: process.env.MARKET_REFERENCE_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, marketReferenceControlPort: parseNumber( process.env.MARKET_REFERENCE_CONTROL_PORT, DEFAULTS.marketReferenceControlPort, ), + marketReferenceControlBaseUrl: + process.env.MARKET_REFERENCE_CONTROL_BASE_URL + || defaultControlBaseUrl({ + serviceName: 'market-reference-ingest', + port: parseNumber( + process.env.MARKET_REFERENCE_CONTROL_PORT, + DEFAULTS.marketReferenceControlPort, + ), + namespace: projectNamespace, + }), inventorySyncControlHost: process.env.INVENTORY_SYNC_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, inventorySyncControlPort: parseNumber( process.env.INVENTORY_SYNC_CONTROL_PORT, DEFAULTS.inventorySyncControlPort, ), + inventorySyncControlBaseUrl: + process.env.INVENTORY_SYNC_CONTROL_BASE_URL + || defaultControlBaseUrl({ + serviceName: 'inventory-sync', + port: parseNumber( + process.env.INVENTORY_SYNC_CONTROL_PORT, + DEFAULTS.inventorySyncControlPort, + ), + namespace: projectNamespace, + }), liquidityManagerControlHost: process.env.LIQUIDITY_MANAGER_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, liquidityManagerControlPort: parseNumber( process.env.LIQUIDITY_MANAGER_CONTROL_PORT, DEFAULTS.liquidityManagerControlPort, ), + liquidityManagerControlBaseUrl: + process.env.LIQUIDITY_MANAGER_CONTROL_BASE_URL + || defaultControlBaseUrl({ + serviceName: 'liquidity-manager', + port: parseNumber( + process.env.LIQUIDITY_MANAGER_CONTROL_PORT, + DEFAULTS.liquidityManagerControlPort, + ), + namespace: projectNamespace, + }), historyWriterControlHost: process.env.HISTORY_WRITER_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, historyWriterControlPort: parseNumber( process.env.HISTORY_WRITER_CONTROL_PORT, DEFAULTS.historyWriterControlPort, ), + historyWriterControlBaseUrl: + process.env.HISTORY_WRITER_CONTROL_BASE_URL + || defaultControlBaseUrl({ + serviceName: 'history-writer', + port: parseNumber( + process.env.HISTORY_WRITER_CONTROL_PORT, + DEFAULTS.historyWriterControlPort, + ), + namespace: projectNamespace, + }), opsSentinelControlHost: process.env.OPS_SENTINEL_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, opsSentinelControlPort: parseNumber( process.env.OPS_SENTINEL_CONTROL_PORT, DEFAULTS.opsSentinelControlPort, ), + opsSentinelControlBaseUrl: + process.env.OPS_SENTINEL_CONTROL_BASE_URL + || defaultControlBaseUrl({ + serviceName: 'ops-sentinel', + port: parseNumber( + process.env.OPS_SENTINEL_CONTROL_PORT, + DEFAULTS.opsSentinelControlPort, + ), + namespace: projectNamespace, + }), strategyEngineControlHost: process.env.STRATEGY_ENGINE_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, strategyEngineControlPort: parseNumber( process.env.STRATEGY_ENGINE_CONTROL_PORT, DEFAULTS.strategyEngineControlPort, ), + strategyEngineControlBaseUrl: + process.env.STRATEGY_ENGINE_CONTROL_BASE_URL + || defaultControlBaseUrl({ + serviceName: 'strategy-engine', + port: parseNumber( + process.env.STRATEGY_ENGINE_CONTROL_PORT, + DEFAULTS.strategyEngineControlPort, + ), + namespace: projectNamespace, + }), tradeExecutorControlHost: process.env.TRADE_EXECUTOR_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, tradeExecutorControlPort: parseNumber( process.env.TRADE_EXECUTOR_CONTROL_PORT, DEFAULTS.tradeExecutorControlPort, ), + tradeExecutorControlBaseUrl: + process.env.TRADE_EXECUTOR_CONTROL_BASE_URL + || defaultControlBaseUrl({ + serviceName: 'trade-executor', + port: parseNumber( + process.env.TRADE_EXECUTOR_CONTROL_PORT, + DEFAULTS.tradeExecutorControlPort, + ), + namespace: projectNamespace, + }), + operatorDashboardControlHost: + process.env.OPERATOR_DASHBOARD_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, + operatorDashboardControlPort: parseNumber( + process.env.OPERATOR_DASHBOARD_CONTROL_PORT, + DEFAULTS.operatorDashboardControlPort, + ), kafkaBrokers: splitCsv(process.env.KAFKA_BROKERS).length ? splitCsv(process.env.KAFKA_BROKERS) : DEFAULTS.kafkaBrokers, @@ -233,6 +348,9 @@ export function loadConfig({ envPath = '.env' } = {}) { process.env.KAFKA_CONSUMER_GROUP_EXECUTOR || DEFAULTS.kafkaConsumerGroupExecutor, kafkaConsumerGroupOpsSentinel: process.env.KAFKA_CONSUMER_GROUP_OPS_SENTINEL || DEFAULTS.kafkaConsumerGroupOpsSentinel, + kafkaConsumerGroupOperatorDashboard: + process.env.KAFKA_CONSUMER_GROUP_OPERATOR_DASHBOARD + || DEFAULTS.kafkaConsumerGroupOperatorDashboard, strategyStateDir: process.env.STRATEGY_STATE_DIR || DEFAULTS.strategyStateDir, executorStateDir: process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir, liquidityStateDir: process.env.LIQUIDITY_STATE_DIR || DEFAULTS.liquidityStateDir, @@ -333,5 +451,75 @@ export function loadConfig({ envPath = '.env' } = {}) { process.env.OPS_SENTINEL_FUNDING_STUCK_MS, DEFAULTS.fundingObservationStuckMs, ), + opsSentinelIngestMessageStaleMs: parseNumber( + process.env.OPS_SENTINEL_INGEST_MESSAGE_STALE_MS, + DEFAULTS.opsSentinelIngestMessageStaleMs, + ), + opsSentinelIngestQuoteStaleMs: parseNumber( + process.env.OPS_SENTINEL_INGEST_QUOTE_STALE_MS, + DEFAULTS.opsSentinelIngestQuoteStaleMs, + ), + opsSentinelIngestPublishStaleMs: parseNumber( + process.env.OPS_SENTINEL_INGEST_PUBLISH_STALE_MS, + DEFAULTS.opsSentinelIngestPublishStaleMs, + ), + opsSentinelExecutorRelayStaleMs: parseNumber( + process.env.OPS_SENTINEL_EXECUTOR_RELAY_STALE_MS, + DEFAULTS.opsSentinelExecutorRelayStaleMs, + ), + opsSentinelHistoryWriterStaleMs: parseNumber( + process.env.OPS_SENTINEL_HISTORY_WRITER_STALE_MS, + DEFAULTS.opsSentinelHistoryWriterStaleMs, + ), + opsSentinelDashboardSourceDegradedMs: parseNumber( + process.env.OPS_SENTINEL_DASHBOARD_SOURCE_DEGRADED_MS, + DEFAULTS.opsSentinelDashboardSourceDegradedMs, + ), + opsSentinelSelfStaleMs: parseNumber( + process.env.OPS_SENTINEL_SELF_STALE_MS, + DEFAULTS.opsSentinelSelfStaleMs, + ), + opsSentinelAnomalyWindowSize: parseNumber( + process.env.OPS_SENTINEL_ANOMALY_WINDOW_SIZE, + DEFAULTS.opsSentinelAnomalyWindowSize, + ), + opsSentinelAnomalyQuoteRateCollapseRatio: parseNumber( + process.env.OPS_SENTINEL_ANOMALY_QUOTE_RATE_COLLAPSE_RATIO, + DEFAULTS.opsSentinelAnomalyQuoteRateCollapseRatio, + ), + opsSentinelAnomalyReconnectSpikeMultiplier: parseNumber( + process.env.OPS_SENTINEL_ANOMALY_RECONNECT_SPIKE_MULTIPLIER, + DEFAULTS.opsSentinelAnomalyReconnectSpikeMultiplier, + ), + opsSentinelContainmentCooldownMs: parseNumber( + process.env.OPS_SENTINEL_CONTAINMENT_COOLDOWN_MS, + DEFAULTS.opsSentinelContainmentCooldownMs, + ), + opsSentinelAlertWebhookUrl: + process.env.OPS_SENTINEL_ALERT_WEBHOOK_URL || DEFAULTS.opsSentinelAlertWebhookUrl, + opsSentinelAlertWebhookTimeoutMs: parseNumber( + process.env.OPS_SENTINEL_ALERT_WEBHOOK_TIMEOUT_MS, + DEFAULTS.opsSentinelAlertWebhookTimeoutMs, + ), + operatorDashboardAuthMode: + process.env.OPERATOR_DASHBOARD_AUTH_MODE || DEFAULTS.operatorDashboardAuthMode, + operatorDashboardAuthUsername: + process.env.OPERATOR_DASHBOARD_AUTH_USERNAME || DEFAULTS.operatorDashboardAuthUsername, + operatorDashboardAuthPassword: + process.env.OPERATOR_DASHBOARD_AUTH_PASSWORD || DEFAULTS.operatorDashboardAuthPassword, + operatorDashboardAuthRealm: + process.env.OPERATOR_DASHBOARD_AUTH_REALM || DEFAULTS.operatorDashboardAuthRealm, + operatorDashboardQuoteLimit: parseNumber( + process.env.OPERATOR_DASHBOARD_QUOTE_LIMIT, + DEFAULTS.operatorDashboardQuoteLimit, + ), + operatorDashboardTradePageSize: parseNumber( + process.env.OPERATOR_DASHBOARD_TRADE_PAGE_SIZE, + DEFAULTS.operatorDashboardTradePageSize, + ), + operatorDashboardUpstreamTimeoutMs: parseNumber( + process.env.OPERATOR_DASHBOARD_UPSTREAM_TIMEOUT_MS, + DEFAULTS.operatorDashboardUpstreamTimeoutMs, + ), }; } diff --git a/src/operator-dashboard/static/App.jsx b/src/operator-dashboard/static/App.jsx new file mode 100644 index 0000000..58d4918 --- /dev/null +++ b/src/operator-dashboard/static/App.jsx @@ -0,0 +1,188 @@ +import { useEffect, useReducer } from 'react'; + +import BannerStack from './components/BannerStack.jsx'; +import NavRail from './components/NavRail.jsx'; +import StatusBar from './components/StatusBar.jsx'; +import { fetchJson } from './lib/api.js'; +import FundsPage from './pages/FundsPage.jsx'; +import StrategyPage from './pages/StrategyPage.jsx'; +import SystemPage from './pages/SystemPage.jsx'; +import { dashboardReducer, initialDashboardState } from './state/dashboardReducer.js'; + +const TRADE_PAGE_SIZE = 20; + +function LoadingPanel() { + return ( +
+

Loading dashboard

+

Fetching session, durable history, and live service state.

+
+ ); +} + +export default function App() { + const [state, dispatch] = useReducer(dashboardReducer, initialDashboardState); + const currentPage = state.page || state.dashboard?.default_page || 'funds'; + const isReadyForSocket = Boolean(state.session && state.dashboard); + const criticalBanner = state.dashboard?.status_bar?.highest_alert_severity === 'critical' + ? 'Critical runtime alerts are active. Dashboard health is degraded until the underlying truth path recovers.' + : null; + + async function loadBootstrap(page = 1) { + const dashboard = await fetchJson(`/api/bootstrap?page=${page}&page_size=${TRADE_PAGE_SIZE}`); + dispatch({ type: 'bootstrap.loaded', dashboard }); + return dashboard; + } + + async function loadTradesPage(page) { + if (!Number.isFinite(page) || page < 1) return; + + dispatch({ type: 'notice.changed', notice: 'Loading trade history page...' }); + dispatch({ type: 'error.changed', error: null }); + + try { + const successfulTrades = await fetchJson(`/api/trades?page=${page}&page_size=${TRADE_PAGE_SIZE}`); + dispatch({ type: 'trades.loaded', successfulTrades }); + dispatch({ type: 'notice.changed', notice: null }); + } catch (error) { + dispatch({ type: 'error.changed', error: error.message }); + dispatch({ type: 'notice.changed', notice: null }); + } + } + + async function submitControl(service, action, body = {}, { reload = true } = {}) { + dispatch({ type: 'notice.changed', notice: `${action} in progress` }); + dispatch({ type: 'error.changed', error: null }); + + try { + const response = await fetchJson(`/api/control/${service}/${action}`, { + method: 'POST', + headers: { + 'content-type': 'application/json', + }, + body: JSON.stringify(body || {}), + }); + + dispatch({ type: 'control.result', result: response.result }); + dispatch({ type: 'notice.changed', notice: `${action} completed` }); + + if (reload) { + const page = state.dashboard?.funds?.successful_trades?.page || 1; + await loadBootstrap(page); + } + } catch (error) { + dispatch({ type: 'error.changed', error: error.message }); + } + } + + useEffect(() => { + let cancelled = false; + + async function boot() { + try { + const session = await fetchJson('/api/session'); + if (cancelled) return; + dispatch({ type: 'session.loaded', session }); + await loadBootstrap(1); + } catch (error) { + if (cancelled) return; + dispatch({ type: 'error.changed', error: error.message }); + } + } + + boot(); + + return () => { + cancelled = true; + }; + }, []); + + useEffect(() => { + if (!isReadyForSocket) return undefined; + + let disposed = false; + let reconnectTimer = null; + let socket = null; + + function connect() { + if (disposed) return; + + dispatch({ type: 'websocket.state.changed', websocketState: 'connecting' }); + const scheme = window.location.protocol === 'https:' ? 'wss' : 'ws'; + socket = new WebSocket(`${scheme}://${window.location.host}/ws`); + + socket.addEventListener('open', () => { + if (disposed) return; + dispatch({ type: 'websocket.state.changed', websocketState: 'connected' }); + dispatch({ type: 'error.changed', error: null }); + }); + + socket.addEventListener('close', () => { + if (disposed) return; + dispatch({ type: 'websocket.state.changed', websocketState: 'disconnected' }); + reconnectTimer = window.setTimeout(connect, 2000); + }); + + socket.addEventListener('error', () => { + if (disposed) return; + dispatch({ type: 'websocket.state.changed', websocketState: 'degraded' }); + }); + + socket.addEventListener('message', (event) => { + if (disposed) return; + dispatch({ type: 'socket.message.received', payload: JSON.parse(event.data) }); + }); + } + + connect(); + + return () => { + disposed = true; + if (reconnectTimer) window.clearTimeout(reconnectTimer); + socket?.close(); + }; + }, [isReadyForSocket]); + + return ( +
+ + + {!state.dashboard ? ( + + ) : ( + <> + + +
+ dispatch({ type: 'page.changed', page })} + /> + +
+ {currentPage === 'funds' ? ( + + ) : null} + {currentPage === 'strategy' ? ( + + ) : null} + {currentPage === 'system' ? ( + + ) : null} +
+
+ + )} +
+ ); +} diff --git a/src/operator-dashboard/static/components/BannerStack.jsx b/src/operator-dashboard/static/components/BannerStack.jsx new file mode 100644 index 0000000..87ddb68 --- /dev/null +++ b/src/operator-dashboard/static/components/BannerStack.jsx @@ -0,0 +1,18 @@ +export default function BannerStack({ notice, error, sourceErrors, criticalBanner }) { + return ( + <> + {criticalBanner ?
{criticalBanner}
: null} + {notice ?
{notice}
: null} + {error ?
{error}
: null} + {sourceErrors?.length ? ( +
+ {sourceErrors.map((item) => ( +
+ {`${item.source}: ${item.error?.message || 'source unavailable'}`} +
+ ))} +
+ ) : null} + + ); +} diff --git a/src/operator-dashboard/static/components/ServiceCard.jsx b/src/operator-dashboard/static/components/ServiceCard.jsx new file mode 100644 index 0000000..9bd9bc9 --- /dev/null +++ b/src/operator-dashboard/static/components/ServiceCard.jsx @@ -0,0 +1,23 @@ +import Pill from './Pill.jsx'; +import { formatAge, formatBoolean } from '../lib/format.js'; + +export default function ServiceCard({ service }) { + const healthLabel = service.health_status || (service.health_ok ? 'healthy' : service.reachable ? 'degraded' : 'offline'); + + return ( +
+
+ {service.label} + +
+
+
{`Paused ${formatBoolean(service.paused)}`}
+
{`Armed ${formatBoolean(service.armed)}`}
+
{`Freshness ${formatAge(service.freshness_age_ms)}`}
+ {service.health_reasons?.length ?
{service.health_reasons.join(' | ')}
: null} +
{service.base_url}
+ {service.last_error ?
{JSON.stringify(service.last_error)}
: null} +
+
+ ); +} diff --git a/src/operator-dashboard/static/pages/SystemPage.jsx b/src/operator-dashboard/static/pages/SystemPage.jsx new file mode 100644 index 0000000..3bdb04d --- /dev/null +++ b/src/operator-dashboard/static/pages/SystemPage.jsx @@ -0,0 +1,113 @@ +import AlertsGrid from '../components/AlertsGrid.jsx'; +import MetricCard from '../components/MetricCard.jsx'; +import ServiceCard from '../components/ServiceCard.jsx'; +import TableFrame from '../components/TableFrame.jsx'; +import { formatBoolean, formatTimestamp } from '../lib/format.js'; + +export default function SystemPage({ system, onControl }) { + return ( + <> +
+
+
+
Runtime health
+

System

+
+ Service health, alerting truth, writer freshness, and only safe control actions. +
+
+
+
+ {system.controls.map((control) => ( + + ))} +
+
+ +
+
+
+
Service view
+

Health and freshness

+
+
+
+ {system.service_health.map((service) => ( + + ))} +
+
+ +
+
+
+
+
Alert state
+

Active alerts

+
+
+ +
+
+
+
+
Alert history
+

Recent transitions

+
+
+ +
+
+ +
+
+
+
Persistence
+

Writer offsets and durability

+
+
+
+ + +
+ + + + + + + + + + + {Object.entries(system.persistence.offsets || {}).length ? ( + Object.entries(system.persistence.offsets || {}).map(([topic, offset]) => ( + + + + + + )) + ) : ( + + + + )} + +
TopicPartitionOffset
{topic}{String(offset.partition ?? '')}{String(offset.offset ?? '')}
No offsets captured yet.
+
+
+ + ); +} diff --git a/test/alert-engine.test.mjs b/test/alert-engine.test.mjs index 7f76f90..94f82e2 100644 --- a/test/alert-engine.test.mjs +++ b/test/alert-engine.test.mjs @@ -101,3 +101,29 @@ test('executor submission failure produces an alert event and clears on recovery assert.equal(transitions[0].alert_code, 'executor_submission_failed'); assert.equal(transitions[0].status, 'cleared'); }); + +test('runtime alerts raise and clear independently from event-derived alerts', () => { + const engine = createEngine(); + + let transitions = engine.applyRuntimeAlerts([{ + alert_code: 'near_intents_quotes_stale', + severity: 'critical', + reason: 'quotes are stale', + service_scope: 'near-intents-ingest', + pair: 'nep141:btc.omft.near->nep141:eure.omft.near', + asset_id: null, + tx_hash: null, + details: { + age_ms: 100_000, + }, + }], '2026-04-03T08:00:00.000Z'); + + assert.equal(transitions.length, 1); + assert.equal(transitions[0].alert_code, 'near_intents_quotes_stale'); + assert.equal(transitions[0].status, 'raised'); + + transitions = engine.applyRuntimeAlerts([], '2026-04-03T08:00:05.000Z'); + assert.equal(transitions.length, 1); + assert.equal(transitions[0].alert_code, 'near_intents_quotes_stale'); + assert.equal(transitions[0].status, 'cleared'); +}); diff --git a/test/alert-notifier.test.mjs b/test/alert-notifier.test.mjs new file mode 100644 index 0000000..743df4a --- /dev/null +++ b/test/alert-notifier.test.mjs @@ -0,0 +1,53 @@ +import http from 'node:http'; +import assert from 'node:assert/strict'; +import test from 'node:test'; + +import { createAlertNotifier } from '../src/core/alert-notifier.mjs'; + +test('alert notifier dedupes repeated deliveries and records clear transitions separately', async () => { + const requests = []; + const server = http.createServer(async (req, res) => { + let body = ''; + for await (const chunk of req) body += chunk; + requests.push(JSON.parse(body)); + res.statusCode = 200; + res.end('{}'); + }); + + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + const address = server.address(); + const notifier = createAlertNotifier({ + webhookUrl: `http://127.0.0.1:${address.port}/alerts`, + }); + + const raised = { + alert_code: 'near_intents_quotes_stale', + status: 'raised', + severity: 'critical', + service_scope: 'near-intents-ingest', + reason: 'quote truth stale', + pair: 'btc->eure', + raised_at: '2026-04-08T10:00:00.000Z', + cleared_at: null, + last_evaluated_at: '2026-04-08T10:00:00.000Z', + details: {}, + }; + const cleared = { + ...raised, + status: 'cleared', + cleared_at: '2026-04-08T10:01:00.000Z', + }; + + const first = await notifier.notify(raised); + const second = await notifier.notify(raised); + const third = await notifier.notify(cleared); + + assert.equal(first.ok, true); + assert.equal(second.deduped, true); + assert.equal(third.ok, true); + assert.equal(requests.length, 2); + assert.equal(requests[0].alert.status, 'raised'); + assert.equal(requests[1].alert.status, 'cleared'); + + await new Promise((resolve, reject) => server.close((error) => (error ? reject(error) : resolve()))); +}); diff --git a/test/operator-dashboard.test.mjs b/test/operator-dashboard.test.mjs new file mode 100644 index 0000000..632b380 --- /dev/null +++ b/test/operator-dashboard.test.mjs @@ -0,0 +1,605 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { + applyDashboardLiveEvent, + buildDashboardBootstrap, + buildProfitabilitySummary, + createDashboardLiveState, + resolveDashboardControl, +} from '../src/core/operator-dashboard.mjs'; +import { + buildDashboardSessionToken, + parseBasicAuthorizationHeader, + resolveDashboardRequestAuth, +} from '../src/core/operator-dashboard-auth.mjs'; + +function buildConfig() { + const tradingBtc = { + assetId: 'nep141:btc.omft.near', + symbol: 'BTC', + decimals: 8, + chain: 'btc:mainnet', + }; + const tradingEure = { + assetId: 'nep141:eure.omft.near', + symbol: 'EURe', + decimals: 18, + chain: 'eth:100', + }; + + return { + activePair: `${tradingBtc.assetId}->${tradingEure.assetId}`, + operatorDashboardQuoteLimit: 10, + tradingBtc, + tradingEure, + assetRegistry: new Map([ + [tradingBtc.assetId, tradingBtc], + [tradingEure.assetId, tradingEure], + ]), + }; +} + +test('profitability summary separates baseline, hold, market move, and trading contribution', () => { + const summary = buildProfitabilitySummary({ + metric: { + computed_at: '2026-04-04T09:05:00.000Z', + baseline_anchor_at: '2026-04-04T08:00:00.000Z', + baseline_status: 'active', + payload: { + current_portfolio_value_eure: '110', + baseline_portfolio_value_eure_at_baseline_price: '100', + baseline_portfolio_value_eure_at_current_price: '105', + }, + }, + successfulTradeSummary: { + total: 4, + last_successful_trade_at: '2026-04-04T09:00:00.000Z', + }, + }); + + assert.equal(summary.pnl_vs_deposit_baseline_eure, '10'); + assert.equal(summary.pnl_vs_simple_hold_eure, '5'); + assert.equal(summary.market_move_contribution_eure, '5'); + assert.equal(summary.trading_contribution_eure, '5'); + assert.equal(summary.computed_at, '2026-04-04T09:05:00.000Z'); + assert.equal(summary.recent_trade_count, 4); + assert.equal(summary.last_successful_trade_at, '2026-04-04T09:00:00.000Z'); +}); + +test('profitability summary flags cash-flow-adjusted benchmarks after later funding changes', () => { + const summary = buildProfitabilitySummary({ + metric: { + computed_at: '2026-04-07T15:43:30.463Z', + baseline_anchor_at: '2026-04-02T18:10:43.569Z', + baseline_status: 'active', + payload: { + current_portfolio_value_eure: '144.627100025978799978', + baseline_portfolio_value_eure_at_baseline_price: '141.7921998', + baseline_portfolio_value_eure_at_current_price: '142.8458998', + external_cash_flows: { + flow_count: 2, + deposit_count: 1, + withdrawal_count: 1, + latest_effective_at: '2026-04-07T15:20:54.757Z', + net_value_eure_at_flow_time: '23.9999998', + }, + }, + }, + successfulTradeSummary: { + total: 7, + last_successful_trade_at: '2026-04-02T20:17:44.768Z', + }, + }); + + assert.equal(summary.external_flow_adjusted, true); + assert.equal(summary.external_flow_count, 2); + assert.equal(summary.pnl_vs_deposit_baseline_eure, '2.834900225978799978'); + assert.equal(summary.pnl_vs_simple_hold_eure, '1.781200225978799978'); + assert.equal(summary.market_move_contribution_eure, '1.0537'); + assert.match(summary.caveats[0], /external cash flows/); +}); + +test('control routing only resolves the allowlisted safe dashboard actions', () => { + const refresh = resolveDashboardControl({ + service: 'liquidity-manager', + action: 'refresh', + }); + const risky = resolveDashboardControl({ + service: 'strategy-engine', + action: 'arm', + }); + + assert.equal(refresh?.path, '/refresh'); + assert.equal(refresh?.risk_class, 'safe'); + assert.equal(risky, null); +}); + +test('basic auth resolves operator identity and reuses a session cookie', () => { + const authorizationHeader = `Basic ${Buffer.from('admin:secret-password').toString('base64')}`; + const first = resolveDashboardRequestAuth({ + mode: 'basic', + authorizationHeader, + username: 'admin', + password: 'secret-password', + }); + + const token = buildDashboardSessionToken({ + username: 'admin', + password: 'secret-password', + }); + const second = resolveDashboardRequestAuth({ + mode: 'basic', + cookieHeader: `operator_dashboard_session=${token}`, + username: 'admin', + password: 'secret-password', + }); + + assert.equal(parseBasicAuthorizationHeader(authorizationHeader).username, 'admin'); + assert.equal(first.authenticated, true); + assert.equal(first.setSessionCookie, true); + assert.equal(second.authenticated, true); + assert.equal(second.via, 'session_cookie'); +}); + +test('live quote updates stay capped at ten items and successful trades update live counters', () => { + const config = buildConfig(); + const state = createDashboardLiveState({ + config, + successfulTradeCount: 2, + lastSuccessfulTradeAt: '2026-04-04T08:00:00.000Z', + }); + + for (let index = 0; index < 11; index += 1) { + applyDashboardLiveEvent(state, { + topic: 'norm.swap_demand', + event: { + observed_at: `2026-04-04T08:00:${String(index).padStart(2, '0')}.000Z`, + ingested_at: `2026-04-04T08:00:${String(index).padStart(2, '0')}.000Z`, + payload: { + quote_id: `quote-${index}`, + asset_in: config.tradingBtc.assetId, + asset_out: config.tradingEure.assetId, + pair: config.activePair, + request_kind: 'exact_in', + amount_in: '100', + amount_out: '200', + }, + }, + }); + } + + const updates = applyDashboardLiveEvent(state, { + topic: 'exec.trade_result', + event: { + observed_at: '2026-04-04T08:30:00.000Z', + ingested_at: '2026-04-04T08:30:00.000Z', + payload: { + status: 'submitted', + }, + }, + }); + + assert.equal(state.recent_quotes.length, 10); + assert.equal(state.recent_quotes[0].quote_id, 'quote-10'); + assert.equal(state.recent_quotes.at(-1).quote_id, 'quote-1'); + assert.equal(state.successful_trade_count, 3); + assert.equal(state.last_successful_trade_at, '2026-04-04T08:30:00.000Z'); + assert.equal(updates[0].type, 'status_bar.updated'); +}); + +test('bootstrap aggregation keeps Funds as default and carries live control state', () => { + const config = buildConfig(); + const bootstrap = buildDashboardBootstrap({ + config, + auth: { + authenticated: true, + subject: 'local-operator', + mode: 'stub', + roles: ['operator'], + }, + portfolioMetric: { + computed_at: '2026-04-04T09:05:00.000Z', + baseline_anchor_at: '2026-04-04T08:00:00.000Z', + baseline_status: 'active', + payload: { + current_portfolio_value_eure: '110', + baseline_portfolio_value_eure_at_baseline_price: '100', + baseline_portfolio_value_eure_at_current_price: '105', + }, + }, + inventorySnapshot: { + ingested_at: '2026-04-04T09:00:00.000Z', + payload: { + synced_at: '2026-04-04T09:00:00.000Z', + reconciliation_status: 'ok', + spendable: { + [config.tradingBtc.assetId]: '100000000', + [config.tradingEure.assetId]: '1000000000000000000', + }, + pending_inbound: { + [config.tradingBtc.assetId]: '0', + [config.tradingEure.assetId]: '0', + }, + pending_outbound: { + [config.tradingBtc.assetId]: '0', + [config.tradingEure.assetId]: '0', + }, + }, + }, + marketPrice: { + ingested_at: '2026-04-04T09:00:00.000Z', + payload: { + observed_at: '2026-04-04T09:00:00.000Z', + eure_per_btc: '100', + }, + }, + recentQuotes: [], + successfulTrades: { + page: 1, + page_size: 20, + total: 0, + total_pages: 1, + items: [], + }, + successfulTradeSummary: { + total: 1, + last_successful_trade_at: '2026-04-04T09:30:00.000Z', + }, + fundingObservations: [ + { + payload: { + funding_observation_id: 'fund-1', + asset_id: config.tradingBtc.assetId, + chain: config.tradingBtc.chain, + funding_handle: 'btc-address', + tx_hash: 'tx-1', + status: 'CREDITED', + amount: '100000000', + confirmations: 3, + first_seen_at: '2026-04-04T07:30:00.000Z', + last_seen_at: '2026-04-04T07:40:00.000Z', + credited_at: '2026-04-04T07:45:00.000Z', + }, + }, + ], + recentTradeDecisions: [ + { + observed_at: '2026-04-04T09:10:00.000Z', + ingested_at: '2026-04-04T09:10:01.000Z', + payload: { + decision_id: 'decision-1', + quote_id: 'quote-1', + pair: config.activePair, + decision: 'rejected', + decision_reason: 'strategy_disarmed', + }, + }, + ], + recentAlertTransitions: [], + serviceSnapshots: [ + { + service: 'liquidity-manager', + label: 'Liquidity Manager', + base_url: 'http://liquidity-manager', + reachable: true, + health: { ok: true }, + state: { + paused: false, + funding_observer_paused: false, + withdrawals_frozen: true, + withdrawal_defaults: { + [config.tradingBtc.assetId]: 'btc-destination', + }, + deposit_addresses: { + [config.tradingBtc.chain]: { + address: 'btc-address', + refreshed_at: '2026-04-04T09:00:00.000Z', + }, + }, + tracked_withdrawals: {}, + }, + }, + { + service: 'ops-sentinel', + label: 'Ops Sentinel', + base_url: 'http://ops-sentinel', + reachable: true, + health: { ok: true }, + state: { + active_alerts: [], + recent_transitions: [], + }, + }, + { + service: 'strategy-engine', + label: 'Strategy Engine', + base_url: 'http://strategy-engine', + reachable: true, + health: { ok: true }, + state: { + armed: true, + paused: false, + threshold_pct: 2, + max_notional_eure: 5, + latest_decision: { + decision_id: 'decision-1', + quote_id: 'quote-1', + pair: config.activePair, + decision: 'rejected', + decision_reason: 'strategy_disarmed', + }, + recent_decisions: [{ + decision_id: 'decision-1', + quote_id: 'quote-1', + pair: config.activePair, + decision: 'rejected', + decision_reason: 'strategy_disarmed', + }], + skipped_counts: {}, + }, + }, + { + service: 'trade-executor', + label: 'Trade Executor', + base_url: 'http://trade-executor', + reachable: true, + health: { ok: true }, + state: { + armed: true, + paused: false, + draining: false, + in_flight_count: 0, + completed_count: 1, + }, + }, + { + service: 'history-writer', + label: 'History Writer', + base_url: 'http://history-writer', + reachable: true, + health: { ok: true }, + state: { + database_connectivity: true, + offsets: {}, + }, + }, + ], + }); + + assert.equal(bootstrap.default_page, 'funds'); + assert.equal(bootstrap.funds.profitability.computed_at, '2026-04-04T09:05:00.000Z'); + assert.equal(bootstrap.funds.funding.control_state.withdrawals_frozen, true); + assert.equal(bootstrap.funds.funding.handles[0].address, 'btc-address'); + assert.equal(bootstrap.status_bar.strategy_armed, true); + assert.equal(bootstrap.status_bar.executor_armed, true); + assert.equal(bootstrap.strategy.strategy_state.recent_decisions[0].decision_at, '2026-04-04T09:10:00.000Z'); + assert.equal(bootstrap.strategy.strategy_state.recent_decisions[0].decision_reason, 'strategy_disarmed'); +}); + +test('system service health uses sentinel-derived severity so stale ingest is never shown healthy', () => { + const config = buildConfig(); + const bootstrap = buildDashboardBootstrap({ + config, + auth: { + authenticated: true, + subject: 'local-operator', + mode: 'stub', + roles: ['operator'], + }, + portfolioMetric: null, + inventorySnapshot: null, + marketPrice: null, + recentQuotes: [], + successfulTrades: { + page: 1, + page_size: 20, + total: 0, + total_pages: 1, + items: [], + }, + successfulTradeSummary: { + total: 0, + last_successful_trade_at: null, + }, + fundingObservations: [], + recentTradeDecisions: [], + recentAlertTransitions: [], + serviceSnapshots: [ + { + service: 'near-intents-ingest', + label: 'Intents Ingest', + base_url: 'http://near-intents-ingest', + reachable: true, + health: { ok: true }, + state: { + ingest: { + connected: true, + last_message_at: '2026-04-04T09:00:00.000Z', + last_matching_quote_at: '2026-04-04T09:00:00.000Z', + last_published_at: '2026-04-03T02:12:00.000Z', + }, + }, + }, + { + service: 'ops-sentinel', + label: 'Ops Sentinel', + base_url: 'http://ops-sentinel', + reachable: true, + health: { ok: true }, + state: { + active_alerts: [{ + alert_code: 'near_intents_publish_stale', + status: 'raised', + severity: 'critical', + reason: 'published quote freshness is stale', + service_scope: 'near-intents-ingest', + pair: config.activePair, + raised_at: '2026-04-04T09:30:00.000Z', + first_raised_at: '2026-04-04T09:30:00.000Z', + cleared_at: null, + last_evaluated_at: '2026-04-04T09:30:00.000Z', + details: { + publish_age_ms: 110_880_000, + }, + }], + recent_transitions: [], + service_health: [{ + service: 'near-intents-ingest', + status: 'critical', + reachable: true, + paused: false, + armed: null, + health_ok: false, + highest_alert_severity: 'critical', + reasons: ['critical alert active (near_intents_publish_stale)'], + freshness_at: '2026-04-03T02:12:00.000Z', + freshness_age_ms: 110_880_000, + }], + }, + }, + ], + }); + + const ingest = bootstrap.system.service_health.find((service) => service.service === 'near-intents-ingest'); + assert.equal(ingest.health_ok, false); + assert.equal(ingest.health_status, 'critical'); + assert.match(ingest.health_reasons.join(' '), /critical alert active/); + assert.equal(bootstrap.status_bar.highest_alert_severity, 'critical'); +}); + +test('funding summary includes credited bridge deposits without observer-backed funding observations', () => { + const config = buildConfig(); + const bootstrap = buildDashboardBootstrap({ + config, + auth: { + authenticated: true, + subject: 'local-operator', + mode: 'stub', + roles: ['operator'], + }, + portfolioMetric: null, + inventorySnapshot: null, + marketPrice: null, + recentQuotes: [], + successfulTrades: { + page: 1, + page_size: 20, + total: 0, + total_pages: 1, + items: [], + }, + successfulTradeSummary: { + total: 0, + last_successful_trade_at: null, + }, + fundingObservations: [], + recentDepositStatuses: [ + { + observed_at: '2026-04-07T15:20:00.000Z', + ingested_at: '2026-04-07T15:20:01.000Z', + payload: { + action_type: 'deposit_status_observed', + chain: config.tradingEure.chain, + asset_id: config.tradingEure.assetId, + status: 'COMPLETED', + details: { + tx_hash: 'eth-tx-1', + address: '0xdeposit', + amount: '24999999800000000000', + }, + }, + }, + ], + recentTradeDecisions: [], + recentAlertTransitions: [], + serviceSnapshots: [ + { + service: 'liquidity-manager', + label: 'Liquidity Manager', + base_url: 'http://liquidity-manager', + reachable: true, + health: { ok: true }, + state: { + paused: false, + funding_observer_paused: false, + withdrawals_frozen: true, + withdrawal_defaults: {}, + deposit_addresses: { + [config.tradingEure.chain]: { + address: '0xdeposit', + refreshed_at: '2026-04-07T15:20:10.000Z', + }, + }, + deposits: { + eurDeposit: { + tx_hash: 'eth-tx-1', + chain: config.tradingEure.chain, + asset_id: config.tradingEure.assetId, + amount: '24999999800000000000', + address: '0xdeposit', + status: 'COMPLETED', + }, + }, + tracked_withdrawals: {}, + last_refresh_at: '2026-04-07T15:20:10.000Z', + }, + }, + { + service: 'ops-sentinel', + label: 'Ops Sentinel', + base_url: 'http://ops-sentinel', + reachable: true, + health: { ok: true }, + state: { + active_alerts: [], + recent_transitions: [], + }, + }, + { + service: 'strategy-engine', + label: 'Strategy Engine', + base_url: 'http://strategy-engine', + reachable: true, + health: { ok: true }, + state: { + armed: true, + paused: false, + recent_decisions: [], + skipped_counts: {}, + }, + }, + { + service: 'trade-executor', + label: 'Trade Executor', + base_url: 'http://trade-executor', + reachable: true, + health: { ok: true }, + state: { + armed: true, + paused: false, + draining: false, + in_flight_count: 0, + completed_count: 0, + }, + }, + { + service: 'history-writer', + label: 'History Writer', + base_url: 'http://history-writer', + reachable: true, + health: { ok: true }, + state: { + database_connectivity: true, + offsets: {}, + }, + }, + ], + }); + + assert.equal(bootstrap.funds.funding.latest_observed_at, '2026-04-07T15:20:00.000Z'); + assert.equal(bootstrap.funds.funding.credited_deposits[0].asset_id, config.tradingEure.assetId); + assert.equal(bootstrap.funds.funding.credited_deposits[0].amount, '24.9999998'); + assert.equal(bootstrap.funds.funding.recent_observations[0].tx_hash, 'eth-tx-1'); + assert.equal(bootstrap.funds.recent_deposits[0].tx_hash, 'eth-tx-1'); +});