All checks were successful
deploy / deploy (push) Successful in 32s
Proof: ops-sentinel runtime evaluation no longer throws ReferenceError while checking disabled executor containment; covered by a regression test and full npm test. Assumptions: Runtime containment remains intentionally disabled by shouldContainExecutorForAlerts returning false; this change only restores the symbol needed for evaluation to complete. Still fake: Alert surfaces remain suppressed per the current safety/alert simplification; venue-native terminal fill events and realized per-trade PnL are still unavailable.
660 lines
23 KiB
JavaScript
660 lines
23 KiB
JavaScript
import process from 'node:process';
|
|
|
|
import { createConsumer } from '../bus/kafka/consumer.mjs';
|
|
import { createProducer } from '../bus/kafka/producer.mjs';
|
|
import { startControlApi } from '../core/control-api.mjs';
|
|
import { createAlertEngine } from '../core/alert-engine.mjs';
|
|
import { createAlertNotifier } from '../core/alert-notifier.mjs';
|
|
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
|
|
import { createLogger, serializeError } from '../core/log.mjs';
|
|
import { listDashboardServices } from '../core/operator-dashboard.mjs';
|
|
import {
|
|
ageMs,
|
|
buildRuntimeAlert,
|
|
createRuntimeHealthThresholds,
|
|
evaluateRuntimeHealth,
|
|
shouldContainExecutorForAlerts,
|
|
shouldRaiseIngestPublishStale,
|
|
} from '../core/runtime-health.mjs';
|
|
import {
|
|
assertFundingObservationEvent,
|
|
assertInventorySnapshotEvent,
|
|
assertLiquidityActionEvent,
|
|
assertMarketPriceEvent,
|
|
assertOpsAlertEvent,
|
|
assertTradeResult,
|
|
} from '../core/schemas.mjs';
|
|
import { loadConfig } from '../lib/config.mjs';
|
|
|
|
// Service configuration and the runtime-health thresholds derived from it.
const config = loadConfig();
const thresholds = createRuntimeHealthThresholds(config);

// Root structured logger; sub-components derive child loggers from it.
const logger = createLogger({
  service: 'ops-sentinel',
  component: 'alerts',
  namespace: config.projectNamespace,
});

// Connection settings shared by the Kafka producer and consumer.
const kafkaConnection = {
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
  logger,
};

const producer = await createProducer({ ...kafkaConnection });
const consumer = await createConsumer({
  groupId: config.kafkaConsumerGroupOpsSentinel,
  ...kafkaConnection,
});
|
|
|
|
// Kafka topics this sentinel consumes; the same list drives subscribe, pause and resume.
const topics = [
  'kafkaTopicRefMarketPrice',
  'kafkaTopicStateIntentInventory',
  'kafkaTopicOpsLiquidityAction',
  'kafkaTopicOpsFundingObservation',
  'kafkaTopicExecTradeResult',
].map((configKey) => config[configKey]);

// Mutable process-wide state, exposed through the control API.
const state = {
  // Set by POST /pause, cleared by POST /resume.
  paused: false,
  // Last serialized error from message consumption or runtime evaluation.
  last_error: null,
  // ISO timestamp of the last message that parsed and validated successfully.
  last_event_at: null,
  // Number of ops_alert events sent by publishTransitions().
  publish_count: 0,
  // ISO timestamp captured at the start of the latest runtime evaluation.
  last_runtime_eval_at: null,
  service_snapshots: [],
  service_health: [],
  latest_runtime_alerts: [],
  // Outcome of the most recent executor-containment decision.
  containment: {
    executor_auto_disarmed: null,
    last_action_at: null,
    last_action_reason: null,
    last_action_result: null,
  },
  // Rolling counter samples appended by buildAnomalyAlerts().
  anomaly_samples: [],
};
|
|
|
|
// NOTE(review): alertEngine is not referenced elsewhere in this module as shown;
// alert surfaces are currently suppressed.
const alertEngine = createAlertEngine({
  activePair: config.activePair,
  priceStaleMs: config.opsSentinelPriceStaleMs,
  inventoryStaleMs: config.opsSentinelInventoryStaleMs,
  fundingCreditPendingMs: config.opsSentinelFundingCreditPendingMs,
  fundingStuckMs: config.opsSentinelFundingStuckMs,
  evaluationIntervalMs: config.opsSentinelEvaluationMs,
});

// Outbound webhook delivery for alert transitions.
const notifierLogger = logger.child({ component: 'webhook-notifier' });
const notifier = createAlertNotifier({
  webhookUrl: config.opsSentinelAlertWebhookUrl,
  webhookTimeoutMs: config.opsSentinelAlertWebhookTimeoutMs,
  logger: notifierLogger,
});

