Notify on durable fund and trade outcomes
Some checks failed
deploy / deploy (push) Failing after 3m30s
Some checks failed
deploy / deploy (push) Failing after 3m30s
Proof: npm test; npm run operator-dashboard:build; PYTHONPATH=. python3 test/render_release_manifest_test.py; PYTHONPATH=. python3 test/repo_deployments_test.py; PYTHONPATH=. python3 test/ntfy_manifest_test.py; kubectl kustomize deploy/k8s/base. Assumptions: notifications should be emitted by history-writer after durable writes and outcome refreshes, and only for credited deposits, completed withdrawals, and completed trades with linked inventory movement evidence. Still fake: Generic alert notification policy is not re-enabled; withdrawal submitted notifications are not emitted; old historical outcomes are not backfilled as notifications; fee-complete realized PnL is still unavailable.
This commit is contained in:
parent
f3676c201f
commit
c5a214ce06
8 changed files with 648 additions and 3 deletions
|
|
@ -135,6 +135,7 @@ fi
|
|||
: "${POSTGRES_PASSWORD:=}"
|
||||
: "${POSTGRES_URL:=}"
|
||||
: "${NEAR_INTENTS_SIGNER_PRIVATE_KEY:=}"
|
||||
: "${NOTIFICATION_NTFY_TOKEN:=}"
|
||||
|
||||
secret_value() {
|
||||
local key="$1"
|
||||
|
|
@ -153,6 +154,10 @@ if [[ -z "$NEAR_INTENTS_SIGNER_PRIVATE_KEY" ]]; then
|
|||
NEAR_INTENTS_SIGNER_PRIVATE_KEY="$(secret_value NEAR_INTENTS_SIGNER_PRIVATE_KEY)"
|
||||
fi
|
||||
|
||||
if [[ -z "$NOTIFICATION_NTFY_TOKEN" ]]; then
|
||||
NOTIFICATION_NTFY_TOKEN="$(secret_value NOTIFICATION_NTFY_TOKEN)"
|
||||
fi
|
||||
|
||||
if [[ -z "$POSTGRES_PASSWORD" ]]; then
|
||||
POSTGRES_PASSWORD="$(python3 - <<'PY'
|
||||
import secrets
|
||||
|
|
@ -177,6 +182,9 @@ secret_args=(
|
|||
if [[ -n "$NEAR_INTENTS_SIGNER_PRIVATE_KEY" ]]; then
|
||||
secret_args+=(--from-literal=NEAR_INTENTS_SIGNER_PRIVATE_KEY="$NEAR_INTENTS_SIGNER_PRIVATE_KEY")
|
||||
fi
|
||||
if [[ -n "$NOTIFICATION_NTFY_TOKEN" ]]; then
|
||||
secret_args+=(--from-literal=NOTIFICATION_NTFY_TOKEN="$NOTIFICATION_NTFY_TOKEN")
|
||||
fi
|
||||
kubectl -n "$PROJECT_NAMESPACE" create secret generic "$APP_SECRET_NAME" \
|
||||
"${secret_args[@]}" \
|
||||
--dry-run=client -o yaml | kubectl apply -f -
|
||||
|
|
|
|||
|
|
@ -4,12 +4,22 @@ import { createConsumer } from '../bus/kafka/consumer.mjs';
|
|||
import { startControlApi } from '../core/control-api.mjs';
|
||||
import { routeHistoryRecord } from '../core/history-records.mjs';
|
||||
import { createLogger, serializeError } from '../core/log.mjs';
|
||||
import {
|
||||
buildIntentRequestOutcomeNotification,
|
||||
buildLiquidityActionNotification,
|
||||
buildQuoteOutcomeNotification,
|
||||
createNotificationDispatcher,
|
||||
observedAtOrAfter,
|
||||
} from '../core/notification-layer.mjs';
|
||||
import { createNtfyNotificationClient } from '../core/notification-client.mjs';
|
||||
import { buildPortfolioMetricId, computePortfolioMetric } from '../core/portfolio-metrics.mjs';
|
||||
import { parseEventMessage } from '../core/event-envelope.mjs';
|
||||
import { loadConfig } from '../lib/config.mjs';
|
||||
import {
|
||||
claimNotificationDelivery,
|
||||
createPostgresPool,
|
||||
ensureHistorySchema,
|
||||
finishNotificationDelivery,
|
||||
insertHistoryEvent,
|
||||
loadLatestPortfolioMetric,
|
||||
loadPortfolioMetricInputs,
|
||||
|
|
@ -30,6 +40,34 @@ const pool = createPostgresPool({
|
|||
});
|
||||
await ensureHistorySchema(pool);
|
||||
|
||||
const notificationLogger = logger.child({ component: 'notifications' });
|
||||
const notificationClient = createNtfyNotificationClient({
|
||||
baseUrl: config.notificationNtfyBaseUrl,
|
||||
topic: config.notificationNtfyTopic,
|
||||
token: config.notificationNtfyToken,
|
||||
timeoutMs: config.notificationNtfyTimeoutMs,
|
||||
logger: notificationLogger,
|
||||
});
|
||||
const notificationDispatcher = createNotificationDispatcher({
|
||||
client: notificationClient,
|
||||
logger: notificationLogger,
|
||||
store: {
|
||||
claim: (notification) => claimNotificationDelivery(pool, {
|
||||
notificationKey: notification.notification_key,
|
||||
notificationType: notification.notification_type,
|
||||
sourceKind: notification.source_kind,
|
||||
sourceId: notification.source_id,
|
||||
payload: notification.data || {},
|
||||
}),
|
||||
finish: (notification, result) => finishNotificationDelivery(pool, {
|
||||
notificationKey: notification.notification_key,
|
||||
ok: result.ok,
|
||||
response: result.ok ? { status: result.status, response: result.response || null } : null,
|
||||
error: result.ok ? null : result.error || { status: result.status, response: result.response || null },
|
||||
}),
|
||||
},
|
||||
});
|
||||
|
||||
const consumer = await createConsumer({
|
||||
groupId: config.kafkaConsumerGroupHistory,
|
||||
brokers: config.kafkaBrokers,
|
||||
|
|
@ -87,6 +125,10 @@ const state = {
|
|||
last_intent_request_outcomes_at: null,
|
||||
latest_intent_request_outcomes: null,
|
||||
intent_request_outcomes_error: null,
|
||||
notification_count: 0,
|
||||
notification_error_count: 0,
|
||||
last_notification_at: null,
|
||||
last_notification_error: null,
|
||||
};
|
||||
|
||||
await refreshPortfolioMetrics().catch((error) => {
|
||||
|
|
@ -125,6 +167,7 @@ await consumer.run({
|
|||
if (topic === config.kafkaTopicOpsAlert) {
|
||||
state.last_alert_write_at = state.last_write_at;
|
||||
}
|
||||
await publishLiquidityNotification({ topic, event });
|
||||
if (portfolioMetricTopics.has(topic)) {
|
||||
try {
|
||||
await refreshPortfolioMetrics();
|
||||
|
|
@ -140,7 +183,10 @@ await consumer.run({
|
|||
}
|
||||
if (quoteOutcomeTopics.has(topic)) {
|
||||
try {
|
||||
await refreshQuoteOutcomeAttributions();
|
||||
const records = await refreshQuoteOutcomeAttributions();
|
||||
await publishQuoteOutcomeNotifications(records, {
|
||||
minObservedAt: event.observed_at || event.ingested_at || state.last_write_at,
|
||||
});
|
||||
} catch (error) {
|
||||
state.quote_outcomes_error = serializeError(error);
|
||||
logger.error('quote_outcomes_refresh_failed', {
|
||||
|
|
@ -153,7 +199,10 @@ await consumer.run({
|
|||
}
|
||||
if (intentRequestOutcomeTopics.has(topic)) {
|
||||
try {
|
||||
await refreshIntentRequestOutcomeAttributions();
|
||||
const records = await refreshIntentRequestOutcomeAttributions();
|
||||
await publishIntentRequestOutcomeNotifications(records, {
|
||||
minObservedAt: event.observed_at || event.ingested_at || state.last_write_at,
|
||||
});
|
||||
} catch (error) {
|
||||
state.intent_request_outcomes_error = serializeError(error);
|
||||
logger.error('intent_request_outcomes_refresh_failed', {
|
||||
|
|
@ -334,7 +383,41 @@ async function refreshQuoteOutcomeAttributions() {
|
|||
not_filled_count: records.filter((entry) => entry.outcome_status === 'not_filled').length,
|
||||
submitted_count: records.filter((entry) => entry.outcome_status === 'submitted').length,
|
||||
};
|
||||
return state.latest_quote_outcomes;
|
||||
return records;
|
||||
}
|
||||
|
||||
async function publishLiquidityNotification({ topic, event }) {
|
||||
if (topic !== config.kafkaTopicOpsLiquidityAction) return;
|
||||
const notification = buildLiquidityActionNotification({ event, config });
|
||||
await publishNotification(notification);
|
||||
}
|
||||
|
||||
async function publishQuoteOutcomeNotifications(records, { minObservedAt = null } = {}) {
|
||||
for (const record of records || []) {
|
||||
if (!observedAtOrAfter(record.outcome_observed_at, minObservedAt)) continue;
|
||||
await publishNotification(buildQuoteOutcomeNotification({ record, config }));
|
||||
}
|
||||
}
|
||||
|
||||
async function publishIntentRequestOutcomeNotifications(records, { minObservedAt = null } = {}) {
|
||||
for (const record of records || []) {
|
||||
if (!observedAtOrAfter(record.outcome_observed_at, minObservedAt)) continue;
|
||||
await publishNotification(buildIntentRequestOutcomeNotification({ record, config }));
|
||||
}
|
||||
}
|
||||
|
||||
async function publishNotification(notification) {
|
||||
if (!notification) return null;
|
||||
const result = await notificationDispatcher.publishOnce(notification);
|
||||
if (result.delivered) {
|
||||
state.notification_count += 1;
|
||||
state.last_notification_at = new Date().toISOString();
|
||||
state.last_notification_error = null;
|
||||
} else if (result.ok === false) {
|
||||
state.notification_error_count += 1;
|
||||
state.last_notification_error = result.error || { status: result.status, response: result.response || null };
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function summarizePortfolioMetric(metric) {
|
||||
|
|
|
|||
236
src/core/notification-layer.mjs
Normal file
236
src/core/notification-layer.mjs
Normal file
|
|
@ -0,0 +1,236 @@
|
|||
import { formatUnitsDecimal } from './intent-requests.mjs';
|
||||
|
||||
const CREDITED_DEPOSIT_STATUSES = new Set(['CREDITED', 'COMPLETED', 'FINALIZED', 'SETTLED']);
|
||||
const COMPLETED_WITHDRAWAL_STATUSES = new Set(['COMPLETED', 'FINALIZED', 'SETTLED']);
|
||||
const SETTLED_ATTRIBUTION_STATUSES = new Set(['heuristic_match', 'linked_settlement']);
|
||||
|
||||
export function buildLiquidityActionNotification({ event, config } = {}) {
|
||||
const payload = event?.payload || event || {};
|
||||
const details = payload.details || {};
|
||||
const status = normalizeStatus(payload.status || details.status);
|
||||
|
||||
if (payload.action_type === 'deposit_status_observed' && CREDITED_DEPOSIT_STATUSES.has(status)) {
|
||||
if (!details.tx_hash) return null;
|
||||
const asset = assetFor(config, payload.asset_id || details.asset_id);
|
||||
const amount = formatAssetUnits(details.amount || '0', asset);
|
||||
const sourceId = `${payload.chain || details.chain || 'unknown'}:${payload.asset_id || details.asset_id || 'unknown'}:${details.tx_hash}:${details.amount || '0'}`;
|
||||
return {
|
||||
notification_key: `deposit_credited:${sourceId}`,
|
||||
notification_type: 'deposit_credited',
|
||||
source_kind: 'liquidity_action',
|
||||
source_id: details.tx_hash,
|
||||
title: 'Deposit credited',
|
||||
message: [
|
||||
`Deposit credited: ${amount}`,
|
||||
`chain=${payload.chain || details.chain || 'unknown'}`,
|
||||
`tx=${details.tx_hash}`,
|
||||
].join('\n'),
|
||||
priority: 'default',
|
||||
tags: ['unrip', 'deposit'],
|
||||
data: {
|
||||
action_type: payload.action_type,
|
||||
status,
|
||||
asset_id: payload.asset_id || details.asset_id || null,
|
||||
amount_units: String(details.amount || '0'),
|
||||
tx_hash: details.tx_hash,
|
||||
chain: payload.chain || details.chain || null,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
if (payload.action_type === 'withdrawal_status_changed' && COMPLETED_WITHDRAWAL_STATUSES.has(status)) {
|
||||
if (!details.withdrawal_hash) return null;
|
||||
const asset = assetFor(config, payload.asset_id || details.asset_id);
|
||||
const amount = formatAssetUnits(details.amount || '0', asset);
|
||||
const sourceId = `${payload.chain || details.chain || 'unknown'}:${payload.asset_id || details.asset_id || 'unknown'}:${details.withdrawal_hash}`;
|
||||
return {
|
||||
notification_key: `withdrawal_completed:${sourceId}`,
|
||||
notification_type: 'withdrawal_completed',
|
||||
source_kind: 'liquidity_action',
|
||||
source_id: details.withdrawal_hash,
|
||||
title: 'Withdrawal completed',
|
||||
message: [
|
||||
`Withdrawal completed: ${amount}`,
|
||||
`chain=${payload.chain || details.chain || 'unknown'}`,
|
||||
`withdrawal=${details.withdrawal_hash}`,
|
||||
details.transfer_tx_hash ? `transfer=${details.transfer_tx_hash}` : null,
|
||||
].filter(Boolean).join('\n'),
|
||||
priority: 'default',
|
||||
tags: ['unrip', 'withdrawal'],
|
||||
data: {
|
||||
action_type: payload.action_type,
|
||||
status,
|
||||
asset_id: payload.asset_id || details.asset_id || null,
|
||||
amount_units: String(details.amount || '0'),
|
||||
withdrawal_hash: details.withdrawal_hash,
|
||||
transfer_tx_hash: details.transfer_tx_hash || null,
|
||||
chain: payload.chain || details.chain || null,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
export function buildQuoteOutcomeNotification({ record, config } = {}) {
|
||||
if (!hasProvenSettlement(record)) return null;
|
||||
const quoteId = record.quote_id || record.payload?.quote_id || null;
|
||||
if (!quoteId) return null;
|
||||
const delta = record.attributed_inventory_delta || record.payload?.attributed_inventory_delta || {};
|
||||
const inventoryId = delta.inventory_id || 'unknown-inventory';
|
||||
const deltaSummary = formatDeltaSummary(delta.delta_units || {}, config);
|
||||
return {
|
||||
notification_key: `trade_completed:quote:${quoteId}:${inventoryId}`,
|
||||
notification_type: 'trade_completed',
|
||||
source_kind: 'quote_response',
|
||||
source_id: quoteId,
|
||||
title: 'Trade completed',
|
||||
message: [
|
||||
`Quote trade completed: ${quoteId}`,
|
||||
`asset movement: ${deltaSummary}`,
|
||||
record.gross_edge_pct || record.payload?.gross_edge_pct ? `edge=${record.gross_edge_pct || record.payload?.gross_edge_pct}%` : null,
|
||||
`inventory=${inventoryId}`,
|
||||
].filter(Boolean).join('\n'),
|
||||
priority: 'default',
|
||||
tags: ['unrip', 'trade'],
|
||||
data: {
|
||||
trade_kind: 'quote_response',
|
||||
quote_id: quoteId,
|
||||
decision_id: record.decision_id || record.payload?.decision_id || null,
|
||||
command_id: record.command_id || record.payload?.command_id || null,
|
||||
outcome_status: record.outcome_status || record.payload?.outcome_status || null,
|
||||
attribution_status: record.attribution_status || record.payload?.attribution_status || null,
|
||||
inventory_id: inventoryId,
|
||||
delta_units: delta.delta_units || null,
|
||||
gross_edge_pct: record.gross_edge_pct || record.payload?.gross_edge_pct || null,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function buildIntentRequestOutcomeNotification({ record, config } = {}) {
|
||||
if (!hasProvenSettlement(record)) return null;
|
||||
const requestId = record.request_id || record.payload?.request_id || null;
|
||||
if (!requestId) return null;
|
||||
const delta = record.attributed_inventory_delta || record.payload?.attributed_inventory_delta || {};
|
||||
const inventoryId = delta.inventory_id || 'unknown-inventory';
|
||||
const deltaSummary = formatDeltaSummary(delta.delta_units || {}, config);
|
||||
return {
|
||||
notification_key: `trade_completed:intent_request:${requestId}:${inventoryId}`,
|
||||
notification_type: 'trade_completed',
|
||||
source_kind: 'intent_request',
|
||||
source_id: requestId,
|
||||
title: 'Trade completed',
|
||||
message: [
|
||||
`Own request completed: ${requestId}`,
|
||||
`asset movement: ${deltaSummary}`,
|
||||
record.intent_hash || record.payload?.intent_hash ? `intent=${record.intent_hash || record.payload?.intent_hash}` : null,
|
||||
`inventory=${inventoryId}`,
|
||||
].filter(Boolean).join('\n'),
|
||||
priority: 'default',
|
||||
tags: ['unrip', 'trade'],
|
||||
data: {
|
||||
trade_kind: 'intent_request',
|
||||
request_id: requestId,
|
||||
idempotency_key: record.idempotency_key || record.payload?.idempotency_key || null,
|
||||
submission_id: record.submission_id || record.payload?.submission_id || null,
|
||||
intent_hash: record.intent_hash || record.payload?.intent_hash || null,
|
||||
outcome_status: record.outcome_status || record.payload?.outcome_status || null,
|
||||
attribution_status: record.attribution_status || record.payload?.attribution_status || null,
|
||||
inventory_id: inventoryId,
|
||||
delta_units: delta.delta_units || null,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function observedAtOrAfter(value, minObservedAt) {
|
||||
if (!minObservedAt) return true;
|
||||
const valueTs = Date.parse(value || '');
|
||||
const minTs = Date.parse(minObservedAt || '');
|
||||
return Number.isFinite(valueTs) && Number.isFinite(minTs) && valueTs >= minTs;
|
||||
}
|
||||
|
||||
export function hasProvenSettlement(record = {}) {
|
||||
const payload = record.payload || {};
|
||||
const status = record.outcome_status || payload.outcome_status;
|
||||
const attributionStatus = record.attribution_status || payload.attribution_status;
|
||||
const delta = record.attributed_inventory_delta || payload.attributed_inventory_delta;
|
||||
return status === 'completed'
|
||||
&& SETTLED_ATTRIBUTION_STATUSES.has(attributionStatus)
|
||||
&& Boolean(delta?.delta_units);
|
||||
}
|
||||
|
||||
export function createNotificationDispatcher({ client, store, logger = null } = {}) {
|
||||
return {
|
||||
async publishOnce(notification) {
|
||||
if (!notification) return { ok: true, skipped: true, reason: 'no_notification' };
|
||||
if (!client?.isConfigured?.()) {
|
||||
return { ok: true, skipped: true, reason: 'notification_client_not_configured' };
|
||||
}
|
||||
|
||||
const claimed = await store.claim(notification);
|
||||
if (!claimed) return { ok: true, skipped: true, reason: 'notification_already_delivered' };
|
||||
|
||||
const result = await client.publish({
|
||||
title: notification.title,
|
||||
message: notification.message,
|
||||
priority: notification.priority,
|
||||
tags: notification.tags,
|
||||
click: notification.click,
|
||||
});
|
||||
|
||||
await store.finish(notification, result);
|
||||
if (!result.ok) {
|
||||
logger?.warn?.('notification_delivery_failed', {
|
||||
details: {
|
||||
notification_key: notification.notification_key,
|
||||
notification_type: notification.notification_type,
|
||||
result,
|
||||
},
|
||||
});
|
||||
}
|
||||
return result.ok
|
||||
? { ...result, delivered: true }
|
||||
: { ...result, delivered: false };
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function hasAssetRegistry(config) {
|
||||
return config?.assetRegistry && typeof config.assetRegistry.get === 'function';
|
||||
}
|
||||
|
||||
function assetFor(config, assetId) {
|
||||
if (hasAssetRegistry(config) && config.assetRegistry.get(assetId)) return config.assetRegistry.get(assetId);
|
||||
if (config?.tradingBtc?.assetId === assetId) return config.tradingBtc;
|
||||
if (config?.tradingEure?.assetId === assetId) return config.tradingEure;
|
||||
return { assetId, symbol: shortAssetId(assetId), decimals: 0 };
|
||||
}
|
||||
|
||||
function formatAssetUnits(units, asset) {
|
||||
return `${formatUnitsDecimal(units || '0', Number(asset?.decimals || 0))} ${asset?.symbol || shortAssetId(asset?.assetId)}`;
|
||||
}
|
||||
|
||||
function formatSignedAssetUnits(units, asset) {
|
||||
const value = BigInt(String(units || '0'));
|
||||
const sign = value > 0n ? '+' : value < 0n ? '-' : '';
|
||||
const absolute = value < 0n ? -value : value;
|
||||
return `${sign}${formatAssetUnits(absolute.toString(), asset)}`;
|
||||
}
|
||||
|
||||
function formatDeltaSummary(deltaUnits, config) {
|
||||
const parts = Object.entries(deltaUnits || {})
|
||||
.filter(([, units]) => BigInt(String(units || '0')) !== 0n)
|
||||
.map(([assetId, units]) => formatSignedAssetUnits(units, assetFor(config, assetId)));
|
||||
return parts.length ? parts.join(', ') : 'no non-zero delta';
|
||||
}
|
||||
|
||||
function normalizeStatus(status) {
|
||||
return String(status || '').trim().toUpperCase();
|
||||
}
|
||||
|
||||
function shortAssetId(assetId) {
|
||||
if (!assetId) return 'asset';
|
||||
const raw = String(assetId);
|
||||
if (raw.length <= 18) return raw;
|
||||
return `${raw.slice(0, 8)}...${raw.slice(-6)}`;
|
||||
}
|
||||
|
|
@ -107,6 +107,7 @@ const DEFAULTS = {
|
|||
operatorDashboardUpstreamTimeoutMs: 3_000,
|
||||
notificationNtfyBaseUrl: '',
|
||||
notificationNtfyTopic: 'unrip',
|
||||
notificationNtfyToken: '',
|
||||
notificationNtfyTimeoutMs: 5_000,
|
||||
};
|
||||
|
||||
|
|
@ -578,6 +579,8 @@ export function loadConfig({ envPath = '.env' } = {}) {
|
|||
process.env.NOTIFICATION_NTFY_BASE_URL || DEFAULTS.notificationNtfyBaseUrl,
|
||||
notificationNtfyTopic:
|
||||
process.env.NOTIFICATION_NTFY_TOPIC || DEFAULTS.notificationNtfyTopic,
|
||||
notificationNtfyToken:
|
||||
process.env.NOTIFICATION_NTFY_TOKEN || DEFAULTS.notificationNtfyToken,
|
||||
notificationNtfyTimeoutMs: parseNumber(
|
||||
process.env.NOTIFICATION_NTFY_TIMEOUT_MS,
|
||||
DEFAULTS.notificationNtfyTimeoutMs,
|
||||
|
|
|
|||
|
|
@ -117,6 +117,31 @@ export async function ensureHistorySchema(pool) {
|
|||
ON ${PORTFOLIO_METRICS_TABLE} (computed_at DESC)
|
||||
`);
|
||||
|
||||
await pool.query(`
|
||||
CREATE TABLE IF NOT EXISTS notification_deliveries (
|
||||
notification_key TEXT PRIMARY KEY,
|
||||
notification_type TEXT NOT NULL,
|
||||
source_kind TEXT NOT NULL,
|
||||
source_id TEXT,
|
||||
status TEXT NOT NULL,
|
||||
attempt_count INTEGER NOT NULL DEFAULT 0,
|
||||
first_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
last_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
delivered_at TIMESTAMPTZ,
|
||||
payload JSONB NOT NULL,
|
||||
response JSONB,
|
||||
error JSONB
|
||||
)
|
||||
`);
|
||||
await pool.query(`
|
||||
CREATE INDEX IF NOT EXISTS notification_deliveries_status_idx
|
||||
ON notification_deliveries (status, last_attempt_at DESC)
|
||||
`);
|
||||
await pool.query(`
|
||||
CREATE INDEX IF NOT EXISTS notification_deliveries_type_idx
|
||||
ON notification_deliveries (notification_type, last_attempt_at DESC)
|
||||
`);
|
||||
|
||||
await pool.query(`
|
||||
CREATE TABLE IF NOT EXISTS ${QUOTE_OUTCOMES_TABLE} (
|
||||
quote_id TEXT PRIMARY KEY,
|
||||
|
|
@ -346,6 +371,78 @@ export async function loadLatestPortfolioMetric(pool) {
|
|||
return normalizePortfolioMetricRow(result.rows[0]);
|
||||
}
|
||||
|
||||
export async function claimNotificationDelivery(pool, {
|
||||
notificationKey,
|
||||
notificationType,
|
||||
sourceKind,
|
||||
sourceId = null,
|
||||
payload = {},
|
||||
}) {
|
||||
if (!notificationKey) throw new Error('notificationKey is required');
|
||||
if (!notificationType) throw new Error('notificationType is required');
|
||||
if (!sourceKind) throw new Error('sourceKind is required');
|
||||
|
||||
const result = await pool.query(
|
||||
`
|
||||
INSERT INTO notification_deliveries (
|
||||
notification_key,
|
||||
notification_type,
|
||||
source_kind,
|
||||
source_id,
|
||||
status,
|
||||
attempt_count,
|
||||
payload,
|
||||
response,
|
||||
error
|
||||
) VALUES ($1, $2, $3, $4, 'pending', 1, $5::jsonb, NULL, NULL)
|
||||
ON CONFLICT (notification_key) DO UPDATE SET
|
||||
status = 'pending',
|
||||
attempt_count = notification_deliveries.attempt_count + 1,
|
||||
last_attempt_at = NOW(),
|
||||
payload = EXCLUDED.payload,
|
||||
response = NULL,
|
||||
error = NULL
|
||||
WHERE notification_deliveries.status <> 'delivered'
|
||||
RETURNING notification_key
|
||||
`,
|
||||
[
|
||||
notificationKey,
|
||||
notificationType,
|
||||
sourceKind,
|
||||
sourceId,
|
||||
JSON.stringify(payload || {}),
|
||||
],
|
||||
);
|
||||
return result.rowCount > 0;
|
||||
}
|
||||
|
||||
export async function finishNotificationDelivery(pool, {
|
||||
notificationKey,
|
||||
ok,
|
||||
response = null,
|
||||
error = null,
|
||||
}) {
|
||||
if (!notificationKey) throw new Error('notificationKey is required');
|
||||
await pool.query(
|
||||
`
|
||||
UPDATE notification_deliveries
|
||||
SET
|
||||
status = $2,
|
||||
delivered_at = CASE WHEN $2 = 'delivered' THEN NOW() ELSE delivered_at END,
|
||||
last_attempt_at = NOW(),
|
||||
response = $3::jsonb,
|
||||
error = $4::jsonb
|
||||
WHERE notification_key = $1
|
||||
`,
|
||||
[
|
||||
notificationKey,
|
||||
ok ? 'delivered' : 'failed',
|
||||
response ? JSON.stringify(response) : null,
|
||||
error ? JSON.stringify(error) : null,
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
export async function refreshQuoteOutcomes(pool, {
|
||||
btcAsset = null,
|
||||
eureAsset = null,
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ test('ntfy publish request uses repo configured base url and topic', () => {
|
|||
priority: 'high',
|
||||
tags: ['warning', 'unrip'],
|
||||
click: 'https://dashboard.example/strategy',
|
||||
token: 'secret-token',
|
||||
});
|
||||
|
||||
assert.equal(request.ok, true);
|
||||
|
|
@ -27,6 +28,7 @@ test('ntfy publish request uses repo configured base url and topic', () => {
|
|||
assert.equal(request.init.headers.Priority, 'high');
|
||||
assert.equal(request.init.headers.Tags, 'warning,unrip');
|
||||
assert.equal(request.init.headers.Click, 'https://dashboard.example/strategy');
|
||||
assert.equal(request.init.headers.Authorization, 'Bearer secret-token');
|
||||
});
|
||||
|
||||
test('ntfy notification client skips when endpoint or message is not configured', async () => {
|
||||
|
|
@ -76,20 +78,24 @@ test('config exposes ntfy notification defaults and environment overrides', () =
|
|||
baseUrl: process.env.NOTIFICATION_NTFY_BASE_URL,
|
||||
topic: process.env.NOTIFICATION_NTFY_TOPIC,
|
||||
timeout: process.env.NOTIFICATION_NTFY_TIMEOUT_MS,
|
||||
token: process.env.NOTIFICATION_NTFY_TOKEN,
|
||||
};
|
||||
process.env.NOTIFICATION_NTFY_BASE_URL = 'http://ntfy.utility.svc.cluster.local';
|
||||
process.env.NOTIFICATION_NTFY_TOPIC = 'unrip';
|
||||
process.env.NOTIFICATION_NTFY_TIMEOUT_MS = '4321';
|
||||
process.env.NOTIFICATION_NTFY_TOKEN = 'secret-token';
|
||||
|
||||
try {
|
||||
const config = loadConfig({ envPath: '/tmp/unrip-missing-env-for-notification-test' });
|
||||
assert.equal(config.notificationNtfyBaseUrl, 'http://ntfy.utility.svc.cluster.local');
|
||||
assert.equal(config.notificationNtfyTopic, 'unrip');
|
||||
assert.equal(config.notificationNtfyTimeoutMs, 4321);
|
||||
assert.equal(config.notificationNtfyToken, 'secret-token');
|
||||
} finally {
|
||||
restoreEnv('NOTIFICATION_NTFY_BASE_URL', previous.baseUrl);
|
||||
restoreEnv('NOTIFICATION_NTFY_TOPIC', previous.topic);
|
||||
restoreEnv('NOTIFICATION_NTFY_TIMEOUT_MS', previous.timeout);
|
||||
restoreEnv('NOTIFICATION_NTFY_TOKEN', previous.token);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
|||
205
test/notification-layer.test.mjs
Normal file
205
test/notification-layer.test.mjs
Normal file
|
|
@ -0,0 +1,205 @@
|
|||
import test from 'node:test';
|
||||
import assert from 'node:assert/strict';
|
||||
|
||||
import {
|
||||
buildIntentRequestOutcomeNotification,
|
||||
buildLiquidityActionNotification,
|
||||
buildQuoteOutcomeNotification,
|
||||
createNotificationDispatcher,
|
||||
hasProvenSettlement,
|
||||
observedAtOrAfter,
|
||||
} from '../src/core/notification-layer.mjs';
|
||||
|
||||
const BTC = { assetId: 'nep141:btc.omft.near', symbol: 'BTC', decimals: 8 };
|
||||
const EURE = { assetId: 'nep141:eure.omft.near', symbol: 'EURe', decimals: 18 };
|
||||
const config = {
|
||||
tradingBtc: BTC,
|
||||
tradingEure: EURE,
|
||||
assetRegistry: new Map([
|
||||
[BTC.assetId, BTC],
|
||||
[EURE.assetId, EURE],
|
||||
]),
|
||||
};
|
||||
|
||||
test('liquidity notifications only emit credited deposits and completed withdrawals', () => {
|
||||
assert.equal(buildLiquidityActionNotification({
|
||||
config,
|
||||
event: {
|
||||
payload: {
|
||||
action_type: 'deposit_status_observed',
|
||||
status: 'PENDING',
|
||||
chain: 'eth:100',
|
||||
asset_id: EURE.assetId,
|
||||
details: { amount: '5000000000000000000', tx_hash: 'deposit-tx-1' },
|
||||
},
|
||||
},
|
||||
}), null);
|
||||
|
||||
const deposit = buildLiquidityActionNotification({
|
||||
config,
|
||||
event: {
|
||||
payload: {
|
||||
action_type: 'deposit_status_observed',
|
||||
status: 'CREDITED',
|
||||
chain: 'eth:100',
|
||||
asset_id: EURE.assetId,
|
||||
details: { amount: '5000000000000000000', tx_hash: 'deposit-tx-1' },
|
||||
},
|
||||
},
|
||||
});
|
||||
assert.equal(deposit.notification_type, 'deposit_credited');
|
||||
assert.match(deposit.message, /Deposit credited: 5 EURe/);
|
||||
assert.match(deposit.message, /tx=deposit-tx-1/);
|
||||
|
||||
assert.equal(buildLiquidityActionNotification({
|
||||
config,
|
||||
event: {
|
||||
payload: {
|
||||
action_type: 'withdrawal_status_changed',
|
||||
status: 'SUBMITTED',
|
||||
chain: 'btc:mainnet',
|
||||
asset_id: BTC.assetId,
|
||||
details: { amount: '100000', withdrawal_hash: 'withdrawal-1' },
|
||||
},
|
||||
},
|
||||
}), null);
|
||||
|
||||
const withdrawal = buildLiquidityActionNotification({
|
||||
config,
|
||||
event: {
|
||||
payload: {
|
||||
action_type: 'withdrawal_status_changed',
|
||||
status: 'COMPLETED',
|
||||
chain: 'btc:mainnet',
|
||||
asset_id: BTC.assetId,
|
||||
details: { amount: '100000', withdrawal_hash: 'withdrawal-1', transfer_tx_hash: 'btc-tx-1' },
|
||||
},
|
||||
},
|
||||
});
|
||||
assert.equal(withdrawal.notification_type, 'withdrawal_completed');
|
||||
assert.match(withdrawal.message, /Withdrawal completed: 0.001 BTC/);
|
||||
assert.match(withdrawal.message, /transfer=btc-tx-1/);
|
||||
});
|
||||
|
||||
test('trade notifications require completed outcome with durable settlement evidence', () => {
|
||||
const submitted = {
|
||||
quote_id: 'quote-submitted',
|
||||
outcome_status: 'submitted',
|
||||
attribution_status: 'unattributed',
|
||||
attributed_inventory_delta: null,
|
||||
};
|
||||
assert.equal(hasProvenSettlement(submitted), false);
|
||||
assert.equal(buildQuoteOutcomeNotification({ record: submitted, config }), null);
|
||||
|
||||
const completedWithoutDelta = {
|
||||
quote_id: 'quote-fake-completed',
|
||||
outcome_status: 'completed',
|
||||
attribution_status: 'unattributed',
|
||||
attributed_inventory_delta: null,
|
||||
};
|
||||
assert.equal(hasProvenSettlement(completedWithoutDelta), false);
|
||||
assert.equal(buildQuoteOutcomeNotification({ record: completedWithoutDelta, config }), null);
|
||||
|
||||
const completed = {
|
||||
quote_id: 'quote-real',
|
||||
decision_id: 'decision-real',
|
||||
command_id: 'command-real',
|
||||
gross_edge_pct: '0.49',
|
||||
outcome_status: 'completed',
|
||||
attribution_status: 'heuristic_match',
|
||||
attributed_inventory_delta: {
|
||||
inventory_id: 'inventory-after',
|
||||
delta_units: {
|
||||
[BTC.assetId]: '-12345',
|
||||
[EURE.assetId]: '76500000000000000000',
|
||||
},
|
||||
},
|
||||
};
|
||||
const notification = buildQuoteOutcomeNotification({ record: completed, config });
|
||||
assert.equal(notification.notification_type, 'trade_completed');
|
||||
assert.match(notification.message, /Quote trade completed: quote-real/);
|
||||
assert.match(notification.message, /-0.00012345 BTC/);
|
||||
assert.match(notification.message, /\+76.5 EURe/);
|
||||
});
|
||||
|
||||
test('own request notifications do not emit from relay acceptance alone', () => {
|
||||
const accepted = {
|
||||
request_id: 'request-accepted',
|
||||
outcome_status: 'awaiting_settlement',
|
||||
submission_status: 'accepted_by_relay',
|
||||
relay_status: 'SETTLED',
|
||||
attribution_status: 'unattributed',
|
||||
attributed_inventory_delta: null,
|
||||
};
|
||||
assert.equal(buildIntentRequestOutcomeNotification({ record: accepted, config }), null);
|
||||
|
||||
const completed = {
|
||||
request_id: 'request-completed',
|
||||
idempotency_key: 'idem-1',
|
||||
intent_hash: 'intent-hash-1',
|
||||
outcome_status: 'completed',
|
||||
attribution_status: 'linked_settlement',
|
||||
attributed_inventory_delta: {
|
||||
inventory_id: 'inventory-after-request',
|
||||
delta_units: {
|
||||
[EURE.assetId]: '-5000000000000000000',
|
||||
[BTC.assetId]: '8000',
|
||||
},
|
||||
},
|
||||
};
|
||||
const notification = buildIntentRequestOutcomeNotification({ record: completed, config });
|
||||
assert.equal(notification.notification_type, 'trade_completed');
|
||||
assert.match(notification.message, /Own request completed: request-completed/);
|
||||
assert.match(notification.message, /-5 EURe/);
|
||||
assert.match(notification.message, /\+0.00008 BTC/);
|
||||
});
|
||||
|
||||
test('notification dispatcher publishes each durable notification key once', async () => {
|
||||
const calls = [];
|
||||
const claimed = new Set();
|
||||
const finished = [];
|
||||
const dispatcher = createNotificationDispatcher({
|
||||
client: {
|
||||
isConfigured: () => true,
|
||||
async publish(notification) {
|
||||
calls.push(notification);
|
||||
return { ok: true, status: 200, response: '{"id":"msg-1"}' };
|
||||
},
|
||||
},
|
||||
store: {
|
||||
async claim(notification) {
|
||||
if (claimed.has(notification.notification_key)) return false;
|
||||
claimed.add(notification.notification_key);
|
||||
return true;
|
||||
},
|
||||
async finish(notification, result) {
|
||||
finished.push({ notification, result });
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const notification = {
|
||||
notification_key: 'deposit_credited:eth:100:eure:tx-1:5000',
|
||||
notification_type: 'deposit_credited',
|
||||
source_kind: 'liquidity_action',
|
||||
source_id: 'tx-1',
|
||||
title: 'Deposit credited',
|
||||
message: 'Deposit credited: 5 EURe',
|
||||
priority: 'default',
|
||||
tags: ['unrip', 'deposit'],
|
||||
};
|
||||
|
||||
assert.equal((await dispatcher.publishOnce(notification)).delivered, true);
|
||||
assert.deepEqual(await dispatcher.publishOnce(notification), {
|
||||
ok: true,
|
||||
skipped: true,
|
||||
reason: 'notification_already_delivered',
|
||||
});
|
||||
assert.equal(calls.length, 1);
|
||||
assert.equal(finished.length, 1);
|
||||
});
|
||||
|
||||
test('outcome recency filter keeps startup refresh from sending historical notifications', () => {
|
||||
assert.equal(observedAtOrAfter('2026-04-16T10:00:00.000Z', '2026-04-16T09:59:59.000Z'), true);
|
||||
assert.equal(observedAtOrAfter('2026-04-16T09:00:00.000Z', '2026-04-16T09:59:59.000Z'), false);
|
||||
});
|
||||
|
|
@ -21,6 +21,13 @@ class NtfyManifestTest(unittest.TestCase):
|
|||
source = (ROOT / '.forgejo/workflows/deploy.yml').read_text()
|
||||
self.assertNotIn('deployment/ntfy', source)
|
||||
|
||||
def test_notification_token_is_secret_bootstrap_only(self):
|
||||
manifest = (ROOT / 'deploy/k8s/base/unrip.yaml').read_text()
|
||||
self.assertNotIn('NOTIFICATION_NTFY_TOKEN:', manifest)
|
||||
bootstrap = (ROOT / 'scripts/deploy/bootstrap.sh').read_text()
|
||||
self.assertIn('NOTIFICATION_NTFY_TOKEN', bootstrap)
|
||||
self.assertIn('--from-literal=NOTIFICATION_NTFY_TOKEN=', bootstrap)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue