Persist NEAR status changes only
Some checks failed
deploy / deploy (push) Failing after 29s

Proof: npm test; npm run operator-dashboard:build; node --test test/near-intents-status.test.mjs test/environment-status-history.test.mjs test/operator-dashboard.test.mjs test/operator-dashboard-ui-static.test.mjs test/ops-sentinel-static.test.mjs; PYTHONPATH=. python3 test/repo_deployments_test.py; kubectl kustomize deploy/k8s/base.

Assumptions: NEAR Intents public status page API remains the official upstream environmental-status source; status fingerprint changes are the durable boundary for saving environmental history.

Still fake: This stores and displays official upstream status changes, but it does not create an alternate quote source or make NEAR quoting operational during an upstream pause.
This commit is contained in:
philipp 2026-04-17 14:34:10 +02:00
parent 99ca09b69e
commit 601450c664
18 changed files with 805 additions and 24 deletions

View file

@ -16,7 +16,7 @@ spec:
- |
set -eu
BROKERS="redpanda.unrip.svc.cluster.local:9092"
TOPICS="raw.near_intents.quote norm.swap_demand ref.market_price state.intent_inventory ops.liquidity_action ops.funding_observation ops.alert decision.trade_decision cmd.execute_trade exec.trade_result"
TOPICS="raw.near_intents.quote norm.swap_demand ref.market_price state.intent_inventory ops.liquidity_action ops.funding_observation ops.alert ops.environment_status decision.trade_decision cmd.execute_trade exec.trade_result"
RETENTION_MS="172800000"
RETENTION_BYTES="268435456"

View file

@ -12,6 +12,7 @@ data:
NEAR_INTENTS_VERIFIER_CONTRACT: intents.near
NEAR_RPC_URL: https://rpc.fastnear.com
NEAR_INTENTS_PAIR_FILTER: nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omft.near
NEAR_INTENTS_STATUS_POLL_MS: "60000"
NEAR_INTENTS_ACCOUNT_ID: unrip-dev.near
TRADING_BTC_ASSET_ID: nep141:btc.omft.near
TRADING_BTC_SYMBOL: BTC
@ -62,6 +63,7 @@ data:
KAFKA_TOPIC_OPS_LIQUIDITY_ACTION: ops.liquidity_action
KAFKA_TOPIC_OPS_FUNDING_OBSERVATION: ops.funding_observation
KAFKA_TOPIC_OPS_ALERT: ops.alert
KAFKA_TOPIC_OPS_ENVIRONMENT_STATUS: ops.environment_status
KAFKA_TOPIC_DECISION_TRADE_DECISION: decision.trade_decision
KAFKA_TOPIC_CMD_EXECUTE_TRADE: cmd.execute_trade
KAFKA_TOPIC_EXEC_TRADE_RESULT: exec.trade_result

View file

