// ops-sentinel service entry point.
//
// On load this module: builds config, connects Postgres/Kafka, subscribes to a
// set of topics (messages are parsed and schema-validated only), periodically
// evaluates runtime health of the monitored services, periodically polls the
// NEAR Intents status endpoints, and exposes a control API with state/health
// providers plus /pause, /resume and /evaluate routes.

import process from 'node:process';
import { createConsumer } from '../bus/kafka/consumer.mjs';
import { createProducer } from '../bus/kafka/producer.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { createAlertEngine } from '../core/alert-engine.mjs';
import { createAlertNotifier } from '../core/alert-notifier.mjs';
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import {
  buildNearIntentsStatusEventPayload,
  normalizeNearIntentsStatus,
} from '../core/near-intents-status.mjs';
import { listDashboardServices } from '../core/operator-dashboard.mjs';
import {
  ageMs,
  buildMakerCompetitivenessRuntimeAlerts,
  buildRuntimeAlert,
  createRuntimeHealthThresholds,
  evaluateRuntimeHealth,
  shouldContainExecutorForAlerts,
  shouldRaiseIngestPublishStale,
} from '../core/runtime-health.mjs';
import { summarizeServiceSnapshotForSentinel } from '../core/service-snapshot-summary.mjs';
import {
  assertEnvironmentStatusEvent,
  assertFundingObservationEvent,
  assertInventorySnapshotEvent,
  assertLiquidityActionEvent,
  assertMarketPriceEvent,
  assertOpsAlertEvent,
  assertTradeResult,
} from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
import { fetchJson } from '../lib/http.mjs';
import {
  createPostgresPool,
  createTradingConfigStore,
  ensureHistorySchema,
  seedTradingConfig,
} from '../lib/postgres.mjs';

// ---------------------------------------------------------------------------
// Bootstrap: config, logging, Postgres-backed trading config, Kafka clients.
// ---------------------------------------------------------------------------

const config = loadConfig();

const logger = createLogger({
  service: 'ops-sentinel',
  component: 'alerts',
  namespace: config.projectNamespace,
});

const configPool = createPostgresPool({
  connectionString: config.postgresUrl,
});

await ensureHistorySchema(configPool);
await seedTradingConfig(configPool);

const tradingConfigStore = createTradingConfigStore({
  pool: configPool,
  logger: logger.child({ component: 'trading-config' }),
});

// Overlay the stored trading config on the static config. For the three keys
// below the static value is kept when the stored one is missing/empty.
const initialTradingConfig = await tradingConfigStore.forceRefresh();
Object.assign(config, {
  ...initialTradingConfig,
  assetRegistry: initialTradingConfig.assetRegistry || config.assetRegistry,
  trackedAssets: initialTradingConfig.trackedAssets?.length
    ? initialTradingConfig.trackedAssets
    : config.trackedAssets,
  activePair: initialTradingConfig.activePair || config.activePair,
});

const thresholds = createRuntimeHealthThresholds(config);

const producer = await createProducer({
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
  logger,
});

const consumer = await createConsumer({
  groupId: config.kafkaConsumerGroupOpsSentinel,
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
  logger,
});

// Topics consumed by the sentinel; also the set paused/resumed by the control API.
const topics = [
  config.kafkaTopicRefMarketPrice,
  config.kafkaTopicStateIntentInventory,
  config.kafkaTopicOpsLiquidityAction,
  config.kafkaTopicOpsFundingObservation,
  config.kafkaTopicExecTradeResult,
];

// Mutable in-process state, exposed through the control API state provider.
const state = {
  paused: false,
  last_error: null,
  last_event_at: null,
  publish_count: 0,
  last_runtime_eval_at: null,
  service_snapshots: [],
  service_health: [],
  latest_runtime_alerts: [],
  near_intents_status: null,
  last_environment_status_poll_at: null,
  last_environment_status_publish_at: null,
  last_environment_status_duplicate_at: null,
  last_environment_status_error: null,
  last_environment_status_fingerprint: null,
  environment_status_publish_count: 0,
  containment: {
    executor_auto_disarmed: null,
    last_action_at: null,
    last_action_reason: null,
    last_action_result: null,
  },
  anomaly_samples: [],
};

// NOTE(review): alertEngine is constructed here but not referenced anywhere
// else in this file — confirm whether it is still needed.
const alertEngine = createAlertEngine({
  activePair: config.activePair,
  activePairs: (config.observedPairs || config.pairs || [])
    .map((pair) => pair.key || pair.pairId)
    .filter(Boolean),
  priceRoutes: (config.pairs || [])
    .filter((pair) => pair.priceRoute?.routeId)
    .map((pair) => ({
      pair: pair.key || pair.pairId,
      price_route_id: pair.priceRoute.routeId,
      reference_pair: pair.priceRoute.routeConfig?.reference_pair || pair.priceRoute.source || null,
    })),
  priceStaleMs: config.opsSentinelPriceStaleMs,
  inventoryStaleMs: config.opsSentinelInventoryStaleMs,
  fundingCreditPendingMs: config.opsSentinelFundingCreditPendingMs,
  fundingStuckMs: config.opsSentinelFundingStuckMs,
  evaluationIntervalMs: config.opsSentinelEvaluationMs,
});