// Services polled on every runtime evaluation (each has a `service` name and `base_url`).
const monitoredServices = listDashboardServices(config);
|
|
|
|
// Subscribe to each input topic in order, with `fromBeginning: true`.
for (const topic of topics) {
  await consumer.subscribe({ topic, fromBeginning: true });
}

/**
 * Parses and schema-validates one consumed message. It records success or failure
 * in `state`; errors are logged and never rethrown.
 * Empty messages and messages received while paused are ignored.
 *
 * @param {{ topic: string, message: { value?: { toString(): string } | null } }} delivery
 */
async function handleSentinelMessage({ topic, message }) {
  const ignoreMessage = !message.value || state.paused;
  if (ignoreMessage) return;

  try {
    const parsedEvent = parseEventMessage(message.value.toString());
    // Called for its schema assertion; the normalized payload is not used here.
    normalizePayloadForAlert(topic, parsedEvent);
    state.last_error = null;
    state.last_event_at = new Date().toISOString();
  } catch (failure) {
    state.last_error = serializeError(failure);
    logger.error('ops_sentinel_consume_failed', {
      topic,
      details: {
        error: serializeError(failure),
      },
    });
  }
}

await consumer.run({ eachMessage: handleSentinelMessage });
|
|
|
|
/**
 * Records and logs a failed periodic runtime evaluation.
 * @param {unknown} failure - rejection reason from evaluateRuntimeHealthLoop()
 */
function reportRuntimeEvaluationFailure(failure) {
  state.last_error = serializeError(failure);
  logger.error('ops_sentinel_runtime_evaluate_failed', {
    topic: config.kafkaTopicOpsAlert,
    details: {
      error: serializeError(failure),
    },
  });
}

// Periodic runtime evaluation; skipped while paused. The timer is unref'd so it
// does not by itself keep the process alive.
const timer = setInterval(() => {
  if (!state.paused) {
    evaluateRuntimeHealthLoop().catch(reportRuntimeEvaluationFailure);
  }
}, config.opsSentinelEvaluationMs);
timer.unref?.();
|
|
|
|
/**
 * Builds the `{ topic }` selectors passed to consumer.pause()/resume() for every subscribed topic.
 * @returns {{ topic: string }[]}
 */
const selectAllTopics = () => topics.map((topic) => ({ topic }));

// Snapshot of sentinel state reported by the control API. Alert lists are
// deliberately reported empty while alert surfaces are suppressed.
const stateProvider = {
  getState() {
    return {
      paused: state.paused,
      publish_count: state.publish_count,
      last_error: state.last_error,
      last_event_at: state.last_event_at,
      last_runtime_eval_at: state.last_runtime_eval_at,
      service_snapshots: state.service_snapshots,
      service_health: state.service_health,
      latest_runtime_alerts: [],
      containment: state.containment,
      notifier: notifier.getState(),
      anomaly_samples: state.anomaly_samples.slice(-thresholds.anomalyWindowSize),
      active_alerts: [],
      recent_transitions: [],
    };
  },
};

// Health is ok when not paused and the last runtime evaluation is no older than
// thresholds.sentinelStaleMs (a sentinel that has never evaluated counts as fresh).
const healthProvider = {
  getHealth() {
    const evalAgeMs = ageMs(state.last_runtime_eval_at);
    const staleAfterMs = thresholds.sentinelStaleMs;
    return {
      ok: !state.paused && (evalAgeMs == null || evalAgeMs <= staleAfterMs),
      paused: state.paused,
      last_event_at: state.last_event_at,
      last_runtime_eval_at: state.last_runtime_eval_at,
      last_error: state.last_error,
      stale: evalAgeMs != null && evalAgeMs > staleAfterMs,
      stale_after_ms: staleAfterMs,
    };
  },
};

// Operator actions: pause/resume consumption and trigger an on-demand evaluation.
const controlRoutes = [
  {
    method: 'POST',
    path: '/pause',
    handler: () => {
      state.paused = true;
      consumer.pause(selectAllTopics());
      return { ok: true, paused: true };
    },
  },
  {
    method: 'POST',
    path: '/resume',
    handler: () => {
      state.paused = false;
      consumer.resume(selectAllTopics());
      return { ok: true, paused: false };
    },
  },
  {
    method: 'POST',
    path: '/evaluate',
    handler: async () => {
      await evaluateRuntimeHealthLoop();
      return { ok: true, evaluated_at: state.last_runtime_eval_at };
    },
  },
];

