diff --git a/scripts/deploy/bootstrap.sh b/scripts/deploy/bootstrap.sh index 9039f79..0d218ce 100755 --- a/scripts/deploy/bootstrap.sh +++ b/scripts/deploy/bootstrap.sh @@ -135,6 +135,7 @@ fi : "${POSTGRES_PASSWORD:=}" : "${POSTGRES_URL:=}" : "${NEAR_INTENTS_SIGNER_PRIVATE_KEY:=}" +: "${NOTIFICATION_NTFY_TOKEN:=}" secret_value() { local key="$1" @@ -153,6 +154,10 @@ if [[ -z "$NEAR_INTENTS_SIGNER_PRIVATE_KEY" ]]; then NEAR_INTENTS_SIGNER_PRIVATE_KEY="$(secret_value NEAR_INTENTS_SIGNER_PRIVATE_KEY)" fi +if [[ -z "$NOTIFICATION_NTFY_TOKEN" ]]; then + NOTIFICATION_NTFY_TOKEN="$(secret_value NOTIFICATION_NTFY_TOKEN)" +fi + if [[ -z "$POSTGRES_PASSWORD" ]]; then POSTGRES_PASSWORD="$(python3 - <<'PY' import secrets @@ -177,6 +182,9 @@ secret_args=( if [[ -n "$NEAR_INTENTS_SIGNER_PRIVATE_KEY" ]]; then secret_args+=(--from-literal=NEAR_INTENTS_SIGNER_PRIVATE_KEY="$NEAR_INTENTS_SIGNER_PRIVATE_KEY") fi +if [[ -n "$NOTIFICATION_NTFY_TOKEN" ]]; then + secret_args+=(--from-literal=NOTIFICATION_NTFY_TOKEN="$NOTIFICATION_NTFY_TOKEN") +fi kubectl -n "$PROJECT_NAMESPACE" create secret generic "$APP_SECRET_NAME" \ "${secret_args[@]}" \ --dry-run=client -o yaml | kubectl apply -f - diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 19ebbdc..d8b24b7 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -4,12 +4,22 @@ import { createConsumer } from '../bus/kafka/consumer.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { routeHistoryRecord } from '../core/history-records.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; +import { + buildIntentRequestOutcomeNotification, + buildLiquidityActionNotification, + buildQuoteOutcomeNotification, + createNotificationDispatcher, + observedAtOrAfter, +} from '../core/notification-layer.mjs'; +import { createNtfyNotificationClient } from '../core/notification-client.mjs'; import { buildPortfolioMetricId, computePortfolioMetric } from '../core/portfolio-metrics.mjs'; import { parseEventMessage } from '../core/event-envelope.mjs'; import { loadConfig } from '../lib/config.mjs'; import { + claimNotificationDelivery, createPostgresPool, ensureHistorySchema, + finishNotificationDelivery, insertHistoryEvent, loadLatestPortfolioMetric, loadPortfolioMetricInputs, @@ -30,6 +40,34 @@ const pool = createPostgresPool({ }); await ensureHistorySchema(pool); +const notificationLogger = logger.child({ component: 'notifications' }); +const notificationClient = createNtfyNotificationClient({ + baseUrl: config.notificationNtfyBaseUrl, + topic: config.notificationNtfyTopic, + token: config.notificationNtfyToken, + timeoutMs: config.notificationNtfyTimeoutMs, + logger: notificationLogger, +}); +const notificationDispatcher = createNotificationDispatcher({ + client: notificationClient, + logger: notificationLogger, + store: { + claim: (notification) => claimNotificationDelivery(pool, { + notificationKey: notification.notification_key, + notificationType: notification.notification_type, + sourceKind: notification.source_kind, + sourceId: notification.source_id, + payload: notification.data || {}, + }), + finish: (notification, result) => finishNotificationDelivery(pool, { + notificationKey: notification.notification_key, + ok: result.ok, + response: result.ok ? { status: result.status, response: result.response || null } : null, + error: result.ok ? null : result.error || { status: result.status, response: result.response || null }, + }), + }, +}); + const consumer = await createConsumer({ groupId: config.kafkaConsumerGroupHistory, brokers: config.kafkaBrokers, @@ -87,6 +125,10 @@ const state = { last_intent_request_outcomes_at: null, latest_intent_request_outcomes: null, intent_request_outcomes_error: null, + notification_count: 0, + notification_error_count: 0, + last_notification_at: null, + last_notification_error: null, }; await refreshPortfolioMetrics().catch((error) => { @@ -125,6 +167,7 @@ await consumer.run({ if (topic === config.kafkaTopicOpsAlert) { state.last_alert_write_at = state.last_write_at; } + await publishLiquidityNotification({ topic, event }); if (portfolioMetricTopics.has(topic)) { try { await refreshPortfolioMetrics(); @@ -140,7 +183,10 @@ await consumer.run({ } if (quoteOutcomeTopics.has(topic)) { try { - await refreshQuoteOutcomeAttributions(); + const records = await refreshQuoteOutcomeAttributions(); + await publishQuoteOutcomeNotifications(records, { + minObservedAt: event.observed_at || event.ingested_at || state.last_write_at, + }); } catch (error) { state.quote_outcomes_error = serializeError(error); logger.error('quote_outcomes_refresh_failed', { @@ -153,7 +199,10 @@ await consumer.run({ } if (intentRequestOutcomeTopics.has(topic)) { try { - await refreshIntentRequestOutcomeAttributions(); + const records = await refreshIntentRequestOutcomeAttributions(); + await publishIntentRequestOutcomeNotifications(records, { + minObservedAt: event.observed_at || event.ingested_at || state.last_write_at, + }); } catch (error) { state.intent_request_outcomes_error = serializeError(error); logger.error('intent_request_outcomes_refresh_failed', { @@ -334,7 +383,41 @@ async function refreshQuoteOutcomeAttributions() { not_filled_count: records.filter((entry) => entry.outcome_status === 'not_filled').length, submitted_count: records.filter((entry) => entry.outcome_status === 'submitted').length, }; - return state.latest_quote_outcomes; + return records; +} + +async function publishLiquidityNotification({ topic, event }) { + if (topic !== config.kafkaTopicOpsLiquidityAction) return; + const notification = buildLiquidityActionNotification({ event, config }); + await publishNotification(notification); +} + +async function publishQuoteOutcomeNotifications(records, { minObservedAt = null } = {}) { + for (const record of records || []) { + if (!observedAtOrAfter(record.outcome_observed_at, minObservedAt)) continue; + await publishNotification(buildQuoteOutcomeNotification({ record, config })); + } +} + +async function publishIntentRequestOutcomeNotifications(records, { minObservedAt = null } = {}) { + for (const record of records || []) { + if (!observedAtOrAfter(record.outcome_observed_at, minObservedAt)) continue; + await publishNotification(buildIntentRequestOutcomeNotification({ record, config })); + } +} + +async function publishNotification(notification) { + if (!notification) return null; + const result = await notificationDispatcher.publishOnce(notification); + if (result.delivered) { + state.notification_count += 1; + state.last_notification_at = new Date().toISOString(); + state.last_notification_error = null; + } else if (result.ok === false) { + state.notification_error_count += 1; + state.last_notification_error = result.error || { status: result.status, response: result.response || null }; + } + return result; } function summarizePortfolioMetric(metric) { diff --git a/src/core/notification-layer.mjs b/src/core/notification-layer.mjs new file mode 100644 index 0000000..1aefcc7 --- /dev/null +++ b/src/core/notification-layer.mjs @@ -0,0 +1,236 @@ +import { formatUnitsDecimal } from './intent-requests.mjs'; + +const CREDITED_DEPOSIT_STATUSES = new Set(['CREDITED', 'COMPLETED', 'FINALIZED', 'SETTLED']); +const COMPLETED_WITHDRAWAL_STATUSES = new Set(['COMPLETED', 'FINALIZED', 'SETTLED']); +const SETTLED_ATTRIBUTION_STATUSES = new Set(['heuristic_match', 'linked_settlement']); + +export function buildLiquidityActionNotification({ event, config } = {}) { + const payload = event?.payload || event || {}; + const details = payload.details || {}; + const status = normalizeStatus(payload.status || details.status); + + if (payload.action_type === 'deposit_status_observed' && CREDITED_DEPOSIT_STATUSES.has(status)) { + if (!details.tx_hash) return null; + const asset = assetFor(config, payload.asset_id || details.asset_id); + const amount = formatAssetUnits(details.amount || '0', asset); + const sourceId = `${payload.chain || details.chain || 'unknown'}:${payload.asset_id || details.asset_id || 'unknown'}:${details.tx_hash}:${details.amount || '0'}`; + return { + notification_key: `deposit_credited:${sourceId}`, + notification_type: 'deposit_credited', + source_kind: 'liquidity_action', + source_id: details.tx_hash, + title: 'Deposit credited', + message: [ + `Deposit credited: ${amount}`, + `chain=${payload.chain || details.chain || 'unknown'}`, + `tx=${details.tx_hash}`, + ].join('\n'), + priority: 'default', + tags: ['unrip', 'deposit'], + data: { + action_type: payload.action_type, + status, + asset_id: payload.asset_id || details.asset_id || null, + amount_units: String(details.amount || '0'), + tx_hash: details.tx_hash, + chain: payload.chain || details.chain || null, + }, + }; + } + + if (payload.action_type === 'withdrawal_status_changed' && COMPLETED_WITHDRAWAL_STATUSES.has(status)) { + if (!details.withdrawal_hash) return null; + const asset = assetFor(config, payload.asset_id || details.asset_id); + const amount = formatAssetUnits(details.amount || '0', asset); + const sourceId = `${payload.chain || details.chain || 'unknown'}:${payload.asset_id || details.asset_id || 'unknown'}:${details.withdrawal_hash}`; + return { + notification_key: `withdrawal_completed:${sourceId}`, + notification_type: 'withdrawal_completed', + source_kind: 'liquidity_action', + source_id: details.withdrawal_hash, + title: 'Withdrawal completed', + message: [ + `Withdrawal completed: ${amount}`, + `chain=${payload.chain || details.chain || 'unknown'}`, + `withdrawal=${details.withdrawal_hash}`, + details.transfer_tx_hash ? `transfer=${details.transfer_tx_hash}` : null, + ].filter(Boolean).join('\n'), + priority: 'default', + tags: ['unrip', 'withdrawal'], + data: { + action_type: payload.action_type, + status, + asset_id: payload.asset_id || details.asset_id || null, + amount_units: String(details.amount || '0'), + withdrawal_hash: details.withdrawal_hash, + transfer_tx_hash: details.transfer_tx_hash || null, + chain: payload.chain || details.chain || null, + }, + }; + } + + return null; +} + +export function buildQuoteOutcomeNotification({ record, config } = {}) { + if (!hasProvenSettlement(record)) return null; + const quoteId = record.quote_id || record.payload?.quote_id || null; + if (!quoteId) return null; + const delta = record.attributed_inventory_delta || record.payload?.attributed_inventory_delta || {}; + const inventoryId = delta.inventory_id || 'unknown-inventory'; + const deltaSummary = formatDeltaSummary(delta.delta_units || {}, config); + return { + notification_key: `trade_completed:quote:${quoteId}:${inventoryId}`, + notification_type: 'trade_completed', + source_kind: 'quote_response', + source_id: quoteId, + title: 'Trade completed', + message: [ + `Quote trade completed: ${quoteId}`, + `asset movement: ${deltaSummary}`, + record.gross_edge_pct || record.payload?.gross_edge_pct ? `edge=${record.gross_edge_pct || record.payload?.gross_edge_pct}%` : null, + `inventory=${inventoryId}`, + ].filter(Boolean).join('\n'), + priority: 'default', + tags: ['unrip', 'trade'], + data: { + trade_kind: 'quote_response', + quote_id: quoteId, + decision_id: record.decision_id || record.payload?.decision_id || null, + command_id: record.command_id || record.payload?.command_id || null, + outcome_status: record.outcome_status || record.payload?.outcome_status || null, + attribution_status: record.attribution_status || record.payload?.attribution_status || null, + inventory_id: inventoryId, + delta_units: delta.delta_units || null, + gross_edge_pct: record.gross_edge_pct || record.payload?.gross_edge_pct || null, + }, + }; +} + +export function buildIntentRequestOutcomeNotification({ record, config } = {}) { + if (!hasProvenSettlement(record)) return null; + const requestId = record.request_id || record.payload?.request_id || null; + if (!requestId) return null; + const delta = record.attributed_inventory_delta || record.payload?.attributed_inventory_delta || {}; + const inventoryId = delta.inventory_id || 'unknown-inventory'; + const deltaSummary = formatDeltaSummary(delta.delta_units || {}, config); + return { + notification_key: `trade_completed:intent_request:${requestId}:${inventoryId}`, + notification_type: 'trade_completed', + source_kind: 'intent_request', + source_id: requestId, + title: 'Trade completed', + message: [ + `Own request completed: ${requestId}`, + `asset movement: ${deltaSummary}`, + record.intent_hash || record.payload?.intent_hash ? `intent=${record.intent_hash || record.payload?.intent_hash}` : null, + `inventory=${inventoryId}`, + ].filter(Boolean).join('\n'), + priority: 'default', + tags: ['unrip', 'trade'], + data: { + trade_kind: 'intent_request', + request_id: requestId, + idempotency_key: record.idempotency_key || record.payload?.idempotency_key || null, + submission_id: record.submission_id || record.payload?.submission_id || null, + intent_hash: record.intent_hash || record.payload?.intent_hash || null, + outcome_status: record.outcome_status || record.payload?.outcome_status || null, + attribution_status: record.attribution_status || record.payload?.attribution_status || null, + inventory_id: inventoryId, + delta_units: delta.delta_units || null, + }, + }; +} + +export function observedAtOrAfter(value, minObservedAt) { + if (!minObservedAt) return true; + const valueTs = Date.parse(value || ''); + const minTs = Date.parse(minObservedAt || ''); + return Number.isFinite(valueTs) && Number.isFinite(minTs) && valueTs >= minTs; +} + +export function hasProvenSettlement(record = {}) { + const payload = record.payload || {}; + const status = record.outcome_status || payload.outcome_status; + const attributionStatus = record.attribution_status || payload.attribution_status; + const delta = record.attributed_inventory_delta || payload.attributed_inventory_delta; + return status === 'completed' + && SETTLED_ATTRIBUTION_STATUSES.has(attributionStatus) + && Boolean(delta?.delta_units); +} + +export function createNotificationDispatcher({ client, store, logger = null } = {}) { + return { + async publishOnce(notification) { + if (!notification) return { ok: true, skipped: true, reason: 'no_notification' }; + if (!client?.isConfigured?.()) { + return { ok: true, skipped: true, reason: 'notification_client_not_configured' }; + } + + const claimed = await store.claim(notification); + if (!claimed) return { ok: true, skipped: true, reason: 'notification_already_delivered' }; + + const result = await client.publish({ + title: notification.title, + message: notification.message, + priority: notification.priority, + tags: notification.tags, + click: notification.click, + }); + + await store.finish(notification, result); + if (!result.ok) { + logger?.warn?.('notification_delivery_failed', { + details: { + notification_key: notification.notification_key, + notification_type: notification.notification_type, + result, + }, + }); + } + return result.ok + ? { ...result, delivered: true } + : { ...result, delivered: false }; + }, + }; +} + +function hasAssetRegistry(config) { + return config?.assetRegistry && typeof config.assetRegistry.get === 'function'; +} + +function assetFor(config, assetId) { + if (hasAssetRegistry(config) && config.assetRegistry.get(assetId)) return config.assetRegistry.get(assetId); + if (config?.tradingBtc?.assetId === assetId) return config.tradingBtc; + if (config?.tradingEure?.assetId === assetId) return config.tradingEure; + return { assetId, symbol: shortAssetId(assetId), decimals: 0 }; +} + +function formatAssetUnits(units, asset) { + return `${formatUnitsDecimal(units || '0', Number(asset?.decimals || 0))} ${asset?.symbol || shortAssetId(asset?.assetId)}`; +} + +function formatSignedAssetUnits(units, asset) { + const value = BigInt(String(units || '0')); + const sign = value > 0n ? '+' : value < 0n ? '-' : ''; + const absolute = value < 0n ? -value : value; + return `${sign}${formatAssetUnits(absolute.toString(), asset)}`; +} + +function formatDeltaSummary(deltaUnits, config) { + const parts = Object.entries(deltaUnits || {}) + .filter(([, units]) => BigInt(String(units || '0')) !== 0n) + .map(([assetId, units]) => formatSignedAssetUnits(units, assetFor(config, assetId))); + return parts.length ? parts.join(', ') : 'no non-zero delta'; +} + +function normalizeStatus(status) { + return String(status || '').trim().toUpperCase(); +} + +function shortAssetId(assetId) { + if (!assetId) return 'asset'; + const raw = String(assetId); + if (raw.length <= 18) return raw; + return `${raw.slice(0, 8)}...${raw.slice(-6)}`; +} diff --git a/src/lib/config.mjs b/src/lib/config.mjs index 4c52355..207d048 100644 --- a/src/lib/config.mjs +++ b/src/lib/config.mjs @@ -107,6 +107,7 @@ const DEFAULTS = { operatorDashboardUpstreamTimeoutMs: 3_000, notificationNtfyBaseUrl: '', notificationNtfyTopic: 'unrip', + notificationNtfyToken: '', notificationNtfyTimeoutMs: 5_000, }; @@ -578,6 +579,8 @@ export function loadConfig({ envPath = '.env' } = {}) { process.env.NOTIFICATION_NTFY_BASE_URL || DEFAULTS.notificationNtfyBaseUrl, notificationNtfyTopic: process.env.NOTIFICATION_NTFY_TOPIC || DEFAULTS.notificationNtfyTopic, + notificationNtfyToken: + process.env.NOTIFICATION_NTFY_TOKEN || DEFAULTS.notificationNtfyToken, notificationNtfyTimeoutMs: parseNumber( process.env.NOTIFICATION_NTFY_TIMEOUT_MS, DEFAULTS.notificationNtfyTimeoutMs, diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index 69f68e2..d91a90a 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -117,6 +117,31 @@ export async function ensureHistorySchema(pool) { ON ${PORTFOLIO_METRICS_TABLE} (computed_at DESC) `); + await pool.query(` + CREATE TABLE IF NOT EXISTS notification_deliveries ( + notification_key TEXT PRIMARY KEY, + notification_type TEXT NOT NULL, + source_kind TEXT NOT NULL, + source_id TEXT, + status TEXT NOT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + first_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + delivered_at TIMESTAMPTZ, + payload JSONB NOT NULL, + response JSONB, + error JSONB + ) + `); + await pool.query(` + CREATE INDEX IF NOT EXISTS notification_deliveries_status_idx + ON notification_deliveries (status, last_attempt_at DESC) + `); + await pool.query(` + CREATE INDEX IF NOT EXISTS notification_deliveries_type_idx + ON notification_deliveries (notification_type, last_attempt_at DESC) + `); + await pool.query(` CREATE TABLE IF NOT EXISTS ${QUOTE_OUTCOMES_TABLE} ( quote_id TEXT PRIMARY KEY, @@ -346,6 +371,78 @@ export async function loadLatestPortfolioMetric(pool) { return normalizePortfolioMetricRow(result.rows[0]); } +export async function claimNotificationDelivery(pool, { + notificationKey, + notificationType, + sourceKind, + sourceId = null, + payload = {}, +}) { + if (!notificationKey) throw new Error('notificationKey is required'); + if (!notificationType) throw new Error('notificationType is required'); + if (!sourceKind) throw new Error('sourceKind is required'); + + const result = await pool.query( + ` + INSERT INTO notification_deliveries ( + notification_key, + notification_type, + source_kind, + source_id, + status, + attempt_count, + payload, + response, + error + ) VALUES ($1, $2, $3, $4, 'pending', 1, $5::jsonb, NULL, NULL) + ON CONFLICT (notification_key) DO UPDATE SET + status = 'pending', + attempt_count = notification_deliveries.attempt_count + 1, + last_attempt_at = NOW(), + payload = EXCLUDED.payload, + response = NULL, + error = NULL + WHERE notification_deliveries.status <> 'delivered' + RETURNING notification_key + `, + [ + notificationKey, + notificationType, + sourceKind, + sourceId, + JSON.stringify(payload || {}), + ], + ); + return result.rowCount > 0; +} + +export async function finishNotificationDelivery(pool, { + notificationKey, + ok, + response = null, + error = null, +}) { + if (!notificationKey) throw new Error('notificationKey is required'); + await pool.query( + ` + UPDATE notification_deliveries + SET + status = $2, + delivered_at = CASE WHEN $2 = 'delivered' THEN NOW() ELSE delivered_at END, + last_attempt_at = NOW(), + response = $3::jsonb, + error = $4::jsonb + WHERE notification_key = $1 + `, + [ + notificationKey, + ok ? 'delivered' : 'failed', + response ? JSON.stringify(response) : null, + error ? JSON.stringify(error) : null, + ], + ); +} + export async function refreshQuoteOutcomes(pool, { btcAsset = null, eureAsset = null, diff --git a/test/notification-client.test.mjs b/test/notification-client.test.mjs index e951a43..f8475a1 100644 --- a/test/notification-client.test.mjs +++ b/test/notification-client.test.mjs @@ -17,6 +17,7 @@ test('ntfy publish request uses repo configured base url and topic', () => { priority: 'high', tags: ['warning', 'unrip'], click: 'https://dashboard.example/strategy', + token: 'secret-token', }); assert.equal(request.ok, true); @@ -27,6 +28,7 @@ test('ntfy publish request uses repo configured base url and topic', () => { assert.equal(request.init.headers.Priority, 'high'); assert.equal(request.init.headers.Tags, 'warning,unrip'); assert.equal(request.init.headers.Click, 'https://dashboard.example/strategy'); + assert.equal(request.init.headers.Authorization, 'Bearer secret-token'); }); test('ntfy notification client skips when endpoint or message is not configured', async () => { @@ -76,20 +78,24 @@ test('config exposes ntfy notification defaults and environment overrides', () = baseUrl: process.env.NOTIFICATION_NTFY_BASE_URL, topic: process.env.NOTIFICATION_NTFY_TOPIC, timeout: process.env.NOTIFICATION_NTFY_TIMEOUT_MS, + token: process.env.NOTIFICATION_NTFY_TOKEN, }; process.env.NOTIFICATION_NTFY_BASE_URL = 'http://ntfy.utility.svc.cluster.local'; process.env.NOTIFICATION_NTFY_TOPIC = 'unrip'; process.env.NOTIFICATION_NTFY_TIMEOUT_MS = '4321'; + process.env.NOTIFICATION_NTFY_TOKEN = 'secret-token'; try { const config = loadConfig({ envPath: '/tmp/unrip-missing-env-for-notification-test' }); assert.equal(config.notificationNtfyBaseUrl, 'http://ntfy.utility.svc.cluster.local'); assert.equal(config.notificationNtfyTopic, 'unrip'); assert.equal(config.notificationNtfyTimeoutMs, 4321); + assert.equal(config.notificationNtfyToken, 'secret-token'); } finally { restoreEnv('NOTIFICATION_NTFY_BASE_URL', previous.baseUrl); restoreEnv('NOTIFICATION_NTFY_TOPIC', previous.topic); restoreEnv('NOTIFICATION_NTFY_TIMEOUT_MS', previous.timeout); + restoreEnv('NOTIFICATION_NTFY_TOKEN', previous.token); } }); diff --git a/test/notification-layer.test.mjs b/test/notification-layer.test.mjs new file mode 100644 index 0000000..8efa70e --- /dev/null +++ b/test/notification-layer.test.mjs @@ -0,0 +1,205 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { + buildIntentRequestOutcomeNotification, + buildLiquidityActionNotification, + buildQuoteOutcomeNotification, + createNotificationDispatcher, + hasProvenSettlement, + observedAtOrAfter, +} from '../src/core/notification-layer.mjs'; + +const BTC = { assetId: 'nep141:btc.omft.near', symbol: 'BTC', decimals: 8 }; +const EURE = { assetId: 'nep141:eure.omft.near', symbol: 'EURe', decimals: 18 }; +const config = { + tradingBtc: BTC, + tradingEure: EURE, + assetRegistry: new Map([ + [BTC.assetId, BTC], + [EURE.assetId, EURE], + ]), +}; + +test('liquidity notifications only emit credited deposits and completed withdrawals', () => { + assert.equal(buildLiquidityActionNotification({ + config, + event: { + payload: { + action_type: 'deposit_status_observed', + status: 'PENDING', + chain: 'eth:100', + asset_id: EURE.assetId, + details: { amount: '5000000000000000000', tx_hash: 'deposit-tx-1' }, + }, + }, + }), null); + + const deposit = buildLiquidityActionNotification({ + config, + event: { + payload: { + action_type: 'deposit_status_observed', + status: 'CREDITED', + chain: 'eth:100', + asset_id: EURE.assetId, + details: { amount: '5000000000000000000', tx_hash: 'deposit-tx-1' }, + }, + }, + }); + assert.equal(deposit.notification_type, 'deposit_credited'); + assert.match(deposit.message, /Deposit credited: 5 EURe/); + assert.match(deposit.message, /tx=deposit-tx-1/); + + assert.equal(buildLiquidityActionNotification({ + config, + event: { + payload: { + action_type: 'withdrawal_status_changed', + status: 'SUBMITTED', + chain: 'btc:mainnet', + asset_id: BTC.assetId, + details: { amount: '100000', withdrawal_hash: 'withdrawal-1' }, + }, + }, + }), null); + + const withdrawal = buildLiquidityActionNotification({ + config, + event: { + payload: { + action_type: 'withdrawal_status_changed', + status: 'COMPLETED', + chain: 'btc:mainnet', + asset_id: BTC.assetId, + details: { amount: '100000', withdrawal_hash: 'withdrawal-1', transfer_tx_hash: 'btc-tx-1' }, + }, + }, + }); + assert.equal(withdrawal.notification_type, 'withdrawal_completed'); + assert.match(withdrawal.message, /Withdrawal completed: 0.001 BTC/); + assert.match(withdrawal.message, /transfer=btc-tx-1/); +}); + +test('trade notifications require completed outcome with durable settlement evidence', () => { + const submitted = { + quote_id: 'quote-submitted', + outcome_status: 'submitted', + attribution_status: 'unattributed', + attributed_inventory_delta: null, + }; + assert.equal(hasProvenSettlement(submitted), false); + assert.equal(buildQuoteOutcomeNotification({ record: submitted, config }), null); + + const completedWithoutDelta = { + quote_id: 'quote-fake-completed', + outcome_status: 'completed', + attribution_status: 'unattributed', + attributed_inventory_delta: null, + }; + assert.equal(hasProvenSettlement(completedWithoutDelta), false); + assert.equal(buildQuoteOutcomeNotification({ record: completedWithoutDelta, config }), null); + + const completed = { + quote_id: 'quote-real', + decision_id: 'decision-real', + command_id: 'command-real', + gross_edge_pct: '0.49', + outcome_status: 'completed', + attribution_status: 'heuristic_match', + attributed_inventory_delta: { + inventory_id: 'inventory-after', + delta_units: { + [BTC.assetId]: '-12345', + [EURE.assetId]: '76500000000000000000', + }, + }, + }; + const notification = buildQuoteOutcomeNotification({ record: completed, config }); + assert.equal(notification.notification_type, 'trade_completed'); + assert.match(notification.message, /Quote trade completed: quote-real/); + assert.match(notification.message, /-0.00012345 BTC/); + assert.match(notification.message, /\+76.5 EURe/); +}); + +test('own request notifications do not emit from relay acceptance alone', () => { + const accepted = { + request_id: 'request-accepted', + outcome_status: 'awaiting_settlement', + submission_status: 'accepted_by_relay', + relay_status: 'SETTLED', + attribution_status: 'unattributed', + attributed_inventory_delta: null, + }; + assert.equal(buildIntentRequestOutcomeNotification({ record: accepted, config }), null); + + const completed = { + request_id: 'request-completed', + idempotency_key: 'idem-1', + intent_hash: 'intent-hash-1', + outcome_status: 'completed', + attribution_status: 'linked_settlement', + attributed_inventory_delta: { + inventory_id: 'inventory-after-request', + delta_units: { + [EURE.assetId]: '-5000000000000000000', + [BTC.assetId]: '8000', + }, + }, + }; + const notification = buildIntentRequestOutcomeNotification({ record: completed, config }); + assert.equal(notification.notification_type, 'trade_completed'); + assert.match(notification.message, /Own request completed: request-completed/); + assert.match(notification.message, /-5 EURe/); + assert.match(notification.message, /\+0.00008 BTC/); +}); + +test('notification dispatcher publishes each durable notification key once', async () => { + const calls = []; + const claimed = new Set(); + const finished = []; + const dispatcher = createNotificationDispatcher({ + client: { + isConfigured: () => true, + async publish(notification) { + calls.push(notification); + return { ok: true, status: 200, response: '{"id":"msg-1"}' }; + }, + }, + store: { + async claim(notification) { + if (claimed.has(notification.notification_key)) return false; + claimed.add(notification.notification_key); + return true; + }, + async finish(notification, result) { + finished.push({ notification, result }); + }, + }, + }); + + const notification = { + notification_key: 'deposit_credited:eth:100:eure:tx-1:5000', + notification_type: 'deposit_credited', + source_kind: 'liquidity_action', + source_id: 'tx-1', + title: 'Deposit credited', + message: 'Deposit credited: 5 EURe', + priority: 'default', + tags: ['unrip', 'deposit'], + }; + + assert.equal((await dispatcher.publishOnce(notification)).delivered, true); + assert.deepEqual(await dispatcher.publishOnce(notification), { + ok: true, + skipped: true, + reason: 'notification_already_delivered', + }); + assert.equal(calls.length, 1); + assert.equal(finished.length, 1); +}); + +test('outcome recency filter keeps startup refresh from sending historical notifications', () => { + assert.equal(observedAtOrAfter('2026-04-16T10:00:00.000Z', '2026-04-16T09:59:59.000Z'), true); + assert.equal(observedAtOrAfter('2026-04-16T09:00:00.000Z', '2026-04-16T09:59:59.000Z'), false); +}); diff --git a/test/ntfy_manifest_test.py b/test/ntfy_manifest_test.py index ef76229..ed67add 100644 --- a/test/ntfy_manifest_test.py +++ b/test/ntfy_manifest_test.py @@ -21,6 +21,13 @@ class NtfyManifestTest(unittest.TestCase): source = (ROOT / '.forgejo/workflows/deploy.yml').read_text() self.assertNotIn('deployment/ntfy', source) + def test_notification_token_is_secret_bootstrap_only(self): + manifest = (ROOT / 'deploy/k8s/base/unrip.yaml').read_text() + self.assertNotIn('NOTIFICATION_NTFY_TOKEN:', manifest) + bootstrap = (ROOT / 'scripts/deploy/bootstrap.sh').read_text() + self.assertIn('NOTIFICATION_NTFY_TOKEN', bootstrap) + self.assertIn('--from-literal=NOTIFICATION_NTFY_TOKEN=', bootstrap) + if __name__ == '__main__': unittest.main()