const notifier = createAlertNotifier({
  webhookUrl: config.opsSentinelAlertWebhookUrl,
  webhookTimeoutMs: config.opsSentinelAlertWebhookTimeoutMs,
  logger: logger.child({ component: 'webhook-notifier' }),
});

const monitoredServices = listDashboardServices(config);

// ---------------------------------------------------------------------------
// Kafka consumption.
// ---------------------------------------------------------------------------

for (const topic of topics) {
  await consumer.subscribe({ topic, fromBeginning: true });
}

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    if (!message.value || state.paused) return;
    try {
      const event = parseEventMessage(message.value.toString());
      // The normalized payload is discarded; the call serves as schema
      // validation for the topic (it throws on invalid/unsupported input).
      normalizePayloadForAlert(topic, event);
      state.last_error = null;
      state.last_event_at = new Date().toISOString();
    } catch (error) {
      state.last_error = serializeError(error);
      logger.error('ops_sentinel_consume_failed', {
        topic,
        details: {
          error: serializeError(error),
        },
      });
    }
  },
});

// ---------------------------------------------------------------------------
// Periodic work. Both timers are unref'd so they do not keep the process alive
// on their own, and both are skipped while the sentinel is paused.
// ---------------------------------------------------------------------------

const timer = setInterval(() => {
  if (state.paused) return;
  evaluateRuntimeHealthLoop().catch((error) => {
    state.last_error = serializeError(error);
    logger.error('ops_sentinel_runtime_evaluate_failed', {
      topic: config.kafkaTopicOpsAlert,
      details: {
        error: serializeError(error),
      },
    });
  });
}, config.opsSentinelEvaluationMs);
timer.unref?.();

const environmentStatusTimer = setInterval(() => {
  if (state.paused) return;
  pollNearIntentsEnvironmentStatus().catch((error) => {
    state.last_environment_status_error = serializeError(error);
    logger.error('near_intents_environment_status_poll_failed', {
      topic: config.kafkaTopicOpsEnvironmentStatus,
      details: {
        error: serializeError(error),
      },
    });
  });
}, config.nearIntentsStatusPollMs);
environmentStatusTimer.unref?.();

// Initial poll at startup, without waiting for the first interval tick.
pollNearIntentsEnvironmentStatus().catch((error) => {
  state.last_environment_status_error = serializeError(error);
  logger.error('near_intents_environment_status_initial_poll_failed', {
    topic: config.kafkaTopicOpsEnvironmentStatus,
    details: {
      error: serializeError(error),
    },
  });
});

// ---------------------------------------------------------------------------
// Control API: state/health providers and pause/resume/evaluate routes.
// ---------------------------------------------------------------------------

const controlApi = startControlApi({
  host: config.opsSentinelControlHost,
  port: config.opsSentinelControlPort,
  logger: logger.child({ component: 'control-api' }),
  service: 'ops-sentinel',
  namespace: config.projectNamespace,
  stateProvider: {
    getState() {
      return {
        paused: state.paused,
        publish_count: state.publish_count,
        last_error: state.last_error,
        last_event_at: state.last_event_at,
        last_runtime_eval_at: state.last_runtime_eval_at,
        service_snapshots: state.service_snapshots,
        service_health: state.service_health,
        latest_runtime_alerts: state.latest_runtime_alerts,
        near_intents_status: state.near_intents_status,
        last_environment_status_poll_at: state.last_environment_status_poll_at,
        last_environment_status_publish_at: state.last_environment_status_publish_at,
        last_environment_status_duplicate_at: state.last_environment_status_duplicate_at,
        last_environment_status_error: state.last_environment_status_error,
        last_environment_status_fingerprint: state.last_environment_status_fingerprint,
        environment_status_publish_count: state.environment_status_publish_count,
        containment: state.containment,
        notifier: notifier.getState(),
        trading_config: tradingConfigStore.getState(),
        anomaly_samples: state.anomaly_samples.slice(-thresholds.anomalyWindowSize),
        active_alerts: state.latest_runtime_alerts,
        recent_transitions: [],
      };
    },
  },
  healthProvider: {
    getHealth() {
      // Age of the last runtime evaluation; null before the first evaluation,
      // which is treated as "not stale".
      const staleMs = ageMs(state.last_runtime_eval_at);
      return {
        ok:
          !state.paused
          && tradingConfigStore.getState().ok === true
          && (staleMs == null || staleMs <= thresholds.sentinelStaleMs),
        paused: state.paused,
        trading_config_ok: tradingConfigStore.getState().ok,
        trading_config_block_reason: tradingConfigStore.getState().block_reason,
        last_event_at: state.last_event_at,
        last_runtime_eval_at: state.last_runtime_eval_at,
        last_error: state.last_error,
        stale: staleMs != null && staleMs > thresholds.sentinelStaleMs,
        stale_after_ms: thresholds.sentinelStaleMs,
        reason: tradingConfigStore.getState().ok === true
          ? null
          : tradingConfigStore.getState().block_reason || 'trading config unavailable',
      };
    },
  },
  routes: [
    {
      method: 'POST',
      path: '/pause',
      handler: () => {
        state.paused = true;
        consumer.pause(topics.map((topic) => ({ topic })));
        return { ok: true, paused: true };
      },
    },
    {
      method: 'POST',
      path: '/resume',
      handler: () => {
        state.paused = false;
        consumer.resume(topics.map((topic) => ({ topic })));
        return { ok: true, paused: false };
      },
    },
    {
      method: 'POST',
      path: '/evaluate',
      handler: async () => {
        await evaluateRuntimeHealthLoop();
        return { ok: true, evaluated_at: state.last_runtime_eval_at };
      },
    },
  ],
});