const controlApi = startControlApi({
  host: config.opsSentinelControlHost,
  port: config.opsSentinelControlPort,
  logger: logger.child({ component: 'control-api' }),
  service: 'ops-sentinel',
  namespace: config.projectNamespace,
  stateProvider,
  healthProvider,
  routes: controlRoutes,
});
|
|
|
|
/**
 * Runs one runtime-health evaluation pass.
 *
 * 1. Fetches a snapshot of every monitored service and stores the snapshots plus
 *    the pass timestamp on `state`.
 * 2. Builds the deterministic alerts, then the anomaly alerts. Order matters:
 *    buildAnomalyAlerts() appends this pass's counter sample to
 *    `state.anomaly_samples`, and the deterministic history-writer stall check
 *    must compare against the sample from the previous pass.
 * 3. Recomputes `state.service_health`.
 * 4. Leaves `state.latest_runtime_alerts` empty (alert surfaces are intentionally
 *    suppressed) and records the containment decision via maybeContainRisk().
 *
 * @returns {Promise<void>} rejects if a snapshot, alert builder, or health evaluation throws
 */
async function evaluateRuntimeHealthLoop() {
  const now = new Date().toISOString();
  const previousRuntimeEvalAt = state.last_runtime_eval_at;
  const serviceSnapshots = await Promise.all(monitoredServices.map(loadServiceSnapshot));
  state.service_snapshots = serviceSnapshots;
  state.last_runtime_eval_at = now;

  const servicesByName = Object.fromEntries(
    serviceSnapshots.map((snapshot) => [snapshot.service, snapshot]),
  );

  // Deterministic alerts first: they read the newest anomaly sample, which must
  // still be the previous pass's sample at this point.
  const runtimeAlerts = buildDeterministicRuntimeAlerts({ servicesByName, now, previousRuntimeEvalAt });
  const anomalyAlerts = buildAnomalyAlerts({ servicesByName, now });
  const desiredRuntimeAlerts = [...runtimeAlerts, ...anomalyAlerts];

  state.service_health = [...evaluateRuntimeHealth({
    servicesByName,
    activePair: config.activePair,
    activeAlerts: [],
    now,
  }).values()];

  // Alerts are computed but not surfaced while alert delivery is suppressed.
  state.latest_runtime_alerts = [];

  // Single source of truth for the (currently disabled) containment bookkeeping.
  await maybeContainRisk({ servicesByName, desiredRuntimeAlerts, now });
}
|
|
|
|
/**
 * Fetches a service's `/state` and `/healthz` documents in parallel and merges
 * them into a snapshot. Never rejects because of a failed fetch: failures leave
 * the corresponding field `null`.
 *
 * @param {{ base_url: string }} service - monitored service descriptor (copied into the result)
 * @returns {Promise<object>} `{ ...service, reachable, state, health, error }` where
 *   `reachable` is true if either fetch produced a truthy payload and `error` is the
 *   serialized `/state` failure, else the `/healthz` failure, else null
 */
async function loadServiceSnapshot(service) {
  const settled = await Promise.allSettled([
    fetchUpstreamJson(`${service.base_url}/state`),
    fetchUpstreamJson(`${service.base_url}/healthz`),
  ]);
  const [statePayload, healthPayload] = settled.map(
    (outcome) => (outcome.status === 'fulfilled' ? outcome.value : null),
  );

  // The /state failure takes precedence over the /healthz failure.
  const firstRejection = settled.find((outcome) => outcome.status === 'rejected');
  const error = firstRejection ? serializeError(firstRejection.reason) : null;

  return {
    ...service,
    reachable: !!(statePayload || healthPayload),
    state: statePayload,
    health: healthPayload,
    error,
  };
}
|
|
|
|
/**
 * GETs `url` and parses the response body as JSON, aborting after
 * `config.operatorDashboardUpstreamTimeoutMs`.
 *
 * Uses the runtime's global `fetch` directly. This module does not import a
 * `fetchJson` helper, and calling one would reject with a ReferenceError and mark
 * every monitored service unreachable.
 *
 * @param {string} url - absolute URL of an upstream JSON endpoint
 * @returns {Promise<unknown>} parsed JSON body
 * @throws {Error} on a non-2xx status; also rejects on timeout/abort, network
 *   failure, or an unparseable body
 */