@ -19,6 +19,7 @@ import {
claimNotificationDelivery,
createPostgresPool,
ensureHistorySchema,
insertEnvironmentStatusChange,
finishNotificationDelivery,
insertHistoryEvent,
loadLatestPortfolioMetric,
@ -83,6 +84,7 @@ const topics = [
config.kafkaTopicOpsLiquidityAction,
config.kafkaTopicOpsFundingObservation,
config.kafkaTopicOpsAlert,
config.kafkaTopicOpsEnvironmentStatus,
config.kafkaTopicDecisionTradeDecision,
config.kafkaTopicCmdExecuteTrade,
config.kafkaTopicExecTradeResult,
@ -113,6 +115,10 @@ const state = {
last_write_at: null,
last_funding_observation_write_at: null,
last_alert_write_at: null,
last_environment_status_write_at: null,
last_environment_status_seen_at: null,
last_environment_status_duplicate_at: null,
last_environment_status_fingerprint: null,
last_metrics_at: null,
last_error: null,
error_count: 0,
@ -148,25 +154,43 @@ await consumer.run({
try {
const event = parseEventMessage(message.value.toString());
const routed = routeHistoryRecord({ topic, event });
await insertHistoryEvent(pool, {
const writeResult = topic === config.kafkaTopicOpsEnvironmentStatus
? await insertEnvironmentStatusChange(pool, {
topic,
event,
record: routed.record,
})
: await insertHistoryEvent(pool, {
table: routed.table,
topic,
event,
record: routed.record,
});
}).then(() => ({ inserted: true }));
state.last_write_at = new Date().toISOString();
const handledAt = new Date().toISOString();
if (writeResult.inserted) {
state.last_write_at = handledAt;
}
state.last_error = null;
state.offsets[topic] = {
partition,
offset: message.offset,
};
if (topic === config.kafkaTopicOpsFundingObservation) {
if (topic === config.kafkaTopicOpsFundingObservation && writeResult.inserted) {
state.last_funding_observation_write_at = state.last_write_at;
}
if (topic === config.kafkaTopicOpsAlert) {
if (topic === config.kafkaTopicOpsAlert && writeResult.inserted) {
state.last_alert_write_at = state.last_write_at;
}
if (topic === config.kafkaTopicOpsEnvironmentStatus) {
state.last_environment_status_seen_at = handledAt;
state.last_environment_status_fingerprint = writeResult.status_fingerprint || event.payload.status_fingerprint || null;
if (writeResult.inserted) {
state.last_environment_status_write_at = handledAt;
} else {
state.last_environment_status_duplicate_at = handledAt;
}
}
await publishLiquidityNotification({ topic, event });
if (portfolioMetricTopics.has(topic)) {
try {
@ -255,6 +279,10 @@ const controlApi = startControlApi({
paused: state.paused,
last_write_at: state.last_write_at,
last_alert_write_at: state.last_alert_write_at,
last_environment_status_write_at: state.last_environment_status_write_at,
last_environment_status_seen_at: state.last_environment_status_seen_at,
last_environment_status_duplicate_at: state.last_environment_status_duplicate_at,
last_environment_status_fingerprint: state.last_environment_status_fingerprint,
last_metrics_at: state.last_metrics_at,
freshness_age_ms: Number.isFinite(freshnessAgeMs) ? Math.max(0, freshnessAgeMs) : null,
database_connectivity: connectivity,

View file

@ -37,6 +37,7 @@ import {
loadLatestPortfolioMetric,
loadRecentAlertTransitions,
loadRecentDepositStatuses,
loadRecentEnvironmentStatuses,
loadRecentExecuteTradeCommands,
loadRecentExecutionResults,
loadRecentIntentRequests,
@ -124,6 +125,11 @@ const initialRecentQuoteOutcomes = await safeSourceLoad(
() => loadRecentQuoteOutcomes(pool, { limit: 200 }),
[],
);
const initialNearIntentsStatus = await safeSourceLoad(
'near_intents_status',
() => loadNearIntentsStatus(),
null,
);
const liveState = createDashboardLiveState({
config,
@ -136,6 +142,7 @@ const liveState = createDashboardLiveState({
latestInventory: initialInventory,
recentSubmissionCount: initialSubmissionSummary.total,
lastSubmissionAt: initialSubmissionSummary.last_submission_at,
nearIntentsStatus: initialNearIntentsStatus,
activeAlerts:
initialServiceSnapshots.find((snapshot) => snapshot.service === 'ops-sentinel')?.state?.active_alerts
|| [],
@ -155,6 +162,7 @@ const liveTopics = [
config.kafkaTopicRefMarketPrice,
config.kafkaTopicStateIntentInventory,
config.kafkaTopicOpsAlert,
config.kafkaTopicOpsEnvironmentStatus,
config.kafkaTopicExecTradeResult,
];
@ -402,6 +410,7 @@ async function loadBootstrapPayload({ auth, page, pageSize }) {
recentQuoteOutcomes,
recentIntentRequests,
recentAlertTransitions,
recentEnvironmentStatuses,
serviceSnapshots,
nearIntentsStatus,
] = await Promise.all([
@ -484,6 +493,12 @@ async function loadBootstrapPayload({ auth, page, pageSize }) {
[],
sourceErrors,
),
safeSourceLoad(
'recent_environment_statuses',
() => loadRecentEnvironmentStatuses(pool, { limit: 20 }),
[],
sourceErrors,
),
loadServiceSnapshots(),
safeSourceLoad('near_intents_status', () => loadNearIntentsStatus(), null, sourceErrors),
]);
@ -505,6 +520,7 @@ async function loadBootstrapPayload({ auth, page, pageSize }) {
recentQuoteOutcomes,
recentIntentRequests,
recentAlertTransitions,
recentEnvironmentStatuses,
serviceSnapshots,
nearIntentsStatus,
sourceErrors,

View file

@ -7,6 +7,10 @@ import { createAlertEngine } from '../core/alert-engine.mjs';
import { createAlertNotifier } from '../core/alert-notifier.mjs';
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import {
buildNearIntentsStatusEventPayload,
normalizeNearIntentsStatus,
} from '../core/near-intents-status.mjs';
import { listDashboardServices } from '../core/operator-dashboard.mjs';
import {
ageMs,
@ -17,6 +21,7 @@ import {
shouldRaiseIngestPublishStale,
} from '../core/runtime-health.mjs';
import {
assertEnvironmentStatusEvent,
assertFundingObservationEvent,
assertInventorySnapshotEvent,
assertLiquidityActionEvent,
@ -25,6 +30,7 @@ import {
assertTradeResult,
} from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
import { fetchJson } from '../lib/http.mjs';
const config = loadConfig();
const thresholds = createRuntimeHealthThresholds(config);
@ -63,6 +69,13 @@ const state = {
service_snapshots: [],
service_health: [],
latest_runtime_alerts: [],
near_intents_status: null,
last_environment_status_poll_at: null,
last_environment_status_publish_at: null,
last_environment_status_duplicate_at: null,
last_environment_status_error: null,
last_environment_status_fingerprint: null,
environment_status_publish_count: 0,
containment: {
executor_auto_disarmed: null,
last_action_at: null,
@ -129,6 +142,31 @@ const timer = setInterval(() => {
}, config.opsSentinelEvaluationMs);
timer.unref?.();
const environmentStatusTimer = setInterval(() => {
if (state.paused) return;
pollNearIntentsEnvironmentStatus().catch((error) => {
state.last_environment_status_error = serializeError(error);
logger.error('near_intents_environment_status_poll_failed', {
topic: config.kafkaTopicOpsEnvironmentStatus,
details: {
error: serializeError(error),
},
});
});
}, config.nearIntentsStatusPollMs);
environmentStatusTimer.unref?.();
pollNearIntentsEnvironmentStatus().catch((error) => {
state.last_environment_status_error = serializeError(error);
logger.error('near_intents_environment_status_initial_poll_failed', {
topic: config.kafkaTopicOpsEnvironmentStatus,
details: {
error: serializeError(error),
},
});
});
const controlApi = startControlApi({
host: config.opsSentinelControlHost,
port: config.opsSentinelControlPort,
@ -146,6 +184,13 @@ const controlApi = startControlApi({
service_snapshots: state.service_snapshots,
service_health: state.service_health,
latest_runtime_alerts: [],
near_intents_status: state.near_intents_status,
last_environment_status_poll_at: state.last_environment_status_poll_at,
last_environment_status_publish_at: state.last_environment_status_publish_at,
last_environment_status_duplicate_at: state.last_environment_status_duplicate_at,
last_environment_status_error: state.last_environment_status_error,
last_environment_status_fingerprint: state.last_environment_status_fingerprint,
environment_status_publish_count: state.environment_status_publish_count,
containment: state.containment,
notifier: notifier.getState(),
anomaly_samples: state.anomaly_samples.slice(-thresholds.anomalyWindowSize),
@ -574,6 +619,65 @@ async function maybeContainRisk({ servicesByName, desiredRuntimeAlerts, now }) {
};
}
async function pollNearIntentsEnvironmentStatus() {
const observedAt = new Date().toISOString();
const [servicesResponse, postsResponse, postEnumsResponse] = await Promise.all([
fetchNearIntentsStatusJson(config.nearIntentsStatusServicesUrl),
fetchNearIntentsStatusJson(config.nearIntentsStatusPostsUrl),
fetchNearIntentsStatusJson(config.nearIntentsStatusPostEnumsUrl),
]);
const normalized = normalizeNearIntentsStatus({
servicesResponse,
postsResponse,
postEnumsResponse,
observedAt,
});
state.near_intents_status = normalized;
state.last_environment_status_poll_at = observedAt;
state.last_environment_status_error = null;
if (normalized.status_fingerprint === state.last_environment_status_fingerprint) {
state.last_environment_status_duplicate_at = observedAt;
return { published: false, status: normalized };
}
const payload = buildNearIntentsStatusEventPayload(normalized, {
changedAt: observedAt,
previousFingerprint: state.last_environment_status_fingerprint,
});
const event = buildEventEnvelope({
source: 'ops-sentinel',
venue: 'near_intents',
eventType: 'environment_status',
observedAt,
payload,
raw: {
source: 'near_intents_status_page',
services: servicesResponse,
posts: postsResponse,
post_enums: postEnumsResponse,
},
});
assertEnvironmentStatusEvent(event);
await producer.sendJson(config.kafkaTopicOpsEnvironmentStatus, event, {
key: payload.environment_key,
});
state.last_environment_status_fingerprint = payload.status_fingerprint;
state.last_environment_status_publish_at = observedAt;
state.environment_status_publish_count += 1;
state.publish_count += 1;
return { published: true, status: normalized };
}
async function fetchNearIntentsStatusJson(url) {
return fetchJson(url, {
signal: AbortSignal.timeout(config.nearIntentsStatusTimeoutMs),
});
}
async function publishTransitions(transitions) {
for (const transition of transitions) {
const alertEventId = `${transition.alert_code}-${transition.status}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`;
@ -650,6 +754,7 @@ function average(values) {
async function shutdown() {
clearInterval(timer);
clearInterval(environmentStatusTimer);
await controlApi.close().catch(() => {});
await consumer.disconnect();
await producer.disconnect();

View file

@ -1,4 +1,5 @@
import {
assertEnvironmentStatusEvent,
assertExecuteTradeCommand,
assertFundingObservationEvent,
assertIntentRequestPreflightEvent,
@ -103,6 +104,19 @@ export function routeHistoryRecord({ topic, event }) {
decision_key: event.payload.alert_event_id,
},
};
case 'ops.environment_status':
assertEnvironmentStatusEvent(event);
return {
table: 'environment_status_events',
record: {
event_id: event.event_id,
observed_at: event.observed_at,
ingested_at: event.ingested_at,
quote_id: null,
pair: null,
decision_key: event.payload.environment_key,
},
};
case 'decision.trade_decision':
assertTradeDecisionEvent(event);
return {

View file

@ -1,3 +1,5 @@
import crypto from 'node:crypto';
const TERMINAL_STATUS_NAMES = new Set([
'resolved',
'completed',
@ -48,22 +50,73 @@ export function normalizeNearIntentsStatus({
incidents.flatMap((incident) => incident.impacts.map((impact) => impact.service_name || impact.service_id)),
)].filter(Boolean);
const primaryIncident = incidents[0] || null;
return {
const status = incidents.length > 0 ? 'disrupted' : 'operational';
const label = incidents.length > 0 ? 'upstream paused' : 'operational';
const decisiveReason = primaryIncident
? [primaryIncident.title, primaryIncident.message_text].filter(Boolean).join(': ')
: 'NEAR Intents status page reports no active incident.';
const quotingStopped = incidents.some((incident) => /1click|quoting|solver|swap/i.test(
`${incident.title || ''} ${incident.message_text || ''}`,
));
const normalized = {
observed_at: observedAt,
source: 'near_intents_status_page',
status: incidents.length > 0 ? 'disrupted' : 'operational',
label: incidents.length > 0 ? 'upstream paused' : 'operational',
status,
label,
current_incident_count: incidents.length,
current_incidents: incidents,
affected_services: affectedServices,
quoting_stopped: incidents.some((incident) => /1click|quoting|solver|swap/i.test(
`${incident.title || ''} ${incident.message_text || ''}`,
)),
decisive_reason: primaryIncident
? [primaryIncident.title, primaryIncident.message_text].filter(Boolean).join(': ')
: 'NEAR Intents status page reports no active incident.',
quoting_stopped: quotingStopped,
decisive_reason: decisiveReason,
};
return {
...normalized,
status_fingerprint: buildNearIntentsStatusFingerprint(normalized),
};
}
export function buildNearIntentsStatusEventPayload(status, {
changedAt = new Date().toISOString(),
previousFingerprint = null,
} = {}) {
const statusFingerprint = status.status_fingerprint || buildNearIntentsStatusFingerprint(status);
return {
environment_status_id: `near-intents-status-${changedAt}-${statusFingerprint.slice(0, 12)}`,
environment_key: 'near_intents_status_page',
source: status.source || 'near_intents_status_page',
status: status.status || 'unknown',
label: status.label || status.status || 'unknown',
status_fingerprint: statusFingerprint,
previous_status_fingerprint: previousFingerprint,
observed_at: status.observed_at || changedAt,
changed_at: changedAt,
decisive_reason: status.decisive_reason || null,
current_incident_count: status.current_incident_count || 0,
current_incidents: status.current_incidents || [],
affected_services: status.affected_services || [],
quoting_stopped: status.quoting_stopped ?? null,
};
}
export function buildNearIntentsStatusFingerprint(status = {}) {
const stable = {
status: status.status || 'unknown',
label: status.label || null,
decisive_reason: status.decisive_reason || null,
current_incidents: (status.current_incidents || []).map((incident) => ({
id: incident.id || null,
title: incident.title || null,
status: incident.status || null,
severity: incident.severity || null,
last_update_at: incident.last_update_at || null,
message_text: incident.message_text || null,
impacts: incident.impacts || [],
})),
affected_services: [...(status.affected_services || [])].sort(),
quoting_stopped: status.quoting_stopped ?? null,
};
return crypto.createHash('sha256').update(JSON.stringify(stable)).digest('hex');
}
function normalizePost(post, { statusById, severityById, serviceById }) {

View file

@ -302,6 +302,7 @@ export function createDashboardLiveState({
recentSubmissionCount = 0,
lastSubmissionAt = null,
activeAlerts = [],
nearIntentsStatus = null,
} = {}) {
const state = {
config,
@ -319,6 +320,7 @@ export function createDashboardLiveState({
latest_inventory: latestInventory?.payload || latestInventory || null,
recent_submission_count: Number(recentSubmissionCount || 0),
last_submission_at: lastSubmissionAt || null,
near_intents_status: nearIntentsStatus,
active_alerts: new Map(),
};
@ -410,6 +412,23 @@ export function applyDashboardLiveEvent(state, { topic, event }) {
case 'ops.alert': {
return [];
}
case 'ops.environment_status': {
state.near_intents_status = event.payload;
return [
{
type: 'status_bar.updated',
status_bar: buildLiveStatusBar(state),
},
{
type: 'environment_status.updated',
environment_status: {
observed_at: event.observed_at || event.payload.observed_at || null,
ingested_at: event.ingested_at || null,
payload: event.payload,
},
},
];
}
case 'exec.trade_result': {
const execution = normalizeLiveExecutionResult(event.payload, event);
if (!execution) return [];
@ -453,6 +472,7 @@ export function buildDashboardBootstrap({
recentQuoteOutcomes = [],
recentIntentRequests = [],
recentAlertTransitions,
recentEnvironmentStatuses = [],
serviceSnapshots,
nearIntentsStatus = null,
sourceErrors = [],
@ -487,6 +507,11 @@ export function buildDashboardBootstrap({
config,
submissionPage,
});
const environmentStatus = buildEnvironmentStatusHistory({
recentEnvironmentStatuses,
currentStatus: nearIntentsStatus,
});
const effectiveNearIntentsStatus = nearIntentsStatus || environmentStatus.current;
return {
session: auth,
@ -499,7 +524,7 @@ export function buildDashboardBootstrap({
marketPrice,
activeAlerts,
servicesByName,
nearIntentsStatus,
nearIntentsStatus: effectiveNearIntentsStatus,
}),
funds: {
profitability,
@ -538,7 +563,8 @@ export function buildDashboardBootstrap({
servicesByName,
activeAlerts,
recentAlerts,
nearIntentsStatus,
nearIntentsStatus: effectiveNearIntentsStatus,
environmentStatus,
}),
};
}
@ -618,6 +644,10 @@ export function buildProfitabilitySummary({ metric, submissionSummary } = {}) {
export function buildLiveStatusBar(state) {
return {
near_intents_upstream_status: state.near_intents_status?.status || null,
near_intents_upstream_label: state.near_intents_status?.label || null,
near_intents_upstream_reason: state.near_intents_status?.decisive_reason || null,
near_intents_upstream_observed_at: state.near_intents_status?.observed_at || null,
latest_reference_price_eure_per_btc: state.latest_market_price?.eure_per_btc || null,
market_observed_at:
state.latest_market_price?.observed_at
@ -1453,7 +1483,34 @@ function summarizeGrossEdgeEstimate(rows = []) {
};
}
function buildSystemSummary({ servicesByName, activeAlerts, recentAlerts, nearIntentsStatus = null }) {
function buildEnvironmentStatusHistory({ recentEnvironmentStatuses = [], currentStatus = null } = {}) {
const recentChanges = (recentEnvironmentStatuses || []).map((entry) => ({
observed_at: entry.observed_at || entry.payload?.observed_at || null,
ingested_at: entry.ingested_at || null,
payload: entry.payload || entry,
}));
const latestDurable = recentChanges[0]?.payload || null;
const current = currentStatus || latestDurable;
return {
current,
recent_changes: recentChanges,
latest_durable_change_at:
recentChanges[0]?.payload?.changed_at
|| recentChanges[0]?.observed_at
|| recentChanges[0]?.ingested_at
|| null,
change_count: recentChanges.length,
};
}
function buildSystemSummary({
servicesByName,
activeAlerts,
recentAlerts,
nearIntentsStatus = null,
environmentStatus = { current: null, recent_changes: [] },
}) {
const historyWriterState = servicesByName['history-writer']?.state || {};
void activeAlerts;
void recentAlerts;
@ -1470,11 +1527,15 @@ function buildSystemSummary({ servicesByName, activeAlerts, recentAlerts, nearIn
active: [],
recent: [],
},
environment_status: environmentStatus,
persistence: {
database_connectivity: historyWriterState.database_connectivity ?? null,
last_write_at: historyWriterState.last_write_at || null,
last_alert_write_at: historyWriterState.last_alert_write_at || null,
last_funding_observation_write_at: historyWriterState.last_funding_observation_write_at || null,
last_environment_status_write_at: historyWriterState.last_environment_status_write_at || null,
last_environment_status_seen_at: historyWriterState.last_environment_status_seen_at || null,
last_environment_status_duplicate_at: historyWriterState.last_environment_status_duplicate_at || null,
last_metrics_at: historyWriterState.last_metrics_at || null,
last_quote_outcomes_at: historyWriterState.last_quote_outcomes_at || null,
latest_portfolio_metrics: historyWriterState.latest_portfolio_metrics || null,
@ -1934,6 +1995,7 @@ function normalizeDashboardLiveTopic(state, topic) {
[config.kafkaTopicRefMarketPrice, 'ref.market_price'],
[config.kafkaTopicStateIntentInventory, 'state.intent_inventory'],
[config.kafkaTopicOpsAlert, 'ops.alert'],
[config.kafkaTopicOpsEnvironmentStatus, 'ops.environment_status'],
[config.kafkaTopicExecTradeResult, 'exec.trade_result'],
]);
return aliases.get(topic) || topic;

View file

@ -122,6 +122,31 @@ export function assertOpsAlertEvent(event) {
return event;
}
export function assertEnvironmentStatusEvent(event) {
assertEventEnvelope(event);
if (event.event_type !== 'environment_status') {
throw new Error(`Unexpected event_type: ${event.event_type}`);
}
const payload = event.payload;
requireString(payload.environment_status_id, 'payload.environment_status_id');
requireString(payload.environment_key, 'payload.environment_key');
requireString(payload.source, 'payload.source');
requireString(payload.status, 'payload.status');
requireString(payload.label, 'payload.label');
requireString(payload.status_fingerprint, 'payload.status_fingerprint');
requireString(payload.observed_at, 'payload.observed_at');
requireString(payload.changed_at, 'payload.changed_at');
if (payload.previous_status_fingerprint != null) {
requireString(payload.previous_status_fingerprint, 'payload.previous_status_fingerprint');
}
if (payload.decisive_reason != null) requireString(payload.decisive_reason, 'payload.decisive_reason');
requireNumber(payload.current_incident_count, 'payload.current_incident_count');
if (!Array.isArray(payload.current_incidents)) throw new Error('Missing payload.current_incidents');
if (!Array.isArray(payload.affected_services)) throw new Error('Missing payload.affected_services');
return event;
}
export function assertTradeDecisionEvent(event) {
assertEventEnvelope(event);
if (event.event_type !== 'trade_decision') throw new Error(`Unexpected event_type: ${event.event_type}`);

View file

@ -29,6 +29,7 @@ const DEFAULTS = {
kafkaTopicOpsLiquidityAction: 'ops.liquidity_action',
kafkaTopicOpsFundingObservation: 'ops.funding_observation',
kafkaTopicOpsAlert: 'ops.alert',
kafkaTopicOpsEnvironmentStatus: 'ops.environment_status',
kafkaTopicDecisionTradeDecision: 'decision.trade_decision',
kafkaTopicCmdExecuteTrade: 'cmd.execute_trade',
kafkaTopicExecTradeResult: 'exec.trade_result',
@ -109,6 +110,7 @@ const DEFAULTS = {
nearIntentsStatusPostsUrl: 'https://status.near-intents.org/api/posts?is_featured=true&limit=500',
nearIntentsStatusPostEnumsUrl: 'https://status.near-intents.org/api/post_enums',
nearIntentsStatusTimeoutMs: 3_000,
nearIntentsStatusPollMs: 60_000,
notificationNtfyBaseUrl: '',
notificationNtfyTopic: 'unrip',
notificationNtfyToken: '',
@ -350,6 +352,8 @@ export function loadConfig({ envPath = '.env' } = {}) {
process.env.KAFKA_TOPIC_OPS_FUNDING_OBSERVATION || DEFAULTS.kafkaTopicOpsFundingObservation,
kafkaTopicOpsAlert:
process.env.KAFKA_TOPIC_OPS_ALERT || DEFAULTS.kafkaTopicOpsAlert,
kafkaTopicOpsEnvironmentStatus:
process.env.KAFKA_TOPIC_OPS_ENVIRONMENT_STATUS || DEFAULTS.kafkaTopicOpsEnvironmentStatus,
kafkaTopicDecisionTradeDecision:
process.env.KAFKA_TOPIC_DECISION_TRADE_DECISION || DEFAULTS.kafkaTopicDecisionTradeDecision,
kafkaTopicCmdExecuteTrade:
@ -589,6 +593,10 @@ export function loadConfig({ envPath = '.env' } = {}) {
process.env.NEAR_INTENTS_STATUS_TIMEOUT_MS,
DEFAULTS.nearIntentsStatusTimeoutMs,
),
nearIntentsStatusPollMs: parseNumber(
process.env.NEAR_INTENTS_STATUS_POLL_MS,
DEFAULTS.nearIntentsStatusPollMs,
),
notificationNtfyBaseUrl:
process.env.NOTIFICATION_NTFY_BASE_URL || DEFAULTS.notificationNtfyBaseUrl,
notificationNtfyTopic:

View file

@ -11,6 +11,7 @@ const TABLES = [
'liquidity_actions',
'funding_observations',
'ops_alerts',
'environment_status_events',
'trade_decisions',
'execute_trade_commands',
'trade_execution_results',
@ -102,6 +103,21 @@ export async function ensureHistorySchema(pool) {
table: 'ops_alerts',
expression: "(payload->>'asset_id')",
});
await ensureExpressionIndex(pool, {
name: 'environment_status_events_key_idx',
table: 'environment_status_events',
expression: "(payload->>'environment_key')",
});
await ensureExpressionIndex(pool, {
name: 'environment_status_events_status_idx',
table: 'environment_status_events',
expression: "(payload->>'status')",
});
await ensureExpressionIndex(pool, {
name: 'environment_status_events_fingerprint_idx',
table: 'environment_status_events',
expression: "(payload->>'status_fingerprint')",
});
await pool.query(`
CREATE TABLE IF NOT EXISTS ${PORTFOLIO_METRICS_TABLE} (
@ -260,6 +276,63 @@ export async function insertHistoryEvent(pool, { table, topic, event, record })
);
}
export async function insertEnvironmentStatusChange(pool, { topic, event, record }) {
const fingerprint = event.payload?.status_fingerprint || null;
const environmentKey = event.payload?.environment_key || record.decision_key || null;
const result = await pool.query(
`
WITH latest AS (
SELECT payload->>'status_fingerprint' AS status_fingerprint
FROM environment_status_events
WHERE decision_key = $10
ORDER BY COALESCE(observed_at, ingested_at) DESC, ingested_at DESC
LIMIT 1
)
INSERT INTO environment_status_events (
event_id,
topic,
venue,
source,
event_type,
observed_at,
ingested_at,
quote_id,
pair,
decision_key,
payload,
raw
)
SELECT $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11::jsonb,$12::jsonb
WHERE NOT EXISTS (
SELECT 1 FROM latest WHERE status_fingerprint = $13
)
ON CONFLICT (event_id) DO NOTHING
RETURNING event_id
`,
[
event.event_id,
topic,
event.venue,
event.source,
event.event_type,
event.observed_at,
event.ingested_at,
record.quote_id,
record.pair,
environmentKey,
JSON.stringify(event.payload),
event.raw ? JSON.stringify(event.raw) : null,
fingerprint,
],
);
return {
inserted: result.rowCount > 0,
status_fingerprint: fingerprint,
environment_key: environmentKey,
};
}
export async function loadPortfolioMetricInputs(pool, {
btcAsset = null,
eureAsset = null,
@ -1114,6 +1187,24 @@ export async function loadRecentAlertTransitions(pool, { limit = 20 } = {}) {
}));
}
export async function loadRecentEnvironmentStatuses(pool, { limit = 20 } = {}) {
const result = await pool.query(
`
SELECT observed_at, ingested_at, payload
FROM environment_status_events
ORDER BY COALESCE(observed_at, ingested_at) DESC
LIMIT $1
`,
[Math.max(1, Number(limit) || 20)],
);
return result.rows.map((row) => ({
observed_at: toIsoTimestamp(row.observed_at),
ingested_at: toIsoTimestamp(row.ingested_at),
payload: row.payload,
}));
}
export async function loadRecentTradeDecisions(pool, { limit = 20 } = {}) {
const result = await pool.query(
`

View file

@ -44,6 +44,61 @@ export default function SystemPage({ system, onControl }) {
</div>
</section>
<section className="panel">
<div className="panel-head">
<div>
<div className="eyebrow">Environmental conditions</div>
<h3>NEAR Intents upstream status changes</h3>
<div className="panel-subtitle">
Stored only when the normalized official status changes, not on every poll.
</div>
</div>
</div>
<div className="metric-grid">
<MetricCard
label="Current upstream status"
meta={system.environment_status?.current?.decisive_reason || 'No durable status reason stored yet'}
value={system.environment_status?.current?.label || system.environment_status?.current?.status || 'Unavailable'}
/>
<MetricCard
label="Latest stored change"
meta="Deduped by status fingerprint"
value={formatTimestamp(system.environment_status?.latest_durable_change_at)}
/>
</div>
<TableFrame style={{ marginTop: 14 }}>
<table>
<thead>
<tr>
<th>Changed at</th>
<th>Status</th>
<th>Reason</th>
<th>Fingerprint</th>
</tr>
</thead>
<tbody>
{system.environment_status?.recent_changes?.length ? (
system.environment_status.recent_changes.map((entry) => {
const payload = entry.payload || {};
return (
<tr key={payload.environment_status_id || payload.status_fingerprint || entry.ingested_at}>
<td>{formatTimestamp(payload.changed_at || entry.observed_at || entry.ingested_at)}</td>
<td>{payload.label || payload.status || 'Unavailable'}</td>
<td>{payload.decisive_reason || 'No reason stored'}</td>
<td className="mono">{payload.status_fingerprint || 'Unavailable'}</td>
</tr>
);
})
) : (
<tr>
<td colSpan="4">No environment status changes stored yet.</td>
</tr>
)}
</tbody>
</table>
</TableFrame>
</section>
<section className="panel">
<div className="panel-head">
<div>

View file

@ -74,6 +74,31 @@ function applySocketMessage(dashboard, payload, session) {
},
},
};
case 'environment_status.updated': {
const existing = dashboard.system?.environment_status || { recent_changes: [] };
return {
session,
dashboard: {
...dashboard,
system: {
...dashboard.system,
environment_status: {
...existing,
current: payload.environment_status?.payload || existing.current || null,
recent_changes: [
payload.environment_status,
...(existing.recent_changes || []),
].filter(Boolean).slice(0, 20),
latest_durable_change_at:
payload.environment_status?.payload?.changed_at
|| payload.environment_status?.observed_at
|| existing.latest_durable_change_at
|| null,
},
},
},
};
}
default:
return { dashboard, session };
}

View file

@ -0,0 +1,82 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { buildEventEnvelope } from '../src/core/event-envelope.mjs';
import { routeHistoryRecord } from '../src/core/history-records.mjs';
import { insertEnvironmentStatusChange } from '../src/lib/postgres.mjs';
function buildEnvironmentEvent({ fingerprint, status = 'disrupted', changedAt = '2026-04-16T12:40:00.000Z' }) {
return buildEventEnvelope({
source: 'ops-sentinel',
venue: 'near_intents',
eventType: 'environment_status',
observedAt: changedAt,
ingestedAt: changedAt,
payload: {
environment_status_id: `status-${fingerprint}-${changedAt}`,
environment_key: 'near_intents_status_page',
source: 'near_intents_status_page',
status,
label: status === 'operational' ? 'operational' : 'upstream paused',
status_fingerprint: fingerprint,
previous_status_fingerprint: null,
observed_at: changedAt,
changed_at: changedAt,
decisive_reason: status === 'operational' ? 'No active incident.' : '1Click quoting paused.',
current_incident_count: status === 'operational' ? 0 : 1,
current_incidents: [],
affected_services: [],
quoting_stopped: status !== 'operational',
},
});
}
function createDedupePool() {
const rows = [];
return {
rows,
async query(_sql, params) {
const environmentKey = params[9];
const payload = JSON.parse(params[10]);
const fingerprint = params[12];
const latest = rows.filter((row) => row.decision_key === environmentKey).at(-1);
if (latest?.payload.status_fingerprint === fingerprint) {
return { rowCount: 0, rows: [] };
}
rows.push({
event_id: params[0],
decision_key: environmentKey,
payload,
});
return { rowCount: 1, rows: [{ event_id: params[0] }] };
},
};
}
test('environment status events route into their own durable history table', () => {
const event = buildEnvironmentEvent({ fingerprint: 'fp-1' });
const routed = routeHistoryRecord({
topic: 'ops.environment_status',
event,
});
assert.equal(routed.table, 'environment_status_events');
assert.equal(routed.record.decision_key, 'near_intents_status_page');
assert.equal(routed.record.quote_id, null);
});
test('environment status persistence stores only fingerprint changes', async () => {
const pool = createDedupePool();
const first = buildEnvironmentEvent({ fingerprint: 'fp-1', changedAt: '2026-04-16T12:40:00.000Z' });
const duplicate = buildEnvironmentEvent({ fingerprint: 'fp-1', changedAt: '2026-04-16T12:41:00.000Z' });
const changed = buildEnvironmentEvent({ fingerprint: 'fp-2', status: 'operational', changedAt: '2026-04-16T12:42:00.000Z' });
const firstRoute = routeHistoryRecord({ topic: 'ops.environment_status', event: first });
const duplicateRoute = routeHistoryRecord({ topic: 'ops.environment_status', event: duplicate });
const changedRoute = routeHistoryRecord({ topic: 'ops.environment_status', event: changed });
assert.equal((await insertEnvironmentStatusChange(pool, { topic: 'ops.environment_status', event: first, record: firstRoute.record })).inserted, true);
assert.equal((await insertEnvironmentStatusChange(pool, { topic: 'ops.environment_status', event: duplicate, record: duplicateRoute.record })).inserted, false);
assert.equal((await insertEnvironmentStatusChange(pool, { topic: 'ops.environment_status', event: changed, record: changedRoute.record })).inserted, true);
assert.deepEqual(pool.rows.map((row) => row.payload.status_fingerprint), ['fp-1', 'fp-2']);
});

View file

@ -1,7 +1,10 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { normalizeNearIntentsStatus } from '../src/core/near-intents-status.mjs';
import {
buildNearIntentsStatusEventPayload,
normalizeNearIntentsStatus,
} from '../src/core/near-intents-status.mjs';
const postEnumsResponse = {
post_enums: [
@ -81,3 +84,75 @@ test('resolved NEAR Intents status posts do not make the relay look disrupted',
assert.equal(normalized.current_incident_count, 0);
assert.match(normalized.decisive_reason, /no active incident/i);
});
test('NEAR Intents status fingerprint is stable across polls and changes on official updates', () => {
const first = normalizeNearIntentsStatus({
observedAt: '2026-04-16T12:40:00.000Z',
servicesResponse,
postEnumsResponse,
postsResponse: {
posts: [{
id: 'PM7LK6N',
title: '1Click Quoting is temporarily stopped',
post_type: 'incident',
latest_update: {
status_id: 'PSCS3IV',
severity_id: 'P187122',
reported_at: 1776342420000,
impacts: [{ service_id: 'PXQFSY1', severity_id: 'PCIGMKW' }],
message: '<p>The protocol is paused.</p>',
},
}],
},
});
const samePollLater = normalizeNearIntentsStatus({
observedAt: '2026-04-16T12:41:00.000Z',
servicesResponse,
postEnumsResponse,
postsResponse: {
posts: [{
id: 'PM7LK6N',
title: '1Click Quoting is temporarily stopped',
post_type: 'incident',
latest_update: {
status_id: 'PSCS3IV',
severity_id: 'P187122',
reported_at: 1776342420000,
impacts: [{ service_id: 'PXQFSY1', severity_id: 'PCIGMKW' }],
message: '<p>The protocol is paused.</p>',
},
}],
},
});
const updatedOfficialMessage = normalizeNearIntentsStatus({
observedAt: '2026-04-16T12:42:00.000Z',
servicesResponse,
postEnumsResponse,
postsResponse: {
posts: [{
id: 'PM7LK6N',
title: '1Click Quoting is temporarily stopped',
post_type: 'incident',
latest_update: {
status_id: 'PSCS3IV',
severity_id: 'P187122',
reported_at: 1776346020000,
impacts: [{ service_id: 'PXQFSY1', severity_id: 'PCIGMKW' }],
message: '<p>The protocol remains paused for 12 hours.</p>',
},
}],
},
});
assert.equal(first.status_fingerprint, samePollLater.status_fingerprint);
assert.notEqual(first.status_fingerprint, updatedOfficialMessage.status_fingerprint);
const payload = buildNearIntentsStatusEventPayload(updatedOfficialMessage, {
changedAt: '2026-04-16T12:42:00.000Z',
previousFingerprint: first.status_fingerprint,
});
assert.equal(payload.status_fingerprint, updatedOfficialMessage.status_fingerprint);
assert.equal(payload.previous_status_fingerprint, first.status_fingerprint);
assert.match(payload.environment_status_id, /^near-intents-status-/);
});

View file

@ -7,6 +7,7 @@ const fundsSource = readFileSync(new URL('../src/operator-dashboard/static/pages
const stylesSource = readFileSync(new URL('../src/operator-dashboard/static/styles.css', import.meta.url), 'utf8');
const serviceCardSource = readFileSync(new URL('../src/operator-dashboard/static/components/ServiceCard.jsx', import.meta.url), 'utf8');
const statusBarSource = readFileSync(new URL('../src/operator-dashboard/static/components/StatusBar.jsx', import.meta.url), 'utf8');
const systemSource = readFileSync(new URL('../src/operator-dashboard/static/pages/SystemPage.jsx', import.meta.url), 'utf8');
test('strategy page owns consolidated quote lifecycle and successful trade tables', () => {
assert.match(strategySource, /Quote lifecycle/);
@ -55,3 +56,11 @@ test('dashboard UI exposes official NEAR upstream status separately from local f
assert.match(serviceCardSource, /Upstream at/);
assert.match(serviceCardSource, /decisive_reason/);
});
test('system page exposes deduped environmental conditions history', () => {
assert.match(systemSource, /Environmental conditions/);
assert.match(systemSource, /Stored only when the normalized official status changes/);
assert.match(systemSource, /NEAR Intents upstream status changes/);
assert.match(systemSource, /status_fingerprint/);
});

View file

@ -43,6 +43,7 @@ function buildConfig() {
kafkaTopicRefMarketPrice: 'ref.market_price',
kafkaTopicStateIntentInventory: 'state.intent_inventory',
kafkaTopicOpsAlert: 'ops.alert',
kafkaTopicOpsEnvironmentStatus: 'ops.environment_status',
kafkaTopicExecTradeResult: 'exec.trade_result',
tradingBtc,
tradingEure,
@ -221,6 +222,24 @@ test('basic auth resolves operator identity and reuses a session cookie', () =>
assert.equal(second.via, 'session_cookie');
});
test('live status bar preserves initial upstream status during websocket session ready', () => {
const config = buildConfig();
const state = createDashboardLiveState({
config,
nearIntentsStatus: {
status: 'disrupted',
label: 'upstream paused',
observed_at: '2026-04-16T12:40:00.000Z',
decisive_reason: '1Click quoting paused.',
},
});
const statusBar = buildLiveStatusBar(state);
assert.equal(statusBar.near_intents_upstream_label, 'upstream paused');
assert.equal(statusBar.near_intents_upstream_reason, '1Click quoting paused.');
});
test('live quote updates stay capped and publish lifecycle rows without refresh', () => {
const config = buildConfig();
const state = createDashboardLiveState({
@ -1617,3 +1636,106 @@ test('dashboard surfaces NEAR upstream disruption without calling submitted work
assert.equal(services['history-writer'].health_status, 'online');
assert.equal(services['history-writer'].upstream_status, null);
});
test('bootstrap exposes deduped environment status history as environmental conditions', () => {
const config = buildConfig();
const dashboard = buildDashboardBootstrap({
config,
auth: { authenticated: true },
portfolioMetric: null,
inventorySnapshot: null,
marketPrice: null,
recentQuotes: [],
submissionPage: { page: 1, page_size: 20, total: 0, total_pages: 1, items: [] },
submissionSummary: { total: 0, last_submission_at: null },
fundingObservations: [],
recentDepositStatuses: [],
recentTradeDecisions: [],
recentExecuteTradeCommands: [],
recentExecutionResults: [],
recentQuoteOutcomes: [],
recentIntentRequests: [],
recentAlertTransitions: [],
recentEnvironmentStatuses: [{
observed_at: '2026-04-16T12:40:00.000Z',
ingested_at: '2026-04-16T12:40:01.000Z',
payload: {
environment_status_id: 'env-1',
environment_key: 'near_intents_status_page',
status: 'disrupted',
label: 'upstream paused',
status_fingerprint: 'fp-1',
observed_at: '2026-04-16T12:40:00.000Z',
changed_at: '2026-04-16T12:40:00.000Z',
decisive_reason: '1Click quoting paused.',
},
}],
serviceSnapshots: [
{
service: 'near-intents-ingest',
label: 'NEAR Intents Ingest',
base_url: 'http://near-intents-ingest',
reachable: true,
health: { ok: true },
state: {},
},
{
service: 'history-writer',
label: 'History Writer',
base_url: 'http://history-writer',
reachable: true,
health: { ok: true },
state: {
last_environment_status_write_at: '2026-04-16T12:40:01.000Z',
last_environment_status_duplicate_at: '2026-04-16T12:41:01.000Z',
},
},
],
});
assert.equal(dashboard.status_bar.near_intents_upstream_label, 'upstream paused');
assert.equal(dashboard.system.environment_status.current.status_fingerprint, 'fp-1');
assert.equal(dashboard.system.environment_status.recent_changes.length, 1);
assert.equal(dashboard.system.persistence.last_environment_status_write_at, '2026-04-16T12:40:01.000Z');
assert.equal(dashboard.system.persistence.last_environment_status_duplicate_at, '2026-04-16T12:41:01.000Z');
});
test('live environment status updates status bar and dashboard history without refresh', () => {
const config = buildConfig();
const state = createDashboardLiveState({ config });
const updates = applyDashboardLiveEvent(state, {
topic: config.kafkaTopicOpsEnvironmentStatus,
event: {
observed_at: '2026-04-16T12:40:00.000Z',
ingested_at: '2026-04-16T12:40:01.000Z',
payload: {
environment_status_id: 'env-live-1',
environment_key: 'near_intents_status_page',
status: 'disrupted',
label: 'upstream paused',
status_fingerprint: 'fp-live-1',
observed_at: '2026-04-16T12:40:00.000Z',
changed_at: '2026-04-16T12:40:00.000Z',
decisive_reason: '1Click quoting paused.',
},
},
});
assert.equal(updates[0].type, 'status_bar.updated');
assert.equal(updates[0].status_bar.near_intents_upstream_label, 'upstream paused');
assert.equal(updates[1].type, 'environment_status.updated');
assert.equal(updates[1].environment_status.payload.status_fingerprint, 'fp-live-1');
const dashboard = {
funds: { profitability: {} },
status_bar: {},
system: { environment_status: { recent_changes: [] } },
};
const reduced = dashboardReducer({ dashboard, session: { authenticated: true } }, {
type: 'socket.message.received',
payload: updates[1],
});
assert.equal(reduced.dashboard.system.environment_status.current.status_fingerprint, 'fp-live-1');
assert.equal(reduced.dashboard.system.environment_status.recent_changes.length, 1);
});

View file

@ -11,3 +11,12 @@ test('ops sentinel imports executor containment guard used by runtime evaluation
/import\s*\{[\s\S]*shouldContainExecutorForAlerts[\s\S]*\}\s*from\s*'\.\.\/core\/runtime-health\.mjs';/,
);
});
test('ops sentinel polls official NEAR status and publishes environment status without alerting', () => {
assert.match(source, /pollNearIntentsEnvironmentStatus/);
assert.match(source, /kafkaTopicOpsEnvironmentStatus/);
assert.match(source, /buildNearIntentsStatusEventPayload/);
assert.match(source, /normalized\.status_fingerprint === state\.last_environment_status_fingerprint/);
assert.match(source, /assertEnvironmentStatusEvent/);
});