/**
 * Runs one runtime-health evaluation pass: snapshots every monitored service,
 * builds deterministic and anomaly alerts, and stores the results in `state`
 * (`service_snapshots`, `service_health`, `latest_runtime_alerts`,
 * `last_runtime_eval_at`, `containment`).
 *
 * @returns {Promise<void>}
 */
async function evaluateRuntimeHealthLoop() {
  const now = new Date().toISOString();
  const previousRuntimeEvalAt = state.last_runtime_eval_at;

  const serviceSnapshots = await Promise.all(monitoredServices.map(loadServiceSnapshot));
  state.service_snapshots = serviceSnapshots.map(summarizeServiceSnapshotForSentinel);
  state.last_runtime_eval_at = now;

  const servicesByName = Object.fromEntries(
    serviceSnapshots.map((snapshot) => [snapshot.service, snapshot]),
  );

  // Deterministic alerts must be built BEFORE the anomaly alerts:
  // buildAnomalyAlerts() appends the current sample to state.anomaly_samples,
  // while buildDeterministicRuntimeAlerts() reads the last stored sample as the
  // *previous* evaluation to detect a stalled history writer. In the opposite
  // order the current values are compared with themselves and that check can
  // never fire.
  const runtimeAlerts = buildDeterministicRuntimeAlerts({ servicesByName, now, previousRuntimeEvalAt });
  const anomalyAlerts = buildAnomalyAlerts({ servicesByName, now });
  const desiredRuntimeAlerts = [...runtimeAlerts, ...anomalyAlerts];

  state.service_health = [
    ...evaluateRuntimeHealth({
      servicesByName,
      activePair: config.activePair,
      activePairs: (config.observedPairs || config.pairs || [])
        .map((pair) => pair.key || pair.pairId)
        .filter(Boolean),
      activeAlerts: desiredRuntimeAlerts,
      now,
    }).values(),
  ];
  state.latest_runtime_alerts = desiredRuntimeAlerts;

  // Automatic executor containment is disabled; only record that fact.
  state.containment.executor_auto_disarmed = null;
  state.containment.last_action_at = now;
  state.containment.last_action_reason = 'automatic_executor_containment_disabled';
  state.containment.last_action_result = {
    ok: true,
    automatic_containment_enabled: false,
  };
}

/**
 * Fetches `/state` and `/healthz` for one monitored service. Never rejects:
 * failed requests are reported through `error` and `reachable`.
 *
 * @param {object} service - Service descriptor; `base_url` is read here and all
 *   fields are copied onto the result.
 * @returns {Promise<object>} The descriptor plus `reachable` (true when either
 *   request produced a truthy payload), `state`, `health` and `error` (the
 *   serialized `/state` failure if any, otherwise the `/healthz` failure).
 */
async function loadServiceSnapshot(service) {
  const [stateResult, healthResult] = await Promise.allSettled([
    fetchUpstreamJson(`${service.base_url}/state`),
    fetchUpstreamJson(`${service.base_url}/healthz`),
  ]);

  const statePayload = stateResult.status === 'fulfilled' ? stateResult.value : null;
  const healthPayload = healthResult.status === 'fulfilled' ? healthResult.value : null;
  const error = stateResult.status === 'rejected'
    ? serializeError(stateResult.reason)
    : healthResult.status === 'rejected'
      ? serializeError(healthResult.reason)
      : null;

  return {
    ...service,
    reachable: Boolean(statePayload || healthPayload),
    state: statePayload,
    health: healthPayload,
    error,
  };
}