async function fetchUpstreamJson(url) {
  const response = await fetch(url, {
    signal: AbortSignal.timeout(config.operatorDashboardUpstreamTimeoutMs),
  });
  if (!response.ok) {
    // Release the unread body (best effort) before reporting the failure.
    await response.body?.cancel().catch(() => {});
    throw new Error(`upstream request to ${url} failed with HTTP ${response.status}`);
  }
  return response.json();
}
|
|
|
|
/**
 * Builds the deterministic, threshold-based runtime alerts for one evaluation pass.
 *
 * Checks run in a fixed order. Each appends at most one alert built with
 * `buildRuntimeAlert`:
 * - near-intents-ingest connectivity, matching-quote freshness and publish freshness;
 * - trade-executor relay;
 * - history-writer progress;
 * - operator-dashboard sources;
 * - ops-sentinel self-freshness;
 * - webhook delivery;
 * - finally the executor-armed check, which passes every alert gathered before it
 *   to `shouldContainExecutorForAlerts`.
 *
 * Reads module state:
 * - `thresholds` and `config` (active pair, history topic names);
 * - `state.anomaly_samples` and `state.last_runtime_eval_at`;
 * - `notifier.getState()`.
 *
 * @param {object} params
 * @param {Record<string, object>} params.servicesByName - service snapshots keyed by service
 *   name; a missing snapshot is treated as unreachable
 * @param {string} params.now - ISO-8601 timestamp of this evaluation pass
 * @param {string|null} [params.previousRuntimeEvalAt=null] - timestamp of the previous pass,
 *   used for the sentinel self-staleness check
 * @returns {object[]} alerts in check order
 */
