From 860471f2676de36a6db6859c66102834cd86fb80 Mon Sep 17 00:00:00 2001 From: philipp Date: Fri, 3 Apr 2026 17:50:39 +0200 Subject: [PATCH] Add pre-credit funding visibility and durable alerts Proof: Implement the active turn for pre-credit funding visibility and durable operator alerts while keeping spendable inventory truth limited to bridge/verifier credit. Assumptions: The BTC deposit handle can be observed through a mempool.space-compatible API, bridge recent_deposits remains the credit truth for correlation, and pausing market-reference-ingest or inventory-sync briefly for alert validation is safe without disarming strategy or executor. Still fake: Gnosis pre-credit observation is not implemented, executor failure alert validation may still depend on an existing real failure unless a separate live failure is explicitly approved, and a new live deposit is still required to prove a fresh pre-credit-to-credit path if no suitable recent funding exists. --- .env.example | 15 + compose.yml | 9 + deploy/k8s/base/bootstrap-job.yaml | 2 +- deploy/k8s/base/unrip.yaml | 45 +++ deploy/redpanda/rpk-topics.txt | 2 + package.json | 1 + scripts/deploy/bootstrap.sh | 2 +- src/apps/history-writer.mjs | 10 + src/apps/inventory-sync.mjs | 63 +++-- src/apps/liquidity-manager.mjs | 312 ++++++++++++++++++++- src/apps/ops-sentinel.mjs | 220 +++++++++++++++ src/core/alert-engine.mjs | 370 +++++++++++++++++++++++++ src/core/funding-observations.mjs | 225 +++++++++++++++ src/core/history-records.mjs | 28 ++ src/core/liquidity-state.mjs | 8 + src/core/schemas.mjs | 48 ++++ src/lib/config.mjs | 51 ++++ src/lib/postgres.mjs | 50 ++++ src/observers/btc-address-observer.mjs | 95 +++++++ test/alert-engine.test.mjs | 103 +++++++ test/funding-observations.test.mjs | 110 ++++++++ test/liquidity-state.test.mjs | 7 + 22 files changed, 1745 insertions(+), 31 deletions(-) create mode 100644 src/apps/ops-sentinel.mjs create mode 100644 src/core/alert-engine.mjs create mode 100644 src/core/funding-observations.mjs create mode 100644 src/observers/btc-address-observer.mjs create mode 100644 test/alert-engine.test.mjs create mode 100644 test/funding-observations.test.mjs diff --git a/.env.example b/.env.example index 4faacd5..8547df9 100644 --- a/.env.example +++ b/.env.example @@ -37,6 +37,8 @@ STRATEGY_ENGINE_CONTROL_HOST=0.0.0.0 STRATEGY_ENGINE_CONTROL_PORT=8086 TRADE_EXECUTOR_CONTROL_HOST=0.0.0.0 TRADE_EXECUTOR_CONTROL_PORT=8087 +OPS_SENTINEL_CONTROL_HOST=0.0.0.0 +OPS_SENTINEL_CONTROL_PORT=8088 # Kafka backbone KAFKA_BROKERS=redpanda:9092 @@ -46,6 +48,8 @@ KAFKA_TOPIC_NORM_SWAP_DEMAND=norm.swap_demand KAFKA_TOPIC_REF_MARKET_PRICE=ref.market_price KAFKA_TOPIC_STATE_INTENT_INVENTORY=state.intent_inventory KAFKA_TOPIC_OPS_LIQUIDITY_ACTION=ops.liquidity_action +KAFKA_TOPIC_OPS_FUNDING_OBSERVATION=ops.funding_observation +KAFKA_TOPIC_OPS_ALERT=ops.alert KAFKA_TOPIC_DECISION_TRADE_DECISION=decision.trade_decision KAFKA_TOPIC_CMD_EXECUTE_TRADE=cmd.execute_trade KAFKA_TOPIC_EXEC_TRADE_RESULT=exec.trade_result @@ -53,6 +57,7 @@ KAFKA_CONSUMER_GROUP_HISTORY=history-writer-v1 KAFKA_CONSUMER_GROUP_INVENTORY=inventory-sync-v1 KAFKA_CONSUMER_GROUP_STRATEGY=strategy-engine-v1 KAFKA_CONSUMER_GROUP_EXECUTOR=trade-executor-v1 +KAFKA_CONSUMER_GROUP_OPS_SENTINEL=ops-sentinel-v1 # PostgreSQL durable history store POSTGRES_URL=postgresql://unrip:unrip@postgres:5432/unrip @@ -79,3 +84,13 @@ STRATEGY_INVENTORY_MAX_AGE_MS=30000 EXECUTOR_INITIAL_ARMED=false EXECUTOR_RESPONSE_TIMEOUT_MS=10000 LIQUIDITY_WITHDRAWALS_FROZEN=true + +# Pre-credit funding visibility and alerting +BTC_FUNDING_OBSERVER_ENABLED=true +BTC_FUNDING_OBSERVER_BASE_URL=https://mempool.space/api +FUNDING_OBSERVATION_STUCK_MS=3600000 +OPS_SENTINEL_EVALUATION_MS=5000 +OPS_SENTINEL_PRICE_STALE_MS=30000 +OPS_SENTINEL_INVENTORY_STALE_MS=30000 +OPS_SENTINEL_FUNDING_CREDIT_PENDING_MS=300000 +OPS_SENTINEL_FUNDING_STUCK_MS=3600000 diff --git a/compose.yml b/compose.yml index 29f3259..8852ccf 100644 --- a/compose.yml +++ b/compose.yml @@ -99,6 +99,15 @@ services: condition: service_healthy restart: unless-stopped + ops-sentinel: + build: . + command: ["node", "src/apps/ops-sentinel.mjs"] + env_file: [.env] + depends_on: + redpanda: + condition: service_healthy + restart: unless-stopped + strategy-engine: build: . command: ["node", "src/apps/strategy-engine.mjs"] diff --git a/deploy/k8s/base/bootstrap-job.yaml b/deploy/k8s/base/bootstrap-job.yaml index cf8d059..eb706f9 100644 --- a/deploy/k8s/base/bootstrap-job.yaml +++ b/deploy/k8s/base/bootstrap-job.yaml @@ -16,7 +16,7 @@ spec: - | set -eu BROKERS="redpanda.unrip.svc.cluster.local:9092" - TOPICS="raw.near_intents.quote norm.swap_demand ref.market_price state.intent_inventory ops.liquidity_action decision.trade_decision cmd.execute_trade exec.trade_result" + TOPICS="raw.near_intents.quote norm.swap_demand ref.market_price state.intent_inventory ops.liquidity_action ops.funding_observation ops.alert decision.trade_decision cmd.execute_trade exec.trade_result" RETENTION_MS="172800000" RETENTION_BYTES="268435456" diff --git a/deploy/k8s/base/unrip.yaml b/deploy/k8s/base/unrip.yaml index dc64416..eebfa14 100644 --- a/deploy/k8s/base/unrip.yaml +++ b/deploy/k8s/base/unrip.yaml @@ -38,6 +38,8 @@ data: STRATEGY_ENGINE_CONTROL_PORT: "8086" TRADE_EXECUTOR_CONTROL_HOST: 0.0.0.0 TRADE_EXECUTOR_CONTROL_PORT: "8087" + OPS_SENTINEL_CONTROL_HOST: 0.0.0.0 + OPS_SENTINEL_CONTROL_PORT: "8088" KAFKA_BROKERS: redpanda.unrip.svc.cluster.local:9092 KAFKA_CLIENT_ID: unrip KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE: raw.near_intents.quote @@ -45,6 +47,8 @@ data: KAFKA_TOPIC_REF_MARKET_PRICE: ref.market_price KAFKA_TOPIC_STATE_INTENT_INVENTORY: state.intent_inventory KAFKA_TOPIC_OPS_LIQUIDITY_ACTION: ops.liquidity_action + KAFKA_TOPIC_OPS_FUNDING_OBSERVATION: ops.funding_observation + KAFKA_TOPIC_OPS_ALERT: ops.alert KAFKA_TOPIC_DECISION_TRADE_DECISION: decision.trade_decision KAFKA_TOPIC_CMD_EXECUTE_TRADE: cmd.execute_trade KAFKA_TOPIC_EXEC_TRADE_RESULT: exec.trade_result @@ -52,6 +56,7 @@ data: KAFKA_CONSUMER_GROUP_INVENTORY: inventory-sync-v1 KAFKA_CONSUMER_GROUP_STRATEGY: strategy-engine-v1 KAFKA_CONSUMER_GROUP_EXECUTOR: trade-executor-v1 + KAFKA_CONSUMER_GROUP_OPS_SENTINEL: ops-sentinel-v1 EXECUTOR_STATE_DIR: /var/lib/unrip/executor-state LIQUIDITY_STATE_DIR: /var/lib/unrip/liquidity-state MARKET_REFERENCE_REFRESH_MS: "5000" @@ -69,6 +74,14 @@ data: EXECUTOR_INITIAL_ARMED: "false" EXECUTOR_RESPONSE_TIMEOUT_MS: "10000" LIQUIDITY_WITHDRAWALS_FROZEN: "true" + BTC_FUNDING_OBSERVER_ENABLED: "true" + BTC_FUNDING_OBSERVER_BASE_URL: https://mempool.space/api + FUNDING_OBSERVATION_STUCK_MS: "3600000" + OPS_SENTINEL_EVALUATION_MS: "5000" + OPS_SENTINEL_PRICE_STALE_MS: "30000" + OPS_SENTINEL_INVENTORY_STALE_MS: "30000" + OPS_SENTINEL_FUNDING_CREDIT_PENDING_MS: "300000" + OPS_SENTINEL_FUNDING_STUCK_MS: "3600000" --- apiVersion: v1 kind: PersistentVolumeClaim @@ -261,6 +274,38 @@ spec: --- apiVersion: apps/v1 kind: Deployment +metadata: + name: ops-sentinel + namespace: unrip +spec: + replicas: 1 + selector: + matchLabels: + app: ops-sentinel + template: + metadata: + labels: + app: ops-sentinel + app.kubernetes.io/part-of: unrip + spec: + imagePullSecrets: + - name: unrip-registry-creds + containers: + - name: app + image: ghcr.io/example/unrip:bootstrap + imagePullPolicy: IfNotPresent + command: ["node", "src/apps/ops-sentinel.mjs"] + ports: + - name: control-api + containerPort: 8088 + envFrom: + - configMapRef: + name: unrip-config + - secretRef: + name: unrip-secrets +--- +apiVersion: apps/v1 +kind: Deployment metadata: name: strategy-engine namespace: unrip diff --git a/deploy/redpanda/rpk-topics.txt b/deploy/redpanda/rpk-topics.txt index 5704dfd..36f3adc 100644 --- a/deploy/redpanda/rpk-topics.txt +++ b/deploy/redpanda/rpk-topics.txt @@ -3,6 +3,8 @@ norm.swap_demand ref.market_price state.intent_inventory ops.liquidity_action +ops.funding_observation +ops.alert decision.trade_decision cmd.execute_trade exec.trade_result diff --git a/package.json b/package.json index 47b825c..e284715 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ "inventory:sync": "node src/apps/inventory-sync.mjs", "liquidity:manager": "node src/apps/liquidity-manager.mjs", "history:writer": "node src/apps/history-writer.mjs", + "ops:sentinel": "node src/apps/ops-sentinel.mjs", "strategy:engine": "node src/apps/strategy-engine.mjs", "trade:executor": "node src/apps/trade-executor.mjs", "start": "node index.mjs", diff --git a/scripts/deploy/bootstrap.sh b/scripts/deploy/bootstrap.sh index ec2915c..f498947 100755 --- a/scripts/deploy/bootstrap.sh +++ b/scripts/deploy/bootstrap.sh @@ -10,7 +10,7 @@ FORGEJO_REMOTE_NAME="${FORGEJO_REMOTE_NAME:-forgejo}" PROJECT_NAME="${PROJECT_NAME:-unrip}" PROJECT_NAMESPACE="${PROJECT_NAMESPACE:-$PROJECT_NAME}" -PROJECT_DEPLOYMENTS="${PROJECT_DEPLOYMENTS:-near-intents-ingest,market-reference-ingest,liquidity-manager,inventory-sync,history-writer,strategy-engine,trade-executor}" +PROJECT_DEPLOYMENTS="${PROJECT_DEPLOYMENTS:-near-intents-ingest,market-reference-ingest,liquidity-manager,inventory-sync,history-writer,ops-sentinel,strategy-engine,trade-executor}" PROJECT_REGISTRY_SECRET_NAME="${PROJECT_REGISTRY_SECRET_NAME:-${PROJECT_NAME}-registry-creds}" APP_SECRET_NAME="${APP_SECRET_NAME:-${PROJECT_NAME}-secrets}" SYNC_FORGEJO_REMOTE="${SYNC_FORGEJO_REMOTE:-1}" diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 5c7ad53..5c35d35 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -41,6 +41,8 @@ const topics = [ config.kafkaTopicRefMarketPrice, config.kafkaTopicStateIntentInventory, config.kafkaTopicOpsLiquidityAction, + config.kafkaTopicOpsFundingObservation, + config.kafkaTopicOpsAlert, config.kafkaTopicDecisionTradeDecision, config.kafkaTopicCmdExecuteTrade, config.kafkaTopicExecTradeResult, @@ -60,6 +62,8 @@ const state = { paused: false, draining: false, last_write_at: null, + last_funding_observation_write_at: null, + last_alert_write_at: null, last_metrics_at: null, last_error: null, error_count: 0, @@ -92,6 +96,12 @@ await consumer.run({ partition, offset: message.offset, }; + if (topic === config.kafkaTopicOpsFundingObservation) { + state.last_funding_observation_write_at = state.last_write_at; + } + if (topic === config.kafkaTopicOpsAlert) { + state.last_alert_write_at = state.last_write_at; + } if (portfolioMetricTopics.has(topic)) { try { await refreshPortfolioMetrics(); diff --git a/src/apps/inventory-sync.mjs b/src/apps/inventory-sync.mjs index 6d067a9..81a3020 100644 --- a/src/apps/inventory-sync.mjs +++ b/src/apps/inventory-sync.mjs @@ -4,9 +4,14 @@ import { createConsumer } from '../bus/kafka/consumer.mjs'; import { createProducer } from '../bus/kafka/producer.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs'; +import { buildFundingVisibility } from '../core/funding-observations.mjs'; import { buildInventorySnapshot } from '../core/inventory.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; -import { assertInventorySnapshotEvent, assertLiquidityActionEvent } from '../core/schemas.mjs'; +import { + assertFundingObservationEvent, + assertInventorySnapshotEvent, + assertLiquidityActionEvent, +} from '../core/schemas.mjs'; import { loadConfig } from '../lib/config.mjs'; import { createNearBridgeClient } from '../venues/near-intents/bridge-client.mjs'; import { createVerifierClient } from '../venues/near-intents/verifier-client.mjs'; @@ -48,6 +53,13 @@ const consumer = await createConsumer({ const state = { paused: false, tracked_withdrawals: {}, + funding_observations: {}, + funding_visibility: { + last_observed_at: null, + pre_credit_inbound: {}, + by_asset: {}, + by_handle: {}, + }, last_snapshot: null, last_sync_at: null, last_error: null, @@ -55,29 +67,42 @@ const state = { }; await consumer.subscribe({ topic: config.kafkaTopicOpsLiquidityAction, fromBeginning: true }); +await consumer.subscribe({ topic: config.kafkaTopicOpsFundingObservation, fromBeginning: true }); await consumer.run({ - eachMessage: async ({ message }) => { + eachMessage: async ({ topic, message }) => { if (!message.value) return; try { const event = parseEventMessage(message.value.toString()); - assertLiquidityActionEvent(event); - if (event.payload.action_type === 'withdrawal_tracked' - || event.payload.action_type === 'withdrawal_status_changed') { - const details = event.payload.details || {}; - if (details.withdrawal_hash) { - state.tracked_withdrawals[details.withdrawal_hash] = { - withdrawal_hash: details.withdrawal_hash, - asset_id: details.asset_id, - chain: details.chain, - amount: String(details.amount || '0'), - status: details.status || event.payload.status, - address: details.address || null, - }; + if (topic === config.kafkaTopicOpsLiquidityAction) { + assertLiquidityActionEvent(event); + if (event.payload.action_type === 'withdrawal_tracked' + || event.payload.action_type === 'withdrawal_status_changed') { + const details = event.payload.details || {}; + if (details.withdrawal_hash) { + state.tracked_withdrawals[details.withdrawal_hash] = { + withdrawal_hash: details.withdrawal_hash, + asset_id: details.asset_id, + chain: details.chain, + amount: String(details.amount || '0'), + status: details.status || event.payload.status, + address: details.address || null, + }; + } } + return; + } + + if (topic === config.kafkaTopicOpsFundingObservation) { + assertFundingObservationEvent(event); + state.funding_observations[event.payload.funding_observation_id] = event.payload; + state.funding_visibility = buildFundingVisibility( + Object.values(state.funding_observations), + { now: new Date().toISOString() }, + ); } } catch (error) { - logger.error('liquidity_action_consume_failed', { - topic: config.kafkaTopicOpsLiquidityAction, + logger.error('inventory_side_input_consume_failed', { + topic, details: { error: serializeError(error), }, @@ -162,6 +187,10 @@ const controlApi = startControlApi({ return { account_id: config.nearIntentsAccountId, ...state, + funding_visibility: buildFundingVisibility( + Object.values(state.funding_observations), + { now: new Date().toISOString() }, + ), }; }, }, diff --git a/src/apps/liquidity-manager.mjs b/src/apps/liquidity-manager.mjs index b4dddab..c80f4ce 100644 --- a/src/apps/liquidity-manager.mjs +++ b/src/apps/liquidity-manager.mjs @@ -3,12 +3,20 @@ import process from 'node:process'; import { createProducer } from '../bus/kafka/producer.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { buildEventEnvelope } from '../core/event-envelope.mjs'; +import { + buildFundingObservationKey, + correlateFundingObservation, + hasFundingObservationChanged, + matchBridgeDeposit, + summarizeFundingObservations, +} from '../core/funding-observations.mjs'; import { createJsonStateStore } from '../core/json-state-store.mjs'; import { normalizeLiquidityState } from '../core/liquidity-state.mjs'; import { buildBridgeWithdrawalPlan } from '../core/liquidity-withdrawals.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; -import { assertLiquidityActionEvent } from '../core/schemas.mjs'; +import { assertFundingObservationEvent, assertLiquidityActionEvent } from '../core/schemas.mjs'; import { loadConfig } from '../lib/config.mjs'; +import { createBtcAddressObserver } from '../observers/btc-address-observer.mjs'; import { createNearBridgeClient } from '../venues/near-intents/bridge-client.mjs'; import { createVerifierClient } from '../venues/near-intents/verifier-client.mjs'; @@ -41,21 +49,37 @@ const verifierClient = createVerifierClient({ accountId: config.nearIntentsAccountId, signerPrivateKey: config.nearIntentsSignerPrivateKey, }); +const btcAddressObserver = config.btcFundingObserverEnabled + ? createBtcAddressObserver({ + baseUrl: config.btcFundingObserverBaseUrl, + }) + : null; const store = createJsonStateStore({ stateDir: config.liquidityStateDir, fileName: 'liquidity.json', initialState: { paused: false, + funding_observer_paused: false, withdrawals_frozen: config.withdrawalsFrozen, deposit_addresses: {}, deposits: {}, tracked_withdrawals: {}, supported_tokens: {}, + funding_observations: {}, + funding_observations_by_handle: {}, + funding_visibility_by_asset: {}, + uncredited_funding_total_by_asset: {}, + credit_correlation: {}, + observer_health: {}, last_refresh_at: null, + last_funding_observation_at: null, + funding_observer_last_refresh_at: null, + funding_observer_last_error: null, last_error: null, last_withdrawal_request: null, last_withdrawal_result: null, publish_count: 0, + funding_publish_count: 0, }, }); @@ -65,6 +89,10 @@ const assetsByChain = new Map([ [config.tradingEure.chain, config.tradingEure.assetId], ]); +const fundingObserverByChain = new Map( + btcAddressObserver ? [[config.tradingBtc.chain, btcAddressObserver]] : [], +); + async function refresh() { const state = normalizeLiquidityState(store.getState(), { withdrawalsFrozen: config.withdrawalsFrozen, @@ -85,6 +113,7 @@ async function refresh() { state.last_refresh_at = new Date().toISOString(); state.last_error = null; + applyFundingObservationSummary(state, state.last_refresh_at); store.setState(state); } catch (error) { state.last_error = serializeError(error); @@ -103,11 +132,12 @@ async function refreshChain(chain, state) { accountId: config.nearIntentsAccountId, chain, }); + const refreshedAt = new Date().toISOString(); const previousAddress = state.deposit_addresses[chain]?.address || null; state.deposit_addresses[chain] = { ...(state.deposit_addresses[chain] || {}), ...depositAddress, - refreshed_at: new Date().toISOString(), + refreshed_at: refreshedAt, }; if (previousAddress !== depositAddress.address) { @@ -124,7 +154,8 @@ async function refreshChain(chain, state) { accountId: config.nearIntentsAccountId, chain, }); - for (const deposit of deposits?.deposits || []) { + const bridgeDeposits = deposits?.deposits || []; + for (const deposit of bridgeDeposits) { const key = `${chain}:${deposit.tx_hash || deposit.address}:${deposit.defuse_asset_identifier}`; const assetId = mapDepositAssetId(deposit.defuse_asset_identifier, chain); const normalized = { @@ -149,6 +180,172 @@ async function refreshChain(chain, state) { }, state); } } + + await refreshFundingObservations({ + chain, + state, + fundingHandle: depositAddress.address, + bridgeDeposits, + }); +} + +async function refreshFundingObservations({ chain, state, fundingHandle, bridgeDeposits }) { + const refreshedAt = new Date().toISOString(); + const observer = fundingObserverByChain.get(chain); + if (!fundingHandle) { + state.observer_health[chain] = { + chain, + healthy: false, + configured: false, + supported: Boolean(observer), + paused: state.funding_observer_paused, + source: observer ? 'configured' : 'unsupported', + refreshed_at: refreshedAt, + }; + applyFundingObservationSummary(state, refreshedAt); + return; + } + + if (!observer) { + state.observer_health[chain] = { + chain, + healthy: false, + configured: true, + supported: false, + paused: false, + handle: fundingHandle, + source: 'unsupported', + refreshed_at: refreshedAt, + }; + applyFundingObservationSummary(state, refreshedAt); + return; + } + + if (state.funding_observer_paused) { + state.observer_health[chain] = { + ...(state.observer_health[chain] || {}), + chain, + healthy: true, + configured: true, + supported: true, + paused: true, + handle: fundingHandle, + refreshed_at: refreshedAt, + source: state.observer_health[chain]?.source || 'btc_mempool_space', + }; + applyFundingObservationSummary(state, refreshedAt); + return; + } + + try { + const observed = await observer.listTransactions({ address: fundingHandle }); + for (const tx of observed.transactions) { + const key = buildFundingObservationKey({ + chain, + fundingHandle, + txHash: tx.tx_hash, + }); + const previous = state.funding_observations[key] || null; + const next = correlateFundingObservation({ + existing: previous, + accountId: config.nearIntentsAccountId, + assetId: assetsByChain.get(chain), + chain, + fundingHandle, + source: tx.source || observed.source, + txHash: tx.tx_hash, + amount: tx.amount, + confirmations: tx.confirmations, + observedAt: tx.observed_at || observed.observed_at, + bridgeDeposit: matchBridgeDeposit({ + txHash: tx.tx_hash, + fundingHandle, + bridgeDeposits, + }), + stuckAfterMs: config.fundingObservationStuckMs, + }); + state.funding_observations[key] = next; + if (hasFundingObservationChanged(previous, next)) { + await publishFundingObservation(next, state); + } + } + + for (const [key, previous] of Object.entries(state.funding_observations)) { + if (previous.chain !== chain || previous.funding_handle !== fundingHandle) continue; + + const next = correlateFundingObservation({ + existing: previous, + accountId: previous.account_id, + assetId: previous.asset_id, + chain: previous.chain, + fundingHandle: previous.funding_handle, + source: previous.source, + txHash: previous.tx_hash, + amount: previous.amount, + confirmations: previous.confirmations, + observedAt: refreshedAt, + bridgeDeposit: matchBridgeDeposit({ + txHash: previous.tx_hash, + fundingHandle, + bridgeDeposits, + }), + stuckAfterMs: config.fundingObservationStuckMs, + }); + state.funding_observations[key] = next; + if (hasFundingObservationChanged(previous, next)) { + await publishFundingObservation(next, state); + } + } + + state.funding_observer_last_refresh_at = observed.observed_at || refreshedAt; + state.funding_observer_last_error = null; + state.observer_health[chain] = { + chain, + healthy: true, + configured: true, + supported: true, + paused: false, + handle: fundingHandle, + source: observed.source, + observed_count: observed.transactions.length, + refreshed_at: observed.observed_at || refreshedAt, + }; + applyFundingObservationSummary(state, refreshedAt); + } catch (error) { + state.funding_observer_last_error = serializeError(error); + state.observer_health[chain] = { + chain, + healthy: false, + configured: true, + supported: true, + paused: false, + handle: fundingHandle, + source: 'btc_mempool_space', + refreshed_at: refreshedAt, + error: serializeError(error), + }; + applyFundingObservationSummary(state, refreshedAt); + logger.error('funding_observation_refresh_failed', { + topic: config.kafkaTopicOpsFundingObservation, + details: { + chain, + funding_handle: fundingHandle, + error: serializeError(error), + }, + }); + } +} + +function applyFundingObservationSummary(state, now = new Date().toISOString()) { + const summary = summarizeFundingObservations( + Object.values(state.funding_observations), + { now }, + ); + state.funding_observations_by_handle = summary.funding_observations_by_handle; + state.funding_visibility_by_asset = summary.funding_visibility_by_asset; + state.latest_funding_observation_at = summary.latest_funding_observation_at; + state.uncredited_funding_total_by_asset = summary.uncredited_funding_total_by_asset; + state.credit_correlation = summary.credit_correlation; } async function refreshWithdrawal(tracked, state) { @@ -290,6 +487,21 @@ async function publishAction(payload, state) { state.publish_count += 1; } +async function publishFundingObservation(payload, state) { + const event = buildEventEnvelope({ + source: 'liquidity-manager', + venue: 'near-intents', + eventType: 'funding_observation', + observedAt: payload.last_seen_at, + payload, + }); + assertFundingObservationEvent(event); + await producer.sendJson(config.kafkaTopicOpsFundingObservation, event, { + key: payload.funding_observation_id, + }); + state.funding_publish_count += 1; +} + const timer = setInterval(refresh, config.liquidityRefreshMs); timer.unref?.(); await refresh(); @@ -302,14 +514,7 @@ const controlApi = startControlApi({ namespace: config.projectNamespace, stateProvider: { getState() { - return { - account_id: config.nearIntentsAccountId, - withdrawal_defaults: { - [config.tradingBtc.assetId]: config.tradingBtc.withdrawAddress || null, - [config.tradingEure.assetId]: config.tradingEure.withdrawAddress || null, - }, - ...store.getState(), - }; + return buildPublicState(); }, }, routes: [ @@ -320,7 +525,48 @@ const controlApi = startControlApi({ await refresh(); return { ok: true, - ...store.getState(), + ...buildPublicState(), + }; + }, + }, + { + method: 'POST', + path: '/refresh-funding-observations', + handler: async () => { + await refresh(); + return { + ok: true, + ...buildPublicState(), + }; + }, + }, + { + method: 'POST', + path: '/pause-funding-observer', + handler: () => { + const state = store.getState(); + normalizeLiquidityState(state, { + withdrawalsFrozen: config.withdrawalsFrozen, + }); + state.funding_observer_paused = true; + store.setState(state); + return { ok: true, funding_observer_paused: true }; + }, + }, + { + method: 'POST', + path: '/resume-funding-observer', + handler: async () => { + const state = store.getState(); + normalizeLiquidityState(state, { + withdrawalsFrozen: config.withdrawalsFrozen, + }); + state.funding_observer_paused = false; + store.setState(state); + await refresh(); + return { + ok: true, + ...buildPublicState(), }; }, }, @@ -540,3 +786,45 @@ function inferWithdrawStatusCode(error) { } return 409; } + +function buildPublicState() { + const now = new Date().toISOString(); + const state = normalizeLiquidityState(structuredClone(store.getState()), { + withdrawalsFrozen: config.withdrawalsFrozen, + }); + applyFundingObservationSummary(state, now); + + return { + account_id: config.nearIntentsAccountId, + withdrawal_defaults: { + [config.tradingBtc.assetId]: config.tradingBtc.withdrawAddress || null, + [config.tradingEure.assetId]: config.tradingEure.withdrawAddress || null, + }, + ...state, + observer_health: buildObserverHealth(state.observer_health, { + now, + fundingObserverPaused: state.funding_observer_paused, + }), + observer_age_ms: ageMs(state.funding_observer_last_refresh_at, now), + }; +} + +function buildObserverHealth(observerHealth, { now, fundingObserverPaused }) { + return Object.fromEntries( + Object.entries(observerHealth || {}).map(([chain, health]) => [ + chain, + { + ...health, + paused: fundingObserverPaused || health?.paused || false, + age_ms: ageMs(health?.refreshed_at, now), + }, + ]), + ); +} + +function ageMs(from, to) { + const left = Date.parse(from || ''); + const right = Date.parse(to || ''); + if (!Number.isFinite(left) || !Number.isFinite(right)) return null; + return Math.max(0, right - left); +} diff --git a/src/apps/ops-sentinel.mjs b/src/apps/ops-sentinel.mjs new file mode 100644 index 0000000..029aaeb --- /dev/null +++ b/src/apps/ops-sentinel.mjs @@ -0,0 +1,220 @@ +import process from 'node:process'; + +import { createConsumer } from '../bus/kafka/consumer.mjs'; +import { createProducer } from '../bus/kafka/producer.mjs'; +import { startControlApi } from '../core/control-api.mjs'; +import { createAlertEngine } from '../core/alert-engine.mjs'; +import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs'; +import { createLogger, serializeError } from '../core/log.mjs'; +import { + assertFundingObservationEvent, + assertInventorySnapshotEvent, + assertLiquidityActionEvent, + assertMarketPriceEvent, + assertOpsAlertEvent, + assertTradeResult, +} from '../core/schemas.mjs'; +import { loadConfig } from '../lib/config.mjs'; + +const config = loadConfig(); +const logger = createLogger({ + service: 'ops-sentinel', + component: 'alerts', + namespace: config.projectNamespace, +}); + +const producer = await createProducer({ + brokers: config.kafkaBrokers, + clientId: config.kafkaClientId, + logger, +}); +const consumer = await createConsumer({ + groupId: config.kafkaConsumerGroupOpsSentinel, + brokers: config.kafkaBrokers, + clientId: config.kafkaClientId, + logger, +}); + +const topics = [ + config.kafkaTopicRefMarketPrice, + config.kafkaTopicStateIntentInventory, + config.kafkaTopicOpsLiquidityAction, + config.kafkaTopicOpsFundingObservation, + config.kafkaTopicExecTradeResult, +]; + +const state = { + paused: false, + last_error: null, + last_event_at: null, + publish_count: 0, +}; + +const alertEngine = createAlertEngine({ + activePair: config.activePair, + priceStaleMs: config.opsSentinelPriceStaleMs, + inventoryStaleMs: config.opsSentinelInventoryStaleMs, + fundingCreditPendingMs: config.opsSentinelFundingCreditPendingMs, + fundingStuckMs: config.opsSentinelFundingStuckMs, + evaluationIntervalMs: config.opsSentinelEvaluationMs, +}); + +for (const topic of topics) { + await consumer.subscribe({ topic, fromBeginning: true }); +} + +await consumer.run({ + eachMessage: async ({ topic, message }) => { + if (!message.value || state.paused) return; + + try { + const event = parseEventMessage(message.value.toString()); + const payload = normalizePayloadForAlert(topic, event); + const transitions = alertEngine.applyEvent(topic, payload); + state.last_error = null; + state.last_event_at = new Date().toISOString(); + await publishTransitions(transitions); + } catch (error) { + state.last_error = serializeError(error); + logger.error('ops_sentinel_consume_failed', { + topic, + details: { + error: serializeError(error), + }, + }); + } + }, +}); + +const timer = setInterval(() => { + if (state.paused) return; + + const transitions = alertEngine.evaluate(); + publishTransitions(transitions).catch((error) => { + state.last_error = serializeError(error); + logger.error('ops_sentinel_evaluate_failed', { + topic: config.kafkaTopicOpsAlert, + details: { + error: serializeError(error), + }, + }); + }); +}, config.opsSentinelEvaluationMs); +timer.unref?.(); + +const controlApi = startControlApi({ + host: config.opsSentinelControlHost, + port: config.opsSentinelControlPort, + logger: logger.child({ component: 'control-api' }), + service: 'ops-sentinel', + namespace: config.projectNamespace, + stateProvider: { + getState() { + return { + paused: state.paused, + publish_count: state.publish_count, + last_error: state.last_error, + last_event_at: state.last_event_at, + ...alertEngine.getState(), + }; + }, + }, + healthProvider: { + getHealth() { + return { + paused: state.paused, + last_event_at: state.last_event_at, + last_error: state.last_error, + }; + }, + }, + routes: [ + { + method: 'POST', + path: '/pause', + handler: () => { + state.paused = true; + consumer.pause(topics.map((topic) => ({ topic }))); + return { ok: true, paused: true }; + }, + }, + { + method: 'POST', + path: '/resume', + handler: () => { + state.paused = false; + consumer.resume(topics.map((topic) => ({ topic }))); + return { ok: true, paused: false }; + }, + }, + ], +}); + +async function publishTransitions(transitions) { + for (const transition of transitions) { + const event = buildEventEnvelope({ + source: 'ops-sentinel', + venue: 'unrip', + eventType: 'ops_alert', + observedAt: transition.last_evaluated_at, + payload: { + alert_event_id: `${transition.alert_code}-${transition.status}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`, + ...transition, + }, + }); + assertOpsAlertEvent(event); + await producer.sendJson(config.kafkaTopicOpsAlert, event, { + key: `${transition.alert_code}:${transition.service_scope}:${transition.tx_hash || transition.pair || 'global'}`, + }); + state.publish_count += 1; + } +} + +function normalizePayloadForAlert(topic, event) { + switch (topic) { + case config.kafkaTopicRefMarketPrice: + assertMarketPriceEvent(event); + return { + ...event.payload, + observed_at: event.observed_at, + ingested_at: event.ingested_at, + }; + case config.kafkaTopicStateIntentInventory: + assertInventorySnapshotEvent(event); + return { + ...event.payload, + observed_at: event.observed_at, + ingested_at: event.ingested_at, + }; + case config.kafkaTopicOpsLiquidityAction: + assertLiquidityActionEvent(event); + return { + ...event.payload, + observed_at: event.observed_at, + ingested_at: event.ingested_at, + }; + case config.kafkaTopicOpsFundingObservation: + assertFundingObservationEvent(event); + return event.payload; + case config.kafkaTopicExecTradeResult: + assertTradeResult(event); + return { + ...event.payload, + observed_at: event.observed_at, + ingested_at: event.ingested_at, + }; + default: + throw new Error(`unsupported ops-sentinel topic: ${topic}`); + } +} + +async function shutdown() { + clearInterval(timer); + await controlApi.close().catch(() => {}); + await consumer.disconnect(); + await producer.disconnect(); + process.exit(0); +} + +process.on('SIGINT', shutdown); +process.on('SIGTERM', shutdown); diff --git a/src/core/alert-engine.mjs b/src/core/alert-engine.mjs new file mode 100644 index 0000000..2712b52 --- /dev/null +++ b/src/core/alert-engine.mjs @@ -0,0 +1,370 @@ +const DEFAULT_RECENT_LIMIT = 50; + +export function createAlertEngine({ + activePair, + priceStaleMs, + inventoryStaleMs, + fundingCreditPendingMs, + fundingStuckMs, + evaluationIntervalMs, + recentTransitionLimit = DEFAULT_RECENT_LIMIT, +}) { + const state = { + latest_price: null, + latest_inventory: null, + latest_liquidity_action: null, + latest_trade_result: null, + funding_observations: {}, + active_alerts: {}, + recent_transitions: [], + last_evaluated_at: null, + }; + + return { + applyEvent(topic, payload, now = new Date().toISOString()) { + switch (topic) { + case 'ref.market_price': + state.latest_price = payload; + break; + case 'state.intent_inventory': + state.latest_inventory = payload; + break; + case 'ops.liquidity_action': + state.latest_liquidity_action = payload; + break; + case 'ops.funding_observation': + if (payload?.funding_observation_id) { + state.funding_observations[payload.funding_observation_id] = payload; + } + break; + case 'exec.trade_result': + state.latest_trade_result = payload; + break; + default: + break; + } + + return evaluateAlerts({ + state, + activePair, + priceStaleMs, + inventoryStaleMs, + fundingCreditPendingMs, + fundingStuckMs, + recentTransitionLimit, + now, + }); + }, + evaluate(now = new Date().toISOString()) { + return evaluateAlerts({ + state, + activePair, + priceStaleMs, + inventoryStaleMs, + fundingCreditPendingMs, + fundingStuckMs, + recentTransitionLimit, + now, + }); + }, + getState(now = new Date().toISOString()) { + return summarizeState({ + state, + evaluationIntervalMs, + now, + }); + }, + }; +} + +function evaluateAlerts({ + state, + activePair, + priceStaleMs, + inventoryStaleMs, + fundingCreditPendingMs, + fundingStuckMs, + recentTransitionLimit, + now, +}) { + const desired = new Map(); + const nowValue = timestampValue(now); + + const priceAgeMs = ageMs(state.latest_price?.observed_at || state.latest_price?.ingested_at, nowValue); + if (priceAgeMs == null || priceAgeMs > priceStaleMs) { + desired.set( + buildAlertKey({ + alertCode: 'reference_price_stale', + serviceScope: 'market-reference-ingest', + pair: activePair, + }), + { + alert_code: 'reference_price_stale', + severity: 'warning', + reason: priceAgeMs == null + ? 'no reference price has been observed' + : `reference price age ${priceAgeMs}ms exceeds ${priceStaleMs}ms`, + service_scope: 'market-reference-ingest', + pair: activePair, + asset_id: null, + tx_hash: null, + details: { + last_price_at: state.latest_price?.observed_at || state.latest_price?.ingested_at || null, + age_ms: priceAgeMs, + stale_after_ms: priceStaleMs, + }, + }, + ); + } + + const inventoryAgeMs = ageMs( + state.latest_inventory?.synced_at || state.latest_inventory?.ingested_at, + nowValue, + ); + if (inventoryAgeMs == null || inventoryAgeMs > inventoryStaleMs) { + desired.set( + buildAlertKey({ + alertCode: 'inventory_snapshot_stale', + serviceScope: 'inventory-sync', + }), + { + alert_code: 'inventory_snapshot_stale', + severity: 'warning', + reason: inventoryAgeMs == null + ? 'no inventory snapshot has been observed' + : `inventory snapshot age ${inventoryAgeMs}ms exceeds ${inventoryStaleMs}ms`, + service_scope: 'inventory-sync', + pair: null, + asset_id: null, + tx_hash: null, + details: { + last_inventory_at: state.latest_inventory?.synced_at || state.latest_inventory?.ingested_at || null, + age_ms: inventoryAgeMs, + stale_after_ms: inventoryStaleMs, + }, + }, + ); + } + + for (const observation of Object.values(state.funding_observations)) { + const observationAgeMs = ageMs(observation.last_seen_at, nowValue); + const baseDetails = { + funding_observation_id: observation.funding_observation_id, + funding_handle: observation.funding_handle, + confirmations: observation.confirmations, + amount: observation.amount, + observation_status: observation.status, + first_seen_at: observation.first_seen_at, + last_seen_at: observation.last_seen_at, + age_ms: observationAgeMs, + bridge_deposit_tx_hash: observation.bridge_deposit_tx_hash, + bridge_status: observation.bridge_status, + }; + + if (observation.status === 'SEEN_UNCONFIRMED') { + desired.set( + buildAlertKey({ + alertCode: 'funding_seen_unconfirmed', + serviceScope: 'liquidity-manager', + assetId: observation.asset_id, + txHash: observation.tx_hash, + }), + { + alert_code: 'funding_seen_unconfirmed', + severity: 'info', + reason: `funding tx ${observation.tx_hash} is visible before confirmations`, + service_scope: 'liquidity-manager', + pair: null, + asset_id: observation.asset_id, + tx_hash: observation.tx_hash, + details: baseDetails, + }, + ); + } + + if ( + observation.status === 'CREDIT_PENDING' + || (observation.status === 'SEEN_CONFIRMED' && observationAgeMs != null && observationAgeMs >= fundingCreditPendingMs) + ) { + desired.set( + buildAlertKey({ + alertCode: 'funding_confirmed_credit_pending', + serviceScope: 'liquidity-manager', + assetId: observation.asset_id, + txHash: observation.tx_hash, + }), + { + alert_code: 'funding_confirmed_credit_pending', + severity: 'warning', + reason: `funding tx ${observation.tx_hash} is confirmed but not spendable yet`, + service_scope: 'liquidity-manager', + pair: null, + asset_id: observation.asset_id, + tx_hash: observation.tx_hash, + details: { + ...baseDetails, + credit_pending_after_ms: fundingCreditPendingMs, + }, + }, + ); + } + + if ( + observation.status === 'FAILED_OR_STUCK' + || ( + observation.status !== 'CREDITED' + && observation.status !== 'SEEN_UNCONFIRMED' + && observationAgeMs != null + && fundingStuckMs != null + && observationAgeMs >= fundingStuckMs + ) + ) { + desired.set( + buildAlertKey({ + alertCode: 'funding_stuck', + serviceScope: 'liquidity-manager', + assetId: observation.asset_id, + txHash: observation.tx_hash, + }), + { + alert_code: 'funding_stuck', + severity: 'critical', + reason: `funding tx ${observation.tx_hash} is failed or stuck before credit`, + service_scope: 'liquidity-manager', + pair: null, + asset_id: observation.asset_id, + tx_hash: observation.tx_hash, + details: baseDetails, + }, + ); + } + } + + const latestTradeResult = state.latest_trade_result; + if (latestTradeResult && isSubmissionFailure(latestTradeResult)) { + desired.set( + buildAlertKey({ + alertCode: 'executor_submission_failed', + serviceScope: 'trade-executor', + pair: latestTradeResult.pair || activePair, + }), + { + alert_code: 'executor_submission_failed', + severity: 'critical', + reason: `executor submission failed for command ${latestTradeResult.command_id}`, + service_scope: 'trade-executor', + pair: latestTradeResult.pair || activePair, + asset_id: null, + tx_hash: null, + details: { + command_id: latestTradeResult.command_id, + quote_id: latestTradeResult.quote_id, + result_code: latestTradeResult.result_code, + error: latestTradeResult.error || null, + status: latestTradeResult.status, + }, + }, + ); + } + + const transitions = reconcileAlertState({ + state, + desired, + now, + recentTransitionLimit, + }); + state.last_evaluated_at = now; + return transitions; +} + +function reconcileAlertState({ state, desired, now, recentTransitionLimit }) { + const transitions = []; + + for (const [key, next] of desired.entries()) { + const existing = state.active_alerts[key]; + if (!existing) { + const raised = { + ...next, + status: 'raised', + first_raised_at: now, + raised_at: now, + cleared_at: null, + last_evaluated_at: now, + }; + state.active_alerts[key] = raised; + transitions.push(raised); + continue; + } + + state.active_alerts[key] = { + ...existing, + ...next, + status: 'raised', + raised_at: existing.raised_at || existing.first_raised_at || now, + first_raised_at: existing.first_raised_at || existing.raised_at || now, + cleared_at: null, + last_evaluated_at: now, + }; + } + + for (const [key, existing] of Object.entries(state.active_alerts)) { + if (desired.has(key)) continue; + + const cleared = { + ...existing, + status: 'cleared', + raised_at: existing.raised_at || existing.first_raised_at || now, + cleared_at: now, + last_evaluated_at: now, + }; + delete state.active_alerts[key]; + transitions.push(cleared); + } + + if (transitions.length > 0) { + state.recent_transitions.unshift(...transitions); + state.recent_transitions = state.recent_transitions.slice(0, recentTransitionLimit); + } + + return transitions; +} + +function summarizeState({ state, evaluationIntervalMs, now }) { + const activeAlerts = Object.values(state.active_alerts) + .sort((left, right) => timestampValue(right.first_raised_at) - timestampValue(left.first_raised_at)); + const nowValue = timestampValue(now); + + return { + active_alerts: activeAlerts, + recent_transitions: state.recent_transitions, + last_evaluated_at: state.last_evaluated_at, + stale: ageMs(state.last_evaluated_at, nowValue) > (evaluationIntervalMs * 2), + latest_inputs: { + market_price_at: state.latest_price?.observed_at || state.latest_price?.ingested_at || null, + inventory_at: state.latest_inventory?.synced_at || state.latest_inventory?.ingested_at || null, + liquidity_action_at: state.latest_liquidity_action?.observed_at || null, + trade_result_at: state.latest_trade_result?.ingested_at || null, + funding_observation_count: Object.keys(state.funding_observations).length, + }, + }; +} + +function buildAlertKey({ alertCode, serviceScope, pair = null, assetId = null, txHash = null }) { + return [alertCode, serviceScope, pair || '', assetId || '', txHash || ''].join('|'); +} + +function isSubmissionFailure(result) { + return result?.status === 'failed' || result?.result_code === 'submission_failed'; +} + +function ageMs(value, nowValue) { + const start = timestampValue(value); + if (!Number.isFinite(start) || !Number.isFinite(nowValue)) return null; + return Math.max(0, nowValue - start); +} + +function timestampValue(value) { + if (!value) return NaN; + const parsed = Date.parse(value); + return Number.isFinite(parsed) ? parsed : NaN; +} diff --git a/src/core/funding-observations.mjs b/src/core/funding-observations.mjs new file mode 100644 index 0000000..684ff2e --- /dev/null +++ b/src/core/funding-observations.mjs @@ -0,0 +1,225 @@ +import crypto from 'node:crypto'; + +import { bigintAmount, mapToSortedObject } from './assets.mjs'; + +const CREDITED_BRIDGE_STATUSES = new Set(['COMPLETED', 'CREDITED', 'FINALIZED', 'SETTLED']); +const FAILED_BRIDGE_STATUSES = new Set(['FAILED', 'ERROR', 'REJECTED', 'EXPIRED']); +const UNCONFIRMED_STATUS = 'SEEN_UNCONFIRMED'; +const CONFIRMED_STATUS = 'SEEN_CONFIRMED'; +const CREDIT_PENDING_STATUS = 'CREDIT_PENDING'; +const CREDITED_STATUS = 'CREDITED'; +const FAILED_OR_STUCK_STATUS = 'FAILED_OR_STUCK'; + +export function buildFundingObservationId({ accountId, chain, fundingHandle, txHash }) { + return crypto + .createHash('sha256') + .update(`${accountId || ''}|${chain || ''}|${fundingHandle || ''}|${txHash || ''}`) + .digest('hex'); +} + +export function buildFundingObservationKey({ chain, fundingHandle, txHash }) { + return `${chain || 'unknown'}:${fundingHandle || 'unknown'}:${txHash || 'unknown'}`; +} + +export function correlateFundingObservation({ + existing = null, + accountId, + assetId, + chain, + fundingHandle, + source, + txHash, + amount, + confirmations = 0, + observedAt = new Date().toISOString(), + bridgeDeposit = null, + stuckAfterMs = 0, +}) { + const firstSeenAt = existing?.first_seen_at || observedAt; + const lastSeenAt = observedAt; + const normalizedConfirmations = normalizeConfirmations(confirmations); + const bridgeStatus = normalizeBridgeStatus(bridgeDeposit?.status); + const bridgeTxHash = bridgeDeposit?.tx_hash || null; + const ageMs = isoAgeMs(firstSeenAt, observedAt); + const fundingObservationId = existing?.funding_observation_id || buildFundingObservationId({ + accountId, + chain, + fundingHandle, + txHash, + }); + + let status = normalizedConfirmations > 0 ? CONFIRMED_STATUS : UNCONFIRMED_STATUS; + if (bridgeStatus) { + if (CREDITED_BRIDGE_STATUSES.has(bridgeStatus)) status = CREDITED_STATUS; + else if (FAILED_BRIDGE_STATUSES.has(bridgeStatus)) status = FAILED_OR_STUCK_STATUS; + else status = CREDIT_PENDING_STATUS; + } else if (stuckAfterMs > 0 && ageMs >= stuckAfterMs && normalizedConfirmations > 0) { + status = FAILED_OR_STUCK_STATUS; + } + + return { + funding_observation_id: fundingObservationId, + account_id: accountId, + asset_id: assetId, + chain, + funding_handle: fundingHandle, + source, + tx_hash: txHash, + status, + amount: String(amount || '0'), + confirmations: normalizedConfirmations, + first_seen_at: firstSeenAt, + last_seen_at: lastSeenAt, + credited_at: status === CREDITED_STATUS ? (existing?.credited_at || observedAt) : null, + bridge_deposit_tx_hash: bridgeTxHash, + bridge_status: bridgeStatus, + credit_correlation: bridgeStatus ? (bridgeDeposit?.tx_hash === txHash ? 'tx_hash' : 'handle') : null, + }; +} + +export function summarizeFundingObservations(observations, { now = new Date().toISOString() } = {}) { + const byHandle = {}; + const byAsset = {}; + const uncreditedFundingTotalByAsset = {}; + const creditCorrelation = {}; + let latestFundingObservationAt = null; + + const sorted = [...observations].sort((left, right) => ( + timestampValue(right.last_seen_at) - timestampValue(left.last_seen_at) + )); + + for (const observation of sorted) { + if (!observation?.funding_observation_id) continue; + + const handleKey = observation.funding_handle || `${observation.chain}:${observation.asset_id}`; + const ageMs = isoAgeMs(observation.last_seen_at, now); + const enriched = { + ...observation, + age_ms: ageMs, + spendable: false, + pre_credit: observation.status !== CREDITED_STATUS, + }; + + const handle = byHandle[handleKey] ||= { + funding_handle: observation.funding_handle, + chain: observation.chain, + asset_id: observation.asset_id, + source: observation.source, + latest_status: observation.status, + latest_observation_at: observation.last_seen_at, + pre_credit_total: '0', + observations: [], + }; + handle.observations.push(enriched); + + if (timestampValue(observation.last_seen_at) > timestampValue(handle.latest_observation_at)) { + handle.latest_status = observation.status; + handle.latest_observation_at = observation.last_seen_at; + handle.source = observation.source; + } + + const asset = byAsset[observation.asset_id] ||= { + asset_id: observation.asset_id, + pre_credit_total: '0', + latest_observation_at: observation.last_seen_at, + latest_status: observation.status, + }; + if (timestampValue(observation.last_seen_at) > timestampValue(asset.latest_observation_at)) { + asset.latest_observation_at = observation.last_seen_at; + asset.latest_status = observation.status; + } + + creditCorrelation[observation.funding_observation_id] = { + funding_observation_id: observation.funding_observation_id, + tx_hash: observation.tx_hash, + bridge_deposit_tx_hash: observation.bridge_deposit_tx_hash, + status: observation.status, + credited_at: observation.credited_at, + bridge_status: observation.bridge_status, + }; + + if (observation.status !== CREDITED_STATUS) { + handle.pre_credit_total = addAmounts(handle.pre_credit_total, observation.amount); + asset.pre_credit_total = addAmounts(asset.pre_credit_total, observation.amount); + uncreditedFundingTotalByAsset[observation.asset_id] = addAmounts( + uncreditedFundingTotalByAsset[observation.asset_id] || '0', + observation.amount, + ); + } + + if (timestampValue(observation.last_seen_at) > timestampValue(latestFundingObservationAt)) { + latestFundingObservationAt = observation.last_seen_at; + } + } + + return { + funding_observations_by_handle: mapToSortedObject(byHandle), + funding_visibility_by_asset: mapToSortedObject(byAsset), + latest_funding_observation_at: latestFundingObservationAt, + uncredited_funding_total_by_asset: mapToSortedObject(uncreditedFundingTotalByAsset), + credit_correlation: mapToSortedObject(creditCorrelation), + }; +} + +export function buildFundingVisibility(observations, { now = new Date().toISOString() } = {}) { + const summary = summarizeFundingObservations(observations, { now }); + return { + last_observed_at: summary.latest_funding_observation_at, + pre_credit_inbound: summary.uncredited_funding_total_by_asset, + by_asset: summary.funding_visibility_by_asset, + by_handle: summary.funding_observations_by_handle, + }; +} + +export function hasFundingObservationChanged(previous, next) { + if (!previous) return true; + + return ( + previous.status !== next.status + || previous.confirmations !== next.confirmations + || previous.bridge_deposit_tx_hash !== next.bridge_deposit_tx_hash + || previous.bridge_status !== next.bridge_status + || previous.credited_at !== next.credited_at + ); +} + +export function matchBridgeDeposit({ txHash, fundingHandle, bridgeDeposits }) { + const txMatch = bridgeDeposits.find((deposit) => deposit?.tx_hash && deposit.tx_hash === txHash); + if (txMatch) return txMatch; + + return bridgeDeposits.find((deposit) => ( + !deposit?.tx_hash + && ( + deposit?.address + && fundingHandle + && deposit.address === fundingHandle + ) + )) || null; +} + +function normalizeBridgeStatus(status) { + return status ? String(status).toUpperCase() : null; +} + +function normalizeConfirmations(value) { + const normalized = Number(value); + if (!Number.isFinite(normalized) || normalized < 0) return 0; + return Math.floor(normalized); +} + +function addAmounts(left, right) { + return (bigintAmount(left) + bigintAmount(right)).toString(); +} + +function isoAgeMs(from, to) { + const start = timestampValue(from); + const end = timestampValue(to); + if (!Number.isFinite(start) || !Number.isFinite(end)) return null; + return Math.max(0, end - start); +} + +function timestampValue(value) { + if (!value) return -Infinity; + const parsed = Date.parse(value); + return Number.isFinite(parsed) ? parsed : -Infinity; +} diff --git a/src/core/history-records.mjs b/src/core/history-records.mjs index 0bae447..c53be8b 100644 --- a/src/core/history-records.mjs +++ b/src/core/history-records.mjs @@ -1,9 +1,11 @@ import { assertExecuteTradeCommand, + assertFundingObservationEvent, assertInventorySnapshotEvent, assertLiquidityActionEvent, assertMarketPriceEvent, assertNormalizedSwapDemand, + assertOpsAlertEvent, assertTradeDecisionEvent, assertTradeResult, } from './schemas.mjs'; @@ -73,6 +75,32 @@ export function routeHistoryRecord({ topic, event }) { decision_key: event.payload.liquidity_action_id, }, }; + case 'ops.funding_observation': + assertFundingObservationEvent(event); + return { + table: 'funding_observations', + record: { + event_id: event.event_id, + observed_at: event.observed_at, + ingested_at: event.ingested_at, + quote_id: null, + pair: null, + decision_key: event.payload.funding_observation_id, + }, + }; + case 'ops.alert': + assertOpsAlertEvent(event); + return { + table: 'ops_alerts', + record: { + event_id: event.event_id, + observed_at: event.observed_at, + ingested_at: event.ingested_at, + quote_id: null, + pair: event.payload.pair || null, + decision_key: event.payload.alert_event_id, + }, + }; case 'decision.trade_decision': assertTradeDecisionEvent(event); return { diff --git a/src/core/liquidity-state.mjs b/src/core/liquidity-state.mjs index 8d0f8bf..d6bcfee 100644 --- a/src/core/liquidity-state.mjs +++ b/src/core/liquidity-state.mjs @@ -3,8 +3,16 @@ export function normalizeLiquidityState(state, { withdrawalsFrozen }) { state.deposits ||= {}; state.tracked_withdrawals ||= {}; state.supported_tokens ||= {}; + state.funding_observations ||= {}; + state.funding_observations_by_handle ||= {}; + state.funding_visibility_by_asset ||= {}; + state.uncredited_funding_total_by_asset ||= {}; + state.credit_correlation ||= {}; + state.observer_health ||= {}; state.publish_count ||= 0; + state.funding_publish_count ||= 0; state.withdrawals_frozen ??= withdrawalsFrozen; state.paused ??= false; + state.funding_observer_paused ??= false; return state; } diff --git a/src/core/schemas.mjs b/src/core/schemas.mjs index 95204c1..5f6881e 100644 --- a/src/core/schemas.mjs +++ b/src/core/schemas.mjs @@ -10,6 +10,10 @@ function requireOneOf(value, field, values) { if (!values.includes(value)) throw new Error(`Unexpected ${field}: ${value}`); } +function requireNumber(value, field) { + if (typeof value !== 'number' || !Number.isFinite(value)) throw new Error(`Missing ${field}`); +} + export function assertEventEnvelope(event) { requireObject(event, 'event'); requireString(event.event_id, 'event.event_id'); @@ -74,6 +78,50 @@ export function assertLiquidityActionEvent(event) { return event; } +export function assertFundingObservationEvent(event) { + assertEventEnvelope(event); + if (event.event_type !== 'funding_observation') { + throw new Error(`Unexpected event_type: ${event.event_type}`); + } + + const payload = event.payload; + requireString(payload.funding_observation_id, 'payload.funding_observation_id'); + requireString(payload.account_id, 'payload.account_id'); + requireString(payload.asset_id, 'payload.asset_id'); + requireString(payload.chain, 'payload.chain'); + requireString(payload.funding_handle, 'payload.funding_handle'); + requireString(payload.source, 'payload.source'); + requireString(payload.tx_hash, 'payload.tx_hash'); + requireString(payload.status, 'payload.status'); + requireString(payload.amount, 'payload.amount'); + requireNumber(payload.confirmations, 'payload.confirmations'); + requireString(payload.first_seen_at, 'payload.first_seen_at'); + requireString(payload.last_seen_at, 'payload.last_seen_at'); + if (payload.credited_at != null) requireString(payload.credited_at, 'payload.credited_at'); + if (payload.bridge_deposit_tx_hash != null) { + requireString(payload.bridge_deposit_tx_hash, 'payload.bridge_deposit_tx_hash'); + } + return event; +} + +export function assertOpsAlertEvent(event) { + assertEventEnvelope(event); + if (event.event_type !== 'ops_alert') throw new Error(`Unexpected event_type: ${event.event_type}`); + + const payload = event.payload; + requireString(payload.alert_event_id, 'payload.alert_event_id'); + requireString(payload.alert_code, 'payload.alert_code'); + requireString(payload.status, 'payload.status'); + requireOneOf(payload.status, 'payload.status', ['raised', 'cleared']); + requireString(payload.severity, 'payload.severity'); + requireString(payload.reason, 'payload.reason'); + requireString(payload.service_scope, 'payload.service_scope'); + requireString(payload.raised_at, 'payload.raised_at'); + if (payload.cleared_at != null) requireString(payload.cleared_at, 'payload.cleared_at'); + requireObject(payload.details, 'payload.details'); + return event; +} + export function assertTradeDecisionEvent(event) { assertEventEnvelope(event); if (event.event_type !== 'trade_decision') throw new Error(`Unexpected event_type: ${event.event_type}`); diff --git a/src/lib/config.mjs b/src/lib/config.mjs index 2e88d99..aa8b84a 100644 --- a/src/lib/config.mjs +++ b/src/lib/config.mjs @@ -18,6 +18,7 @@ const DEFAULTS = { historyWriterControlPort: 8085, strategyEngineControlPort: 8086, tradeExecutorControlPort: 8087, + opsSentinelControlPort: 8088, kafkaBrokers: ['127.0.0.1:9092'], kafkaClientId: 'unrip', kafkaTopicRawNearIntentsQuote: 'raw.near_intents.quote', @@ -25,6 +26,8 @@ const DEFAULTS = { kafkaTopicRefMarketPrice: 'ref.market_price', kafkaTopicStateIntentInventory: 'state.intent_inventory', kafkaTopicOpsLiquidityAction: 'ops.liquidity_action', + kafkaTopicOpsFundingObservation: 'ops.funding_observation', + kafkaTopicOpsAlert: 'ops.alert', kafkaTopicDecisionTradeDecision: 'decision.trade_decision', kafkaTopicCmdExecuteTrade: 'cmd.execute_trade', kafkaTopicExecTradeResult: 'exec.trade_result', @@ -32,6 +35,7 @@ const DEFAULTS = { kafkaConsumerGroupInventory: 'inventory-sync-v1', kafkaConsumerGroupStrategy: 'strategy-engine-v1', kafkaConsumerGroupExecutor: 'trade-executor-v1', + kafkaConsumerGroupOpsSentinel: 'ops-sentinel-v1', executorStateDir: './var/executor-state', liquidityStateDir: './var/liquidity-state', postgresUrl: 'postgresql://unrip:unrip@127.0.0.1:5432/unrip', @@ -63,6 +67,11 @@ const DEFAULTS = { executorInitialArmed: false, executorResponseTimeoutMs: 10_000, withdrawalsFrozen: true, + btcFundingObserverEnabled: true, + btcFundingObserverBaseUrl: 'https://mempool.space/api', + fundingObservationStuckMs: 60 * 60 * 1000, + opsSentinelEvaluationMs: 5_000, + opsSentinelFundingCreditPendingMs: 5 * 60 * 1000, }; function splitCsv(value) { @@ -171,6 +180,12 @@ export function loadConfig({ envPath = '.env' } = {}) { process.env.HISTORY_WRITER_CONTROL_PORT, DEFAULTS.historyWriterControlPort, ), + opsSentinelControlHost: + process.env.OPS_SENTINEL_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, + opsSentinelControlPort: parseNumber( + process.env.OPS_SENTINEL_CONTROL_PORT, + DEFAULTS.opsSentinelControlPort, + ), strategyEngineControlHost: process.env.STRATEGY_ENGINE_CONTROL_HOST || DEFAULTS.nearIntentsControlHost, strategyEngineControlPort: parseNumber( @@ -197,6 +212,10 @@ export function loadConfig({ envPath = '.env' } = {}) { process.env.KAFKA_TOPIC_STATE_INTENT_INVENTORY || DEFAULTS.kafkaTopicStateIntentInventory, kafkaTopicOpsLiquidityAction: process.env.KAFKA_TOPIC_OPS_LIQUIDITY_ACTION || DEFAULTS.kafkaTopicOpsLiquidityAction, + kafkaTopicOpsFundingObservation: + process.env.KAFKA_TOPIC_OPS_FUNDING_OBSERVATION || DEFAULTS.kafkaTopicOpsFundingObservation, + kafkaTopicOpsAlert: + process.env.KAFKA_TOPIC_OPS_ALERT || DEFAULTS.kafkaTopicOpsAlert, kafkaTopicDecisionTradeDecision: process.env.KAFKA_TOPIC_DECISION_TRADE_DECISION || DEFAULTS.kafkaTopicDecisionTradeDecision, kafkaTopicCmdExecuteTrade: @@ -211,6 +230,8 @@ export function loadConfig({ envPath = '.env' } = {}) { process.env.KAFKA_CONSUMER_GROUP_STRATEGY || DEFAULTS.kafkaConsumerGroupStrategy, kafkaConsumerGroupExecutor: process.env.KAFKA_CONSUMER_GROUP_EXECUTOR || DEFAULTS.kafkaConsumerGroupExecutor, + kafkaConsumerGroupOpsSentinel: + process.env.KAFKA_CONSUMER_GROUP_OPS_SENTINEL || DEFAULTS.kafkaConsumerGroupOpsSentinel, executorStateDir: process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir, liquidityStateDir: process.env.LIQUIDITY_STATE_DIR || DEFAULTS.liquidityStateDir, postgresUrl: process.env.POSTGRES_URL || DEFAULTS.postgresUrl, @@ -280,5 +301,35 @@ export function loadConfig({ envPath = '.env' } = {}) { process.env.LIQUIDITY_WITHDRAWALS_FROZEN, DEFAULTS.withdrawalsFrozen, ), + btcFundingObserverEnabled: parseBoolean( + process.env.BTC_FUNDING_OBSERVER_ENABLED, + DEFAULTS.btcFundingObserverEnabled, + ), + btcFundingObserverBaseUrl: + process.env.BTC_FUNDING_OBSERVER_BASE_URL || DEFAULTS.btcFundingObserverBaseUrl, + fundingObservationStuckMs: parseNumber( + process.env.FUNDING_OBSERVATION_STUCK_MS, + DEFAULTS.fundingObservationStuckMs, + ), + opsSentinelEvaluationMs: parseNumber( + process.env.OPS_SENTINEL_EVALUATION_MS, + DEFAULTS.opsSentinelEvaluationMs, + ), + opsSentinelPriceStaleMs: parseNumber( + process.env.OPS_SENTINEL_PRICE_STALE_MS, + DEFAULTS.marketReferenceMaxAgeMs, + ), + opsSentinelInventoryStaleMs: parseNumber( + process.env.OPS_SENTINEL_INVENTORY_STALE_MS, + DEFAULTS.strategyInventoryMaxAgeMs, + ), + opsSentinelFundingCreditPendingMs: parseNumber( + process.env.OPS_SENTINEL_FUNDING_CREDIT_PENDING_MS, + DEFAULTS.opsSentinelFundingCreditPendingMs, + ), + opsSentinelFundingStuckMs: parseNumber( + process.env.OPS_SENTINEL_FUNDING_STUCK_MS, + DEFAULTS.fundingObservationStuckMs, + ), }; } diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index caec4d1..6c8d800 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -6,6 +6,8 @@ const TABLES = [ 'market_price_events', 'intent_inventory_snapshots', 'liquidity_actions', + 'funding_observations', + 'ops_alerts', 'trade_decisions', 'execute_trade_commands', 'trade_execution_results', @@ -51,6 +53,47 @@ export async function ensureHistorySchema(pool) { `); } + await ensureExpressionIndex(pool, { + name: 'funding_observations_tx_hash_idx', + table: 'funding_observations', + expression: "(payload->>'tx_hash')", + }); + await ensureExpressionIndex(pool, { + name: 'funding_observations_handle_idx', + table: 'funding_observations', + expression: "(payload->>'funding_handle')", + }); + await ensureExpressionIndex(pool, { + name: 'funding_observations_asset_id_idx', + table: 'funding_observations', + expression: "(payload->>'asset_id')", + }); + await ensureExpressionIndex(pool, { + name: 'funding_observations_chain_idx', + table: 'funding_observations', + expression: "(payload->>'chain')", + }); + await ensureExpressionIndex(pool, { + name: 'funding_observations_status_idx', + table: 'funding_observations', + expression: "(payload->>'status')", + }); + await ensureExpressionIndex(pool, { + name: 'ops_alerts_alert_code_idx', + table: 'ops_alerts', + expression: "(payload->>'alert_code')", + }); + await ensureExpressionIndex(pool, { + name: 'ops_alerts_status_idx', + table: 'ops_alerts', + expression: "(payload->>'status')", + }); + await ensureExpressionIndex(pool, { + name: 'ops_alerts_asset_id_idx', + table: 'ops_alerts', + expression: "(payload->>'asset_id')", + }); + await pool.query(` CREATE TABLE IF NOT EXISTS ${PORTFOLIO_METRICS_TABLE} ( metric_id TEXT PRIMARY KEY, @@ -240,3 +283,10 @@ function normalizePortfolioMetricRow(row) { payload: row.payload, }; } + +async function ensureExpressionIndex(pool, { name, table, expression }) { + await pool.query(` + CREATE INDEX IF NOT EXISTS ${name} + ON ${table} (${expression}) + `); +} diff --git a/src/observers/btc-address-observer.mjs b/src/observers/btc-address-observer.mjs new file mode 100644 index 0000000..54f0d2d --- /dev/null +++ b/src/observers/btc-address-observer.mjs @@ -0,0 +1,95 @@ +import { fetchJson } from '../lib/http.mjs'; + +export function createBtcAddressObserver({ + baseUrl, + source = 'btc_mempool_space', +}) { + const normalizedBaseUrl = String(baseUrl || '').replace(/\/+$/, ''); + if (!normalizedBaseUrl) throw new Error('Missing BTC funding observer base URL'); + + return { + async listTransactions({ address }) { + if (!address) return { + source, + observed_at: new Date().toISOString(), + transactions: [], + }; + + const [tipHeight, latestTxs, mempoolTxs] = await Promise.all([ + fetchTipHeight(normalizedBaseUrl), + fetchJson(`${normalizedBaseUrl}/address/${encodeURIComponent(address)}/txs`).catch(() => []), + fetchJson(`${normalizedBaseUrl}/address/${encodeURIComponent(address)}/txs/mempool`).catch(() => []), + ]); + + const transactions = dedupeTransactions([...(latestTxs || []), ...(mempoolTxs || [])]) + .map((tx) => normalizeBtcTransaction(tx, { address, tipHeight, source })) + .filter(Boolean); + + return { + source, + observed_at: new Date().toISOString(), + transactions, + }; + }, + }; +} + +async function fetchTipHeight(baseUrl) { + const response = await fetch(`${baseUrl}/blocks/tip/height`); + if (!response.ok) return null; + + const text = await response.text(); + const parsed = Number(text); + return Number.isFinite(parsed) ? parsed : null; +} + +function dedupeTransactions(transactions) { + const seen = new Set(); + const deduped = []; + for (const tx of transactions) { + const key = tx?.txid; + if (!key || seen.has(key)) continue; + seen.add(key); + deduped.push(tx); + } + return deduped; +} + +function normalizeBtcTransaction(tx, { address, tipHeight, source }) { + if (!tx?.txid) return null; + + const amount = sumOutputsToAddress(tx.vout, address); + if (amount === 0n) return null; + + const confirmed = tx.status?.confirmed === true; + const confirmations = confirmed + ? computeConfirmations(tipHeight, tx.status?.block_height) + : 0; + const observedAt = tx.status?.block_time + ? new Date(Number(tx.status.block_time) * 1000).toISOString() + : new Date().toISOString(); + + return { + source, + tx_hash: tx.txid, + amount: amount.toString(), + confirmations, + observed_at: observedAt, + }; +} + +function sumOutputsToAddress(outputs, address) { + let total = 0n; + for (const output of outputs || []) { + if (output?.scriptpubkey_address !== address) continue; + total += BigInt(String(output.value || 0)); + } + return total; +} + +function computeConfirmations(tipHeight, blockHeight) { + const tip = Number(tipHeight); + const block = Number(blockHeight); + if (!Number.isFinite(tip) || !Number.isFinite(block)) return 1; + return Math.max(1, (tip - block) + 1); +} diff --git a/test/alert-engine.test.mjs b/test/alert-engine.test.mjs new file mode 100644 index 0000000..7f76f90 --- /dev/null +++ b/test/alert-engine.test.mjs @@ -0,0 +1,103 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { createAlertEngine } from '../src/core/alert-engine.mjs'; + +function createEngine() { + return createAlertEngine({ + activePair: 'nep141:btc.omft.near->nep141:eure.omft.near', + priceStaleMs: 30_000, + inventoryStaleMs: 30_000, + fundingCreditPendingMs: 300_000, + fundingStuckMs: 3_600_000, + evaluationIntervalMs: 5_000, + }); +} + +test('alert engine raises and clears stale state transitions', () => { + const engine = createEngine(); + + let transitions = engine.applyEvent('ref.market_price', { + price_id: 'price-1', + pair: 'nep141:btc.omft.near->nep141:eure.omft.near', + eur_per_btc: '100000', + btc_per_eure: '0.00001', + observed_at: '2026-04-03T08:00:00.000Z', + }, '2026-04-03T08:00:00.000Z'); + assert.equal(transitions.length, 1); + assert.equal(transitions[0].alert_code, 'inventory_snapshot_stale'); + assert.equal(transitions[0].status, 'raised'); + + transitions = engine.applyEvent('state.intent_inventory', { + inventory_id: 'inventory-1', + account_id: 'solver.near', + reconciliation_status: 'ok', + spendable: { 'nep141:btc.omft.near': '1000' }, + synced_at: '2026-04-03T08:00:00.000Z', + }, '2026-04-03T08:00:00.000Z'); + assert.equal(transitions.length, 1); + assert.equal(transitions[0].alert_code, 'inventory_snapshot_stale'); + assert.equal(transitions[0].status, 'cleared'); + + transitions = engine.evaluate('2026-04-03T08:00:31.000Z'); + assert.equal(transitions.length, 2); + assert.deepEqual( + transitions.map((transition) => `${transition.alert_code}:${transition.status}`).sort(), + [ + 'inventory_snapshot_stale:raised', + 'reference_price_stale:raised', + ], + ); + + transitions = engine.applyEvent('ref.market_price', { + price_id: 'price-2', + pair: 'nep141:btc.omft.near->nep141:eure.omft.near', + eur_per_btc: '100100', + btc_per_eure: '0.00000999', + observed_at: '2026-04-03T08:00:32.000Z', + }, '2026-04-03T08:00:32.000Z'); + assert.equal(transitions.length, 1); + assert.equal(transitions[0].alert_code, 'reference_price_stale'); + assert.equal(transitions[0].status, 'cleared'); +}); + +test('executor submission failure produces an alert event and clears on recovery', () => { + const engine = createEngine(); + engine.applyEvent('ref.market_price', { + price_id: 'price-1', + pair: 'nep141:btc.omft.near->nep141:eure.omft.near', + eur_per_btc: '100000', + btc_per_eure: '0.00001', + observed_at: '2026-04-03T08:00:00.000Z', + }, '2026-04-03T08:00:00.000Z'); + engine.applyEvent('state.intent_inventory', { + inventory_id: 'inventory-1', + account_id: 'solver.near', + reconciliation_status: 'ok', + spendable: { 'nep141:btc.omft.near': '1000' }, + synced_at: '2026-04-03T08:00:00.000Z', + }, '2026-04-03T08:00:00.000Z'); + + let transitions = engine.applyEvent('exec.trade_result', { + command_id: 'cmd-1', + quote_id: 'quote-1', + pair: 'nep141:btc.omft.near->nep141:eure.omft.near', + status: 'failed', + result_code: 'submission_failed', + error: { message: 'relay timeout' }, + }, '2026-04-03T08:00:10.000Z'); + assert.equal(transitions.length, 1); + assert.equal(transitions[0].alert_code, 'executor_submission_failed'); + assert.equal(transitions[0].status, 'raised'); + + transitions = engine.applyEvent('exec.trade_result', { + command_id: 'cmd-2', + quote_id: 'quote-2', + pair: 'nep141:btc.omft.near->nep141:eure.omft.near', + status: 'submitted', + result_code: 'quote_response_ok', + }, '2026-04-03T08:00:20.000Z'); + assert.equal(transitions.length, 1); + assert.equal(transitions[0].alert_code, 'executor_submission_failed'); + assert.equal(transitions[0].status, 'cleared'); +}); diff --git a/test/funding-observations.test.mjs b/test/funding-observations.test.mjs new file mode 100644 index 0000000..25c9f49 --- /dev/null +++ b/test/funding-observations.test.mjs @@ -0,0 +1,110 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { + buildFundingVisibility, + correlateFundingObservation, +} from '../src/core/funding-observations.mjs'; +import { buildInventorySnapshot } from '../src/core/inventory.mjs'; +import { routeHistoryRecord } from '../src/core/history-records.mjs'; + +test('pre-credit funding visibility remains non-spendable', () => { + const observation = correlateFundingObservation({ + accountId: 'solver.near', + assetId: 'nep141:btc.omft.near', + chain: 'btc:mainnet', + fundingHandle: 'bc1qexample', + source: 'btc_mempool_space', + txHash: 'btc-tx-1', + amount: '1500', + confirmations: 0, + observedAt: '2026-04-03T08:00:00.000Z', + }); + + const inventory = buildInventorySnapshot({ + accountId: 'solver.near', + balances: { + 'nep141:btc.omft.near': '1000', + }, + recentDeposits: [], + trackedWithdrawals: [], + assetRegistry: new Map([ + ['nep141:btc.omft.near', { decimals: 8 }], + ]), + observedAt: '2026-04-03T08:01:00.000Z', + }); + const visibility = buildFundingVisibility([observation], { + now: '2026-04-03T08:01:00.000Z', + }); + + assert.equal(observation.status, 'SEEN_UNCONFIRMED'); + assert.equal(inventory.spendable['nep141:btc.omft.near'], '1000'); + assert.equal(visibility.pre_credit_inbound['nep141:btc.omft.near'], '1500'); + assert.equal(visibility.by_handle.bc1qexample.observations[0].spendable, false); +}); + +test('funding observation correlates to later credit without losing tx hash', () => { + const seen = correlateFundingObservation({ + accountId: 'solver.near', + assetId: 'nep141:btc.omft.near', + chain: 'btc:mainnet', + fundingHandle: 'bc1qexample', + source: 'btc_mempool_space', + txHash: 'btc-tx-1', + amount: '1500', + confirmations: 2, + observedAt: '2026-04-03T08:00:00.000Z', + }); + const credited = correlateFundingObservation({ + existing: seen, + accountId: 'solver.near', + assetId: 'nep141:btc.omft.near', + chain: 'btc:mainnet', + fundingHandle: 'bc1qexample', + source: 'btc_mempool_space', + txHash: 'btc-tx-1', + amount: '1500', + confirmations: 3, + observedAt: '2026-04-03T08:10:00.000Z', + bridgeDeposit: { + tx_hash: 'btc-tx-1', + status: 'COMPLETED', + }, + }); + + assert.equal(credited.status, 'CREDITED'); + assert.equal(credited.tx_hash, 'btc-tx-1'); + assert.equal(credited.bridge_deposit_tx_hash, 'btc-tx-1'); + assert.equal(credited.funding_observation_id, seen.funding_observation_id); + assert.equal(credited.credited_at, '2026-04-03T08:10:00.000Z'); +}); + +test('history writer routes funding observations into the funding table family', () => { + const routed = routeHistoryRecord({ + topic: 'ops.funding_observation', + event: { + event_id: 'evt-funding-1', + event_type: 'funding_observation', + venue: 'near-intents', + schema_version: 1, + ingested_at: '2026-04-03T08:00:00.000Z', + payload: { + funding_observation_id: 'funding-1', + account_id: 'solver.near', + asset_id: 'nep141:btc.omft.near', + chain: 'btc:mainnet', + funding_handle: 'bc1qexample', + source: 'btc_mempool_space', + tx_hash: 'btc-tx-1', + status: 'SEEN_CONFIRMED', + amount: '1500', + confirmations: 2, + first_seen_at: '2026-04-03T08:00:00.000Z', + last_seen_at: '2026-04-03T08:05:00.000Z', + }, + }, + }); + + assert.equal(routed.table, 'funding_observations'); + assert.equal(routed.record.decision_key, 'funding-1'); +}); diff --git a/test/liquidity-state.test.mjs b/test/liquidity-state.test.mjs index 8de75d7..92ec75c 100644 --- a/test/liquidity-state.test.mjs +++ b/test/liquidity-state.test.mjs @@ -16,6 +16,13 @@ test('normalizeLiquidityState hydrates missing nested maps from persisted partia assert.deepEqual(state.deposits, {}); assert.deepEqual(state.tracked_withdrawals, {}); assert.deepEqual(state.supported_tokens, {}); + assert.deepEqual(state.funding_observations, {}); + assert.deepEqual(state.funding_observations_by_handle, {}); + assert.deepEqual(state.funding_visibility_by_asset, {}); + assert.deepEqual(state.uncredited_funding_total_by_asset, {}); + assert.deepEqual(state.credit_correlation, {}); + assert.deepEqual(state.observer_health, {}); assert.equal(state.withdrawals_frozen, true); assert.equal(state.paused, false); + assert.equal(state.funding_observer_paused, false); });