/**
 * Fetches JSON from a monitored service, aborting after
 * `config.operatorDashboardUpstreamTimeoutMs`.
 *
 * @param {string} url
 * @returns {Promise<*>} Whatever `fetchJson` resolves with.
 */
async function fetchUpstreamJson(url) {
  return fetchJson(url, {
    signal: AbortSignal.timeout(config.operatorDashboardUpstreamTimeoutMs),
  });
}

/**
 * Builds the threshold-based alerts for the current evaluation from the service
 * snapshots: ingest connectivity/quote/publish freshness, executor relay,
 * history writer, operator dashboard, sentinel self-staleness, alert delivery
 * and "executor armed with stale truth".
 *
 * Reads `state.anomaly_samples.at(-1)` as the previous evaluation's sample, so
 * it must run before `buildAnomalyAlerts` appends the current sample.
 *
 * @param {object} params
 * @param {Object<string, object>} params.servicesByName - Snapshots keyed by service name.
 * @param {string} params.now - ISO timestamp of this evaluation.
 * @param {string|null} [params.previousRuntimeEvalAt] - ISO timestamp of the previous evaluation.
 * @returns {object[]} Alerts created with `buildRuntimeAlert` / `buildMakerCompetitivenessRuntimeAlerts`.
 */
function buildDeterministicRuntimeAlerts({ servicesByName, now, previousRuntimeEvalAt = null }) {
  const alerts = [];

  // --- near-intents-ingest -------------------------------------------------
  const ingest = servicesByName['near-intents-ingest'];
  const ingestState = ingest?.state?.ingest || {};
  const ingestHealth = ingest?.health || {};
  const matchingQuoteAgeMs = ageMs(ingestState.last_matching_quote_at, now);
  const publishedAgeMs = ageMs(ingestState.last_published_at, now);
  const messageAgeMs = ageMs(ingestState.last_message_at, now);

  if (!ingest?.reachable || ingestState.connected === false || ingestHealth.connected === false) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_ingest_disconnected',
      severity: 'critical',
      reason: 'near-intents-ingest websocket is disconnected or unreachable',
      service_scope: 'near-intents-ingest',
      pair: config.activePair,
      details: {
        reachable: ingest?.reachable ?? false,
        connected: ingestState.connected ?? ingestHealth.connected ?? null,
        last_message_at: ingestState.last_message_at || null,
        last_connected_at: ingestState.last_connected_at || null,
        last_disconnected_at: ingestState.last_disconnected_at || null,
      },
    }));
  }

  // A missing quote timestamp (null age) is treated as stale.
  if (matchingQuoteAgeMs == null || matchingQuoteAgeMs > thresholds.ingestQuoteStaleMs) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_quotes_stale',
      severity: 'critical',
      reason: matchingQuoteAgeMs == null
        ? 'near-intents-ingest has not observed a matching quote'
        : `matching quote freshness ${matchingQuoteAgeMs}ms exceeds ${thresholds.ingestQuoteStaleMs}ms`,
      service_scope: 'near-intents-ingest',
      pair: config.activePair,
      details: {
        last_matching_quote_at: ingestState.last_matching_quote_at || null,
        age_ms: matchingQuoteAgeMs,
        stale_after_ms: thresholds.ingestQuoteStaleMs,
        last_message_at: ingestState.last_message_at || null,
        message_age_ms: messageAgeMs,
      },
    }));
  }

  if (shouldRaiseIngestPublishStale({
    lastMatchingQuoteAt: ingestState.last_matching_quote_at || null,
    lastPublishedAt: ingestState.last_published_at || null,
    matchingQuoteAgeMs,
    publishedAgeMs,
    publishStaleMs: thresholds.ingestPublishStaleMs,
  })) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_publish_stale',
      severity: 'critical',
      reason: publishedAgeMs == null
        ? 'near-intents-ingest has not published a matching quote'
        : `published quote freshness ${publishedAgeMs}ms exceeds ${thresholds.ingestPublishStaleMs}ms`,
      service_scope: 'near-intents-ingest',
      pair: config.activePair,
      details: {
        last_matching_quote_at: ingestState.last_matching_quote_at || null,
        last_published_at: ingestState.last_published_at || null,
        quote_age_ms: matchingQuoteAgeMs,
        publish_age_ms: publishedAgeMs,
        stale_after_ms: thresholds.ingestPublishStaleMs,
      },
    }));
  }

  // --- trade-executor relay ------------------------------------------------
  const executor = servicesByName['trade-executor'];
  const relay = executor?.state?.relay || {};
  const relayAgeMs = ageMs(relay.last_message_at, now);

  if (
    !executor?.reachable
    || relay.connected === false
    || (relayAgeMs != null && relayAgeMs > thresholds.executorRelayStaleMs)
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'trade_executor_relay_disconnected',
      severity: 'critical',
      reason: !executor?.reachable || relay.connected === false
        ? 'trade-executor solver relay is disconnected or unreachable'
        : `trade-executor relay freshness ${relayAgeMs}ms exceeds ${thresholds.executorRelayStaleMs}ms`,
      service_scope: 'trade-executor',
      pair: config.activePair,
      details: {
        reachable: executor?.reachable ?? false,
        connected: relay.connected ?? null,
        last_message_at: relay.last_message_at || null,
        age_ms: relayAgeMs,
        stale_after_ms: thresholds.executorRelayStaleMs,
      },
    }));
  }

  // --- history-writer ------------------------------------------------------
  const writer = servicesByName['history-writer'];
  const writerState = writer?.state || {};
  const writerAgeMs = ageMs(writerState.last_write_at, now);
  const rawOffset = parseOffset(writerState.offsets?.[config.kafkaTopicRawNearIntentsQuote]?.offset);
  const normOffset = parseOffset(writerState.offsets?.[config.kafkaTopicNormSwapDemand]?.offset);
  const ingestPublishedCount = Number(ingestState.published_count || 0);
  // Sample recorded by the previous evaluation (null on the first one).
  const lastSample = state.anomaly_samples.at(-1) || null;
  const writerProgressed = lastSample
    ? rawOffset > lastSample.raw_offset || normOffset > lastSample.norm_offset
    : true;

  if (
    !writer?.reachable
    || writerState.database_connectivity === false
    || writerAgeMs == null
    || writerAgeMs > thresholds.historyWriterStaleMs
    || (lastSample && ingestPublishedCount > lastSample.ingest_published_count && !writerProgressed)
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'history_writer_stalled',
      severity: 'critical',
      reason: !writer?.reachable
        ? 'history-writer is unreachable'
        : writerState.database_connectivity === false
          ? 'history-writer lost database connectivity'
          : lastSample && ingestPublishedCount > lastSample.ingest_published_count && !writerProgressed
            ? 'ingest published quotes but durable history offsets stopped advancing'
            : `history-writer freshness ${writerAgeMs}ms exceeds ${thresholds.historyWriterStaleMs}ms`,
      service_scope: 'history-writer',
      pair: config.activePair,
      details: {
        last_write_at: writerState.last_write_at || null,
        age_ms: writerAgeMs,
        stale_after_ms: thresholds.historyWriterStaleMs,
        raw_offset: rawOffset,
        normalized_offset: normOffset,
        ingest_published_count: ingestPublishedCount,
      },
    }));
  }

  // --- operator-dashboard --------------------------------------------------
  const dashboard = servicesByName['operator-dashboard'];
  const dashboardState = dashboard?.state || {};

  alerts.push(...buildMakerCompetitivenessRuntimeAlerts({
    makerCompetitiveness:
      dashboardState.latest_maker_competitiveness || dashboardState.maker_competitiveness || null,
  }));

  const dashboardSourceErrorCount = Number(
    dashboardState.source_error_count || dashboard?.health?.source_error_count || 0,
  );
  const dashboardBootstrapAgeMs = ageMs(dashboardState.last_bootstrap_at, now);

  if (
    !dashboard?.reachable
    || dashboardSourceErrorCount > 0
    || (dashboardBootstrapAgeMs != null && dashboardBootstrapAgeMs > thresholds.dashboardSourceDegradedMs)
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'operator_dashboard_source_degraded',
      severity: 'warning',
      reason: !dashboard?.reachable
        ? 'operator-dashboard is unreachable'
        : dashboardSourceErrorCount > 0
          ? 'operator-dashboard has upstream source errors'
          : `operator-dashboard bootstrap freshness ${dashboardBootstrapAgeMs}ms exceeds ${thresholds.dashboardSourceDegradedMs}ms`,
      service_scope: 'operator-dashboard',
      pair: config.activePair,
      details: {
        source_error_count: dashboardSourceErrorCount,
        last_source_error_at: dashboardState.last_source_error_at || null,
        last_bootstrap_at: dashboardState.last_bootstrap_at || null,
        bootstrap_age_ms: dashboardBootstrapAgeMs,
      },
    }));
  }

  // --- ops-sentinel itself -------------------------------------------------
  const selfAgeMs = ageMs(previousRuntimeEvalAt, now);
  if (selfAgeMs != null && selfAgeMs > thresholds.sentinelStaleMs) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'sentinel_stale',
      severity: 'critical',
      reason: `ops-sentinel evaluation freshness ${selfAgeMs}ms exceeds ${thresholds.sentinelStaleMs}ms`,
      service_scope: 'ops-sentinel',
      pair: config.activePair,
      details: {
        last_runtime_eval_at: state.last_runtime_eval_at,
        previous_runtime_eval_at: previousRuntimeEvalAt,
        age_ms: selfAgeMs,
        stale_after_ms: thresholds.sentinelStaleMs,
      },
    }));
  }

  if (notifier.getState().last_delivery_status === 'failed') {
    alerts.push(buildRuntimeAlert({
      alert_code: 'sentinel_alert_delivery_failed',
      severity: 'warning',
      reason: 'external alert delivery failed',
      service_scope: 'ops-sentinel',
      pair: config.activePair,
      details: notifier.getState(),
    }));
  }

  // --- executor armed while the alerts above warrant containment ------------
  const executorArmed = executor?.state?.armed === true;
  const criticalTruthFailure = shouldContainExecutorForAlerts(alerts);
  if (executorArmed && criticalTruthFailure) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'executor_armed_with_stale_truth',
      severity: 'critical',
      reason: 'trade-executor remains armed while the upstream truth path is critically broken',
      service_scope: 'trade-executor',
      pair: config.activePair,
      details: {
        armed: true,
        containment_available: true,
        recommended_action: 'disarm',
      },
    }));
  }

  return alerts;
}