function buildDeterministicRuntimeAlerts({ servicesByName, now, previousRuntimeEvalAt = null }) {
  const alerts = [];

  // --- near-intents-ingest ---------------------------------------------------
  const ingest = servicesByName['near-intents-ingest'];
  const ingestState = ingest?.state?.ingest || {};
  const ingestHealth = ingest?.health || {};
  const matchingQuoteAgeMs = ageMs(ingestState.last_matching_quote_at, now);
  const publishedAgeMs = ageMs(ingestState.last_published_at, now);
  const messageAgeMs = ageMs(ingestState.last_message_at, now);

  if (!ingest?.reachable || ingestState.connected === false || ingestHealth.connected === false) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_ingest_disconnected',
      severity: 'critical',
      reason: 'near-intents-ingest websocket is disconnected or unreachable',
      service_scope: 'near-intents-ingest',
      pair: config.activePair,
      details: {
        reachable: ingest?.reachable ?? false,
        connected: ingestState.connected ?? ingestHealth.connected ?? null,
        last_message_at: ingestState.last_message_at || null,
        last_connected_at: ingestState.last_connected_at || null,
        last_disconnected_at: ingestState.last_disconnected_at || null,
      },
    }));
  }

  // A missing matching-quote timestamp counts as stale.
  if (matchingQuoteAgeMs == null || matchingQuoteAgeMs > thresholds.ingestQuoteStaleMs) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_quotes_stale',
      severity: 'critical',
      reason: matchingQuoteAgeMs == null
        ? 'near-intents-ingest has not observed a matching quote'
        : `matching quote freshness ${matchingQuoteAgeMs}ms exceeds ${thresholds.ingestQuoteStaleMs}ms`,
      service_scope: 'near-intents-ingest',
      pair: config.activePair,
      details: {
        last_matching_quote_at: ingestState.last_matching_quote_at || null,
        age_ms: matchingQuoteAgeMs,
        stale_after_ms: thresholds.ingestQuoteStaleMs,
        last_message_at: ingestState.last_message_at || null,
        message_age_ms: messageAgeMs,
      },
    }));
  }

  if (shouldRaiseIngestPublishStale({
    lastMatchingQuoteAt: ingestState.last_matching_quote_at || null,
    lastPublishedAt: ingestState.last_published_at || null,
    matchingQuoteAgeMs,
    publishedAgeMs,
    publishStaleMs: thresholds.ingestPublishStaleMs,
  })) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_publish_stale',
      severity: 'critical',
      reason: publishedAgeMs == null
        ? 'near-intents-ingest has not published a matching quote'
        : `published quote freshness ${publishedAgeMs}ms exceeds ${thresholds.ingestPublishStaleMs}ms`,
      service_scope: 'near-intents-ingest',
      pair: config.activePair,
      details: {
        last_matching_quote_at: ingestState.last_matching_quote_at || null,
        last_published_at: ingestState.last_published_at || null,
        quote_age_ms: matchingQuoteAgeMs,
        publish_age_ms: publishedAgeMs,
        stale_after_ms: thresholds.ingestPublishStaleMs,
      },
    }));
  }

  // --- trade-executor relay --------------------------------------------------
  const executor = servicesByName['trade-executor'];
  const relay = executor?.state?.relay || {};
  const relayAgeMs = ageMs(relay.last_message_at, now);
  // Unlike ingest quotes, a relay with no message timestamp is not treated as stale.
  if (!executor?.reachable || relay.connected === false || (relayAgeMs != null && relayAgeMs > thresholds.executorRelayStaleMs)) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'trade_executor_relay_disconnected',
      severity: 'critical',
      reason: !executor?.reachable || relay.connected === false
        ? 'trade-executor solver relay is disconnected or unreachable'
        : `trade-executor relay freshness ${relayAgeMs}ms exceeds ${thresholds.executorRelayStaleMs}ms`,
      service_scope: 'trade-executor',
      pair: config.activePair,
      details: {
        reachable: executor?.reachable ?? false,
        connected: relay.connected ?? null,
        last_message_at: relay.last_message_at || null,
        age_ms: relayAgeMs,
        stale_after_ms: thresholds.executorRelayStaleMs,
      },
    }));
  }

  // --- history-writer --------------------------------------------------------
  const writer = servicesByName['history-writer'];
  const writerState = writer?.state || {};
  const writerAgeMs = ageMs(writerState.last_write_at, now);
  const rawOffset = parseOffset(writerState.offsets?.[config.kafkaTopicRawNearIntentsQuote]?.offset);
  const normOffset = parseOffset(writerState.offsets?.[config.kafkaTopicNormSwapDemand]?.offset);
  const ingestPublishedCount = Number(ingestState.published_count || 0);
  // buildAnomalyAlerts() may already have appended the sample for this pass
  // (sample.at === now). Comparing against that sample would compare the current
  // counters with themselves and never detect a stall, so use the newest sample
  // recorded by an earlier pass.
  const lastSample = state.anomaly_samples.filter((sample) => sample.at !== now).at(-1) || null;
  const writerProgressed = lastSample
    ? rawOffset > lastSample.raw_offset || normOffset > lastSample.norm_offset
    : true;
  const durableProgressStalled = Boolean(lastSample)
    && ingestPublishedCount > lastSample.ingest_published_count
    && !writerProgressed;

  if (
    !writer?.reachable
    || writerState.database_connectivity === false
    || writerAgeMs == null
    || writerAgeMs > thresholds.historyWriterStaleMs
    || durableProgressStalled
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'history_writer_stalled',
      severity: 'critical',
      reason: !writer?.reachable
        ? 'history-writer is unreachable'
        : writerState.database_connectivity === false
          ? 'history-writer lost database connectivity'
          : durableProgressStalled
            ? 'ingest published quotes but durable history offsets stopped advancing'
            : `history-writer freshness ${writerAgeMs}ms exceeds ${thresholds.historyWriterStaleMs}ms`,
      service_scope: 'history-writer',
      pair: config.activePair,
      details: {
        last_write_at: writerState.last_write_at || null,
        age_ms: writerAgeMs,
        stale_after_ms: thresholds.historyWriterStaleMs,
        raw_offset: rawOffset,
        normalized_offset: normOffset,
        ingest_published_count: ingestPublishedCount,
      },
    }));
  }

  // --- operator-dashboard ----------------------------------------------------
  const dashboard = servicesByName['operator-dashboard'];
  const dashboardState = dashboard?.state || {};
  const dashboardSourceErrorCount = Number(
    dashboardState.source_error_count
    || dashboard?.health?.source_error_count
    || 0,
  );
  const dashboardBootstrapAgeMs = ageMs(dashboardState.last_bootstrap_at, now);
  if (
    !dashboard?.reachable
    || dashboardSourceErrorCount > 0
    || (dashboardBootstrapAgeMs != null && dashboardBootstrapAgeMs > thresholds.dashboardSourceDegradedMs)
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'operator_dashboard_source_degraded',
      severity: 'warning',
      reason: !dashboard?.reachable
        ? 'operator-dashboard is unreachable'
        : dashboardSourceErrorCount > 0
          ? 'operator-dashboard has upstream source errors'
          : `operator-dashboard bootstrap freshness ${dashboardBootstrapAgeMs}ms exceeds ${thresholds.dashboardSourceDegradedMs}ms`,
      service_scope: 'operator-dashboard',
      pair: config.activePair,
      details: {
        source_error_count: dashboardSourceErrorCount,
        last_source_error_at: dashboardState.last_source_error_at || null,
        last_bootstrap_at: dashboardState.last_bootstrap_at || null,
        bootstrap_age_ms: dashboardBootstrapAgeMs,
      },
    }));
  }

  // --- ops-sentinel itself ---------------------------------------------------
  const selfAgeMs = ageMs(previousRuntimeEvalAt, now);
  if (selfAgeMs != null && selfAgeMs > thresholds.sentinelStaleMs) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'sentinel_stale',
      severity: 'critical',
      reason: `ops-sentinel evaluation freshness ${selfAgeMs}ms exceeds ${thresholds.sentinelStaleMs}ms`,
      service_scope: 'ops-sentinel',
      pair: config.activePair,
      details: {
        last_runtime_eval_at: state.last_runtime_eval_at,
        previous_runtime_eval_at: previousRuntimeEvalAt,
        age_ms: selfAgeMs,
        stale_after_ms: thresholds.sentinelStaleMs,
      },
    }));
  }

  if (notifier.getState().last_delivery_status === 'failed') {
    alerts.push(buildRuntimeAlert({
      alert_code: 'sentinel_alert_delivery_failed',
      severity: 'warning',
      reason: 'external alert delivery failed',
      service_scope: 'ops-sentinel',
      pair: config.activePair,
      details: notifier.getState(),
    }));
  }

  // --- executor armed while truth is broken ----------------------------------
  // Must run last: it inspects every alert gathered above.
  const executorArmed = executor?.state?.armed === true;
  const criticalTruthFailure = shouldContainExecutorForAlerts(alerts);
  if (executorArmed && criticalTruthFailure) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'executor_armed_with_stale_truth',
      severity: 'critical',
      reason: 'trade-executor remains armed while the upstream truth path is critically broken',
      service_scope: 'trade-executor',
      pair: config.activePair,
      details: {
        armed: true,
        containment_available: true,
        recommended_action: 'disarm',
      },
    }));
  }

  return alerts;
}
|
|
|
|
/**
 * Records an anomaly sample for this evaluation pass and compares the newest
 * sample-to-sample window against the average of the earlier windows.
 *
 * Side effect: appends one sample to `state.anomaly_samples`, then keeps only the
 * last `thresholds.anomalyWindowSize + 1` samples.
 *
 * Raises, in this order:
 * - `near_intents_quote_rate_collapse` when the baseline quote delta is positive and
 *   the current quote delta is at most baseline × `anomalyQuoteRateCollapseRatio`;
 * - `near_intents_reconnect_spike` when the current reconnect delta is positive and at
 *   least max(2, baseline × `anomalyReconnectSpikeMultiplier`);
 * - `near_intents_pipeline_flow_mismatch` when quotes advanced in the current window
 *   but the normalized-topic offset did not.
 *
 * @param {object} params
 * @param {Record<string, object>} params.servicesByName - service snapshots keyed by service name
 * @param {string} params.now - ISO-8601 timestamp stored on the new sample
 * @returns {object[]} anomaly alerts; empty until enough samples exist
 */