/**
 * Records a new counter sample in `state.anomaly_samples` (keeping at most
 * `thresholds.anomalyWindowSize + 1` samples) and compares the most recent
 * sample-to-sample delta against the average of the earlier deltas.
 *
 * @param {object} params
 * @param {Object<string, object>} params.servicesByName - Snapshots keyed by service name.
 * @param {string} params.now - ISO timestamp stored on the sample.
 * @returns {object[]} Warning alerts; empty while fewer than
 *   `thresholds.anomalyWindowSize` samples exist or no delta can be computed.
 */
function buildAnomalyAlerts({ servicesByName, now }) {
  const ingestState = servicesByName['near-intents-ingest']?.state?.ingest || {};
  const writerState = servicesByName['history-writer']?.state || {};

  const nextSample = {
    at: now,
    ingest_published_count: Number(ingestState.published_count || 0),
    ingest_reconnect_count: Number(ingestState.reconnect_count || 0),
    raw_offset: parseOffset(writerState.offsets?.[config.kafkaTopicRawNearIntentsQuote]?.offset),
    norm_offset: parseOffset(writerState.offsets?.[config.kafkaTopicNormSwapDemand]?.offset),
  };
  state.anomaly_samples.push(nextSample);
  state.anomaly_samples = state.anomaly_samples.slice(-(thresholds.anomalyWindowSize + 1));

  if (state.anomaly_samples.length < thresholds.anomalyWindowSize) {
    return [];
  }

  // Deltas between consecutive samples; negative deltas are clamped to 0.
  const windows = [];
  for (let index = 1; index < state.anomaly_samples.length; index += 1) {
    const previous = state.anomaly_samples[index - 1];
    const current = state.anomaly_samples[index];
    windows.push({
      quote_delta: Math.max(0, current.ingest_published_count - previous.ingest_published_count),
      reconnect_delta: Math.max(0, current.ingest_reconnect_count - previous.ingest_reconnect_count),
      durable_delta: Math.max(0, current.norm_offset - previous.norm_offset),
    });
  }

  const currentWindow = windows.at(-1);
  // With a single stored sample (possible when anomalyWindowSize <= 1) there is
  // no delta to evaluate; bail out instead of dereferencing undefined.
  if (!currentWindow) {
    return [];
  }

  const baseline = windows.slice(0, -1);
  const averageQuoteDelta = average(baseline.map((entry) => entry.quote_delta));
  const averageReconnectDelta = average(baseline.map((entry) => entry.reconnect_delta));
  const averageDurableDelta = average(baseline.map((entry) => entry.durable_delta));

  const alerts = [];

  if (
    averageQuoteDelta > 0
    && currentWindow.quote_delta <= averageQuoteDelta * thresholds.anomalyQuoteRateCollapseRatio
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_quote_rate_collapse',
      severity: 'warning',
      reason: 'quote publish rate collapsed versus recent baseline',
      service_scope: 'near-intents-ingest',
      pair: config.activePair,
      details: {
        current_window_quote_delta: currentWindow.quote_delta,
        baseline_average_quote_delta: averageQuoteDelta,
        collapse_ratio: thresholds.anomalyQuoteRateCollapseRatio,
      },
    }));
  }

  if (
    averageReconnectDelta >= 0
    && currentWindow.reconnect_delta > 0
    && currentWindow.reconnect_delta
      >= Math.max(2, averageReconnectDelta * thresholds.anomalyReconnectSpikeMultiplier)
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_reconnect_spike',
      severity: 'warning',
      reason: 'near-intents reconnect frequency spiked versus recent baseline',
      service_scope: 'near-intents-ingest',
      pair: config.activePair,
      details: {
        current_window_reconnect_delta: currentWindow.reconnect_delta,
        baseline_average_reconnect_delta: averageReconnectDelta,
        spike_multiplier: thresholds.anomalyReconnectSpikeMultiplier,
      },
    }));
  }

  if (
    currentWindow.quote_delta > 0
    && currentWindow.durable_delta === 0
    && averageDurableDelta >= 0
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_pipeline_flow_mismatch',
      severity: 'warning',
      reason: 'ingest quote flow advanced while durable writer progress stalled',
      service_scope: 'history-writer',
      pair: config.activePair,
      details: {
        current_window_quote_delta: currentWindow.quote_delta,
        current_window_durable_delta: currentWindow.durable_delta,
        baseline_average_durable_delta: averageDurableDelta,
      },
    }));
  }

  return alerts;
}

/**
 * Records in `state.containment` that automatic executor containment is
 * disabled. `servicesByName` and `desiredRuntimeAlerts` are accepted but unused.
 *
 * NOTE(review): not called anywhere in this file; `evaluateRuntimeHealthLoop`
 * performs the same state updates inline.
 *
 * @param {object} params
 * @param {object} params.servicesByName - Unused.
 * @param {object[]} params.desiredRuntimeAlerts - Unused.
 * @param {string} params.now - Timestamp stored as `last_action_at`.
 * @returns {Promise<void>}
 */
async function maybeContainRisk({ servicesByName, desiredRuntimeAlerts, now }) {
  void servicesByName;
  void desiredRuntimeAlerts;
  state.containment.executor_auto_disarmed = null;
  state.containment.last_action_at = now;
  state.containment.last_action_reason = 'automatic_executor_containment_disabled';
  state.containment.last_action_result = {
    ok: true,
    automatic_containment_enabled: false,
  };
}

/**
 * Polls the three NEAR Intents status endpoints, normalizes the result into
 * `state.near_intents_status`, and publishes an `environment_status` event to
 * `config.kafkaTopicOpsEnvironmentStatus` only when the status fingerprint
 * differs from the last published one.
 *
 * @returns {Promise<{published: boolean, status: object}>}
 * @throws When a fetch, event validation or the Kafka send fails; callers in
 *   this file record the error in `state.last_environment_status_error`.
 */