function buildAnomalyAlerts({ servicesByName, now }) {
  const ingestState = servicesByName['near-intents-ingest']?.state?.ingest || {};
  const writerState = servicesByName['history-writer']?.state || {};
  const nextSample = {
    at: now,
    ingest_published_count: Number(ingestState.published_count || 0),
    ingest_reconnect_count: Number(ingestState.reconnect_count || 0),
    raw_offset: parseOffset(writerState.offsets?.[config.kafkaTopicRawNearIntentsQuote]?.offset),
    norm_offset: parseOffset(writerState.offsets?.[config.kafkaTopicNormSwapDemand]?.offset),
  };

  state.anomaly_samples.push(nextSample);
  state.anomaly_samples = state.anomaly_samples.slice(-(thresholds.anomalyWindowSize + 1));

  // Two samples form one window. Requiring at least two (in addition to the
  // configured window size) keeps a window size below 2, or a non-numeric one,
  // from dereferencing a missing current window below.
  if (
    state.anomaly_samples.length < 2
    || state.anomaly_samples.length < thresholds.anomalyWindowSize
  ) {
    return [];
  }

  // Per-pass deltas, clamped at 0 so counter resets do not produce negative rates.
  const windows = [];
  for (let index = 1; index < state.anomaly_samples.length; index += 1) {
    const previous = state.anomaly_samples[index - 1];
    const current = state.anomaly_samples[index];
    windows.push({
      quote_delta: Math.max(0, current.ingest_published_count - previous.ingest_published_count),
      reconnect_delta: Math.max(0, current.ingest_reconnect_count - previous.ingest_reconnect_count),
      durable_delta: Math.max(0, current.norm_offset - previous.norm_offset),
    });
  }

  const currentWindow = windows.at(-1);
  const baseline = windows.slice(0, -1);
  const averageQuoteDelta = average(baseline.map((entry) => entry.quote_delta));
  const averageReconnectDelta = average(baseline.map((entry) => entry.reconnect_delta));
  const averageDurableDelta = average(baseline.map((entry) => entry.durable_delta));
  const alerts = [];

  if (
    averageQuoteDelta > 0
    && currentWindow.quote_delta <= averageQuoteDelta * thresholds.anomalyQuoteRateCollapseRatio
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_quote_rate_collapse',
      severity: 'warning',
      reason: 'quote publish rate collapsed versus recent baseline',
      service_scope: 'near-intents-ingest',
      pair: config.activePair,
      details: {
        current_window_quote_delta: currentWindow.quote_delta,
        baseline_average_quote_delta: averageQuoteDelta,
        collapse_ratio: thresholds.anomalyQuoteRateCollapseRatio,
      },
    }));
  }

  // NOTE(review): with clamped deltas, `averageReconnectDelta >= 0` only excludes a
  // NaN baseline (e.g. a non-numeric counter); confirm that is the intent.
  if (
    averageReconnectDelta >= 0
    && currentWindow.reconnect_delta > 0
    && currentWindow.reconnect_delta >= Math.max(2, averageReconnectDelta * thresholds.anomalyReconnectSpikeMultiplier)
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_reconnect_spike',
      severity: 'warning',
      reason: 'near-intents reconnect frequency spiked versus recent baseline',
      service_scope: 'near-intents-ingest',
      pair: config.activePair,
      details: {
        current_window_reconnect_delta: currentWindow.reconnect_delta,
        baseline_average_reconnect_delta: averageReconnectDelta,
        spike_multiplier: thresholds.anomalyReconnectSpikeMultiplier,
      },
    }));
  }

  if (
    currentWindow.quote_delta > 0
    && currentWindow.durable_delta === 0
    && averageDurableDelta >= 0
  ) {
    alerts.push(buildRuntimeAlert({
      alert_code: 'near_intents_pipeline_flow_mismatch',
      severity: 'warning',
      reason: 'ingest quote flow advanced while durable writer progress stalled',
      service_scope: 'history-writer',
      pair: config.activePair,
      details: {
        current_window_quote_delta: currentWindow.quote_delta,
        current_window_durable_delta: currentWindow.durable_delta,
        baseline_average_durable_delta: averageDurableDelta,
      },
    }));
  }

  return alerts;
}
|
|
|
|
/**
 * Records the executor-containment decision for this pass in `state.containment`.
 *
 * Automatic containment is disabled. Every call records that outcome, stamps
 * `now`, and never disarms anything.
 *
 * @param {object} params
 * @param {Record<string, object>} params.servicesByName - accepted for interface stability; unused
 * @param {object[]} params.desiredRuntimeAlerts - accepted for interface stability; unused
 * @param {string} params.now - ISO-8601 timestamp recorded as `last_action_at`
 * @returns {Promise<void>}
 */