async function pollNearIntentsEnvironmentStatus() {
  const observedAt = new Date().toISOString();

  const [servicesResponse, postsResponse, postEnumsResponse] = await Promise.all([
    fetchNearIntentsStatusJson(config.nearIntentsStatusServicesUrl),
    fetchNearIntentsStatusJson(config.nearIntentsStatusPostsUrl),
    fetchNearIntentsStatusJson(config.nearIntentsStatusPostEnumsUrl),
  ]);

  const normalized = normalizeNearIntentsStatus({
    servicesResponse,
    postsResponse,
    postEnumsResponse,
    observedAt,
    trackedAssets: config.trackedAssets,
  });

  state.near_intents_status = normalized;
  state.last_environment_status_poll_at = observedAt;
  state.last_environment_status_error = null;

  // Unchanged fingerprint: remember the duplicate and skip publishing.
  if (normalized.status_fingerprint === state.last_environment_status_fingerprint) {
    state.last_environment_status_duplicate_at = observedAt;
    return { published: false, status: normalized };
  }

  const payload = buildNearIntentsStatusEventPayload(normalized, {
    changedAt: observedAt,
    previousFingerprint: state.last_environment_status_fingerprint,
  });

  const event = buildEventEnvelope({
    source: 'ops-sentinel',
    venue: 'near_intents',
    eventType: 'environment_status',
    observedAt,
    payload,
    raw: {
      source: 'near_intents_status_page',
      services: servicesResponse,
      posts: postsResponse,
      post_enums: postEnumsResponse,
    },
  });
  assertEnvironmentStatusEvent(event);

  await producer.sendJson(config.kafkaTopicOpsEnvironmentStatus, event, {
    key: payload.environment_key,
  });

  // Only advance the fingerprint and counters after a successful send.
  state.last_environment_status_fingerprint = payload.status_fingerprint;
  state.last_environment_status_publish_at = observedAt;
  state.environment_status_publish_count += 1;
  state.publish_count += 1;

  return { published: true, status: normalized };
}

/**
 * Fetches JSON from a NEAR Intents status endpoint, aborting after
 * `config.nearIntentsStatusTimeoutMs`.
 *
 * @param {string} url
 * @returns {Promise<*>} Whatever `fetchJson` resolves with.
 */