async function maybeContainRisk({ servicesByName, desiredRuntimeAlerts, now }) {
  void [servicesByName, desiredRuntimeAlerts];
  Object.assign(state.containment, {
    executor_auto_disarmed: null,
    last_action_at: now,
    last_action_reason: 'automatic_executor_containment_disabled',
    last_action_result: {
      ok: true,
      automatic_containment_enabled: false,
    },
  });
}
|
|
|
|
/**
 * Publishes each alert transition as an `ops_alert` event, then sends it to the
 * webhook notifier. Transitions are processed sequentially, in order.
 *
 * A fresh `alert_event_id` is generated per transition and is authoritative in
 * both the Kafka payload and the notification, so the two can be correlated even
 * if the transition object already carries an `alert_event_id`.
 *
 * @param {object[]} transitions - alert transitions; fields read here are `alert_code`,
 *   `status`, `last_evaluated_at`, `service_scope`, `tx_hash`, and `pair`
 * @returns {Promise<void>} rejects on the first envelope/schema, send, or notify failure
 */
async function publishTransitions(transitions) {
  for (const transition of transitions) {
    const alertEventId = `${transition.alert_code}-${transition.status}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`;

    // Keep `alert_event_id` as the first payload key (as before) but make sure the
    // freshly generated id wins over any id already present on the transition.
    const payload = {
      alert_event_id: alertEventId,
      ...transition,
    };
    payload.alert_event_id = alertEventId;

    const event = buildEventEnvelope({
      source: 'ops-sentinel',
      venue: 'unrip',
      eventType: 'ops_alert',
      observedAt: transition.last_evaluated_at,
      payload,
    });
    assertOpsAlertEvent(event);

    await producer.sendJson(config.kafkaTopicOpsAlert, event, {
      key: `${transition.alert_code}:${transition.service_scope}:${transition.tx_hash || transition.pair || 'global'}`,
    });
    state.publish_count += 1;

    await notifier.notify({
      ...transition,
      alert_event_id: alertEventId,
    });
  }
}
|
|
|
|
/**
 * Validates a consumed event against the schema for its topic and returns its
 * payload. For every topic except funding observations, the envelope's
 * `observed_at`/`ingested_at` are copied onto the returned payload copy. Funding
 * observations return `event.payload` itself.
 *
 * @param {string} topic - Kafka topic the event was read from
 * @param {object} event - parsed event envelope
 * @returns {object} payload (possibly augmented with envelope timestamps)
 * @throws {Error} when the topic is not one of the sentinel's inputs, or when the
 *   schema assertion for the topic fails
 */