async function fetchNearIntentsStatusJson(url) {
  return fetchJson(url, {
    signal: AbortSignal.timeout(config.nearIntentsStatusTimeoutMs),
  });
}

/**
 * Publishes each alert transition as an `ops_alert` event to
 * `config.kafkaTopicOpsAlert` and forwards it to the notifier, sequentially.
 *
 * NOTE(review): not called anywhere in this file.
 *
 * @param {Iterable<object>} transitions - Entries providing at least
 *   `alert_code`, `status`, `service_scope` and `last_evaluated_at`.
 * @returns {Promise<void>}
 */
async function publishTransitions(transitions) {
  for (const transition of transitions) {
    const alertEventId = `${transition.alert_code}-${transition.status}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`;

    const event = buildEventEnvelope({
      source: 'ops-sentinel',
      venue: 'unrip',
      eventType: 'ops_alert',
      observedAt: transition.last_evaluated_at,
      payload: {
        alert_event_id: alertEventId,
        ...transition,
      },
    });
    assertOpsAlertEvent(event);

    await producer.sendJson(config.kafkaTopicOpsAlert, event, {
      key: `${transition.alert_code}:${transition.service_scope}:${transition.tx_hash || transition.pair || 'global'}`,
    });
    state.publish_count += 1;

    await notifier.notify({
      ...transition,
      alert_event_id: alertEventId,
    });
  }
}

/**
 * Validates an event against the schema assertion for its topic and returns
 * its payload. For every topic except funding observations the envelope's
 * `observed_at` and `ingested_at` are merged into the returned payload.
 *
 * @param {string} topic - Kafka topic the event arrived on.
 * @param {object} event - Parsed event envelope.
 * @returns {object} The normalized payload.
 * @throws {Error} When the topic is not one of the handled topics, or when the
 *   schema assertion throws.
 */
function normalizePayloadForAlert(topic, event) {
  switch (topic) {
    case config.kafkaTopicRefMarketPrice:
      assertMarketPriceEvent(event);
      return {
        ...event.payload,
        observed_at: event.observed_at,
        ingested_at: event.ingested_at,
      };
    case config.kafkaTopicStateIntentInventory:
      assertInventorySnapshotEvent(event);
      return {
        ...event.payload,
        observed_at: event.observed_at,
        ingested_at: event.ingested_at,
      };
    case config.kafkaTopicOpsLiquidityAction:
      assertLiquidityActionEvent(event);
      return {
        ...event.payload,
        observed_at: event.observed_at,
        ingested_at: event.ingested_at,
      };
    case config.kafkaTopicOpsFundingObservation:
      assertFundingObservationEvent(event);
      return event.payload;
    case config.kafkaTopicExecTradeResult:
      assertTradeResult(event);
      return {
        ...event.payload,
        observed_at: event.observed_at,
        ingested_at: event.ingested_at,
      };
    default:
      throw new Error(`unsupported ops-sentinel topic: ${topic}`);
  }
}

/**
 * Converts an offset value to a finite number.
 *
 * @param {*} value - Offset as reported by the writer state.
 * @returns {number} The numeric offset, or 0 when `value` is null/undefined or
 *   does not convert to a finite number.
 */
function parseOffset(value) {
  if (value == null) return 0;
  const parsed = Number(value);
  return Number.isFinite(parsed) ? parsed : 0;
}

/**
 * Arithmetic mean of a list of numbers.
 *
 * @param {number[]} values
 * @returns {number} The mean, or 0 for an empty list.
 */
function average(values) {
  if (!values.length) return 0;
  return values.reduce((sum, value) => sum + value, 0) / values.length;
}

// Set once shutdown has started so a second signal does not re-run the cleanup.
let shuttingDown = false;

/**
 * Signal handler: stops the timers, closes the control API, disconnects the
 * Kafka consumer and producer, closes the Postgres pool and exits the process.
 *
 * Every step is best-effort so one failing step cannot prevent the remaining
 * cleanup or the final `process.exit`. The exit code is 1 when a Kafka client
 * failed to disconnect, otherwise 0.
 *
 * @returns {Promise<void>}
 */
async function shutdown() {
  if (shuttingDown) return;
  shuttingDown = true;

  clearInterval(timer);
  clearInterval(environmentStatusTimer);
  await controlApi.close().catch(() => {});

  let exitCode = 0;
  const kafkaClients = [
    ['consumer', consumer],
    ['producer', producer],
  ];
  for (const [clientName, client] of kafkaClients) {
    try {
      await client.disconnect();
    } catch (error) {
      exitCode = 1;
      logger.error('ops_sentinel_shutdown_disconnect_failed', {
        details: {
          client: clientName,
          error: serializeError(error),
        },
      });
    }
  }

  await configPool.end().catch(() => {});
  process.exit(exitCode);
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);