function normalizePayloadForAlert(topic, event) {
  // Checked in order; the first matching topic wins.
  const topicHandlers = [
    { topic: config.kafkaTopicRefMarketPrice, validate: assertMarketPriceEvent, stampTimestamps: true },
    { topic: config.kafkaTopicStateIntentInventory, validate: assertInventorySnapshotEvent, stampTimestamps: true },
    { topic: config.kafkaTopicOpsLiquidityAction, validate: assertLiquidityActionEvent, stampTimestamps: true },
    { topic: config.kafkaTopicOpsFundingObservation, validate: assertFundingObservationEvent, stampTimestamps: false },
    { topic: config.kafkaTopicExecTradeResult, validate: assertTradeResult, stampTimestamps: true },
  ];

  const handler = topicHandlers.find((candidate) => candidate.topic === topic);
  if (handler === undefined) {
    throw new Error(`unsupported ops-sentinel topic: ${topic}`);
  }

  const { validate, stampTimestamps } = handler;
  validate(event);

  if (!stampTimestamps) {
    return event.payload;
  }
  return {
    ...event.payload,
    observed_at: event.observed_at,
    ingested_at: event.ingested_at,
  };
}
|
|
|
|
/**
 * Converts a Kafka offset (string, number, or nullish) into a finite number.
 * Missing or non-numeric values become 0.
 *
 * @param {unknown} value
 * @returns {number}
 */
function parseOffset(value) {
  const numeric = value === null || value === undefined ? Number.NaN : Number(value);
  return Number.isFinite(numeric) ? numeric : 0;
}
|
|
|
|
/**
 * Arithmetic mean of `values`; 0 for an empty list.
 *
 * @param {number[]} values
 * @returns {number}
 */
function average(values) {
  const count = values.length;
  if (!count) return 0;

  let total = 0;
  values.forEach((value) => {
    total += value;
  });
  return total / count;
}
|
|
|
|
// Set once shutdown begins, so repeated signals do not re-run teardown.
let shutdownStarted = false;

/**
 * Stops the evaluation timer, closes the control API, disconnects the Kafka
 * consumer and producer, then exits the process.
 *
 * Each bus client is disconnected independently, so a failing consumer
 * disconnect no longer skips the producer disconnect or surfaces as an unhandled
 * rejection from the signal handler. Failures are logged, and the process exits
 * with code 1 if either client failed to disconnect, otherwise 0. Closing the
 * control API stays best-effort.
 *
 * @returns {Promise<void>} never resolves normally; the process exits
 */
async function shutdown() {
  if (shutdownStarted) return;
  shutdownStarted = true;

  clearInterval(timer);
  await controlApi.close().catch(() => {});

  let exitCode = 0;
  const busClients = [
    ['consumer', () => consumer.disconnect()],
    ['producer', () => producer.disconnect()],
  ];
  for (const [client, disconnect] of busClients) {
    try {
      await disconnect();
    } catch (error) {
      exitCode = 1;
      logger.error('ops_sentinel_shutdown_disconnect_failed', {
        details: {
          client,
          error: serializeError(error),
        },
      });
    }
  }

  process.exit(exitCode);
}
|
|
|
|
// Graceful shutdown on interrupt and termination signals.
for (const signal of ['SIGINT', 'SIGTERM']) {
  process.on(signal, shutdown);
}
|