From 4adc705a2b1e7d14eda4a884b1964594ddfae32b Mon Sep 17 00:00:00 2001 From: philipp Date: Thu, 7 May 2026 16:47:52 +0200 Subject: [PATCH] Use bridge deposit time for funding activity Proof: npm test (147 passing); npm run operator-dashboard:build; git diff --cached --check. Assumptions: Bridge recent_deposits created_at is the authoritative source time for deposit activity; rows without created_at must be deduped to their earliest observed status instead of the latest replay ingestion. Still fake: No fund movement or bridge migration was performed; ntfy messages already sent before this fix cannot be unsent. --- src/apps/history-writer.mjs | 4 +- src/apps/inventory-sync.mjs | 3 + src/apps/liquidity-manager.mjs | 14 ++- src/core/bridge-assets.mjs | 22 ++++ src/core/history-writer-refresh-policy.mjs | 2 +- src/core/operator-dashboard.mjs | 22 +++- src/lib/postgres.mjs | 114 +++++++++++++------- test/bridge-assets.test.mjs | 14 +++ test/history-writer-refresh-policy.test.mjs | 11 ++ test/operator-dashboard.test.mjs | 10 +- test/postgres-funding.test.mjs | 84 +++++++++++++++ 11 files changed, 252 insertions(+), 48 deletions(-) create mode 100644 test/postgres-funding.test.mjs diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index a760f5e..442c47e 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -205,7 +205,9 @@ await consumer.run({ state.last_environment_status_duplicate_at = handledAt; } } - await publishLiquidityNotification({ topic, event }); + if (shouldRunDerivedRefresh) { + await publishLiquidityNotification({ topic, event }); + } if (shouldRunDerivedRefresh && portfolioMetricTopics.has(topic)) { try { await refreshPortfolioMetrics(); diff --git a/src/apps/inventory-sync.mjs b/src/apps/inventory-sync.mjs index 58effd0..ac7212e 100644 --- a/src/apps/inventory-sync.mjs +++ b/src/apps/inventory-sync.mjs @@ -4,6 +4,7 @@ import { createConsumer } from '../bus/kafka/consumer.mjs'; import { createProducer } from '../bus/kafka/producer.mjs'; import { bridgeDepositAssetId, + bridgeDepositObservedAt, uniqueChainsForAssets, } from '../core/bridge-assets.mjs'; import { startControlApi } from '../core/control-api.mjs'; @@ -148,9 +149,11 @@ async function refresh() { address: deposit.address, status: deposit.status, decimals: deposit.decimals, + created_at: bridgeDepositObservedAt(deposit), near_token_id: deposit.near_token_id || null, intents_token_id: deposit.intents_token_id || null, defuse_asset_identifier: deposit.defuse_asset_identifier || null, + mint_tx_hash: deposit.mint_tx_hash || null, }); } } diff --git a/src/apps/liquidity-manager.mjs b/src/apps/liquidity-manager.mjs index f49c499..7ebb55b 100644 --- a/src/apps/liquidity-manager.mjs +++ b/src/apps/liquidity-manager.mjs @@ -4,6 +4,7 @@ import { createProducer } from '../bus/kafka/producer.mjs'; import { assetsByChain as groupAssetsByChain, bridgeDepositAssetId, + bridgeDepositObservedAt, uniqueChainsForAssets, } from '../core/bridge-assets.mjs'; import { startControlApi } from '../core/control-api.mjs'; @@ -180,9 +181,11 @@ async function refreshChain(chain, state) { address: deposit.address, status: deposit.status, decimals: deposit.decimals, + created_at: bridgeDepositObservedAt(deposit), near_token_id: deposit.near_token_id || null, intents_token_id: deposit.intents_token_id || null, defuse_asset_identifier: deposit.defuse_asset_identifier || null, + mint_tx_hash: deposit.mint_tx_hash || null, }; const previous = state.deposits[key]; state.deposits[key] = normalized; @@ -193,7 +196,7 @@ async function refreshChain(chain, state) { chain, asset_id: assetId, details: normalized, - }, state); + }, state, { observedAt: normalized.created_at }); } } @@ -491,11 +494,18 @@ async function submitWithdrawal({ assetId, amount, destinationAddress, chain = n }; } -async function publishAction(payload, state) { +async function publishAction(payload, state, { observedAt = null } = {}) { const event = buildEventEnvelope({ source: 'liquidity-manager', venue: 'near-intents', eventType: 'liquidity_action', + observedAt: + observedAt + || payload.observed_at + || payload.details?.created_at + || payload.details?.last_checked_at + || payload.details?.submitted_at + || null, payload: { liquidity_action_id: `${payload.action_type}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`, account_id: config.nearIntentsAccountId, diff --git a/src/core/bridge-assets.mjs b/src/core/bridge-assets.mjs index c584502..8518991 100644 --- a/src/core/bridge-assets.mjs +++ b/src/core/bridge-assets.mjs @@ -23,6 +23,22 @@ export function bridgeDepositAssetId(deposit, { return fallbackAssetId || candidates[0] || null; } +export function bridgeDepositObservedAt(deposit) { + const candidates = [ + deposit?.created_at, + deposit?.updated_at, + deposit?.observed_at, + deposit?.timestamp, + ]; + + for (const candidate of candidates) { + const timestamp = toIsoTimestamp(candidate); + if (timestamp) return timestamp; + } + + return null; +} + export function uniqueChainsForAssets(assets = []) { return [...new Set( assets @@ -41,3 +57,9 @@ export function assetsByChain(assets = []) { } return byChain; } + +function toIsoTimestamp(value) { + if (!value) return null; + const date = new Date(value); + return Number.isNaN(date.getTime()) ? null : date.toISOString(); +} diff --git a/src/core/history-writer-refresh-policy.mjs b/src/core/history-writer-refresh-policy.mjs index 700707c..8f8abe2 100644 --- a/src/core/history-writer-refresh-policy.mjs +++ b/src/core/history-writer-refresh-policy.mjs @@ -6,7 +6,7 @@ export function shouldRunDerivedRefreshForEvent({ const maxAge = Number(maxEventAgeMs); if (!Number.isFinite(maxAge) || maxAge < 0) return true; - const eventAt = event?.ingested_at || event?.observed_at || null; + const eventAt = event?.observed_at || event?.ingested_at || null; if (!eventAt) return true; const age = Date.parse(now) - Date.parse(eventAt); diff --git a/src/core/operator-dashboard.mjs b/src/core/operator-dashboard.mjs index fdc54b0..6b7517b 100644 --- a/src/core/operator-dashboard.mjs +++ b/src/core/operator-dashboard.mjs @@ -1,4 +1,5 @@ import { unitsToNumber } from './assets.mjs'; +import { bridgeDepositObservedAt } from './bridge-assets.mjs'; import { summarizeFundingObservations } from './funding-observations.mjs'; import { resolveDashboardRequestAuth } from './operator-dashboard-auth.mjs'; import { TERMINAL_SETTLEMENT_ATTRIBUTION_STATUSES } from './quote-outcomes.mjs'; @@ -1816,10 +1817,11 @@ function buildRecentDepositItems({ config, recentDepositStatuses, liquidityState const fallbackItem = normalizeLiquidityDepositForUi({ config, deposit, - observedAt: liquidityState?.last_refresh_at || null, + observedAt: null, }); const key = buildFundingActivityKey(fallbackItem); - if (!itemsByKey.has(key)) { + const existing = itemsByKey.get(key); + if (!existing || shouldReplaceFundingActivityItem(existing, fallbackItem)) { itemsByKey.set(key, fallbackItem); } } @@ -1854,6 +1856,17 @@ function fundingActivityTimestamp(item) { return item?.last_seen_at || item?.credited_at || item?.first_seen_at || null; } +function shouldReplaceFundingActivityItem(existing, candidate) { + if (!candidate) return false; + if (!existing) return true; + if (candidate.bridge_created_at && !existing.bridge_created_at) return true; + + const existingTs = timestampValue(fundingActivityTimestamp(existing)); + const candidateTs = timestampValue(fundingActivityTimestamp(candidate)); + return Number.isFinite(candidateTs) + && (!Number.isFinite(existingTs) || candidateTs < existingTs); +} + function buildFundingActivityKey(item) { return [ item?.tx_hash || 'no-tx', @@ -1897,6 +1910,7 @@ function normalizeDepositStatusForUi({ config, depositStatus }) { amount: details.amount || '0', address: details.address || details.deposit_address || null, status: payload.status || details.status || null, + created_at: details.created_at || payload.created_at || null, }, observedAt: depositStatus?.observed_at || depositStatus?.ingested_at || null, }); @@ -1905,7 +1919,8 @@ function normalizeDepositStatusForUi({ config, depositStatus }) { function normalizeLiquidityDepositForUi({ config, deposit, observedAt }) { const asset = config.assetRegistry.get(deposit?.asset_id); const status = deposit?.status || null; - const timestamp = observedAt || null; + const bridgeCreatedAt = bridgeDepositObservedAt(deposit); + const timestamp = bridgeCreatedAt || observedAt || null; return { funding_observation_id: null, @@ -1923,6 +1938,7 @@ function normalizeLiquidityDepositForUi({ config, deposit, observedAt }) { first_seen_at: timestamp, last_seen_at: timestamp, credited_at: CREDITED_FUNDING_STATUSES.has(String(status || '').toUpperCase()) ? timestamp : null, + bridge_created_at: bridgeCreatedAt, }; } diff --git a/src/lib/postgres.mjs b/src/lib/postgres.mjs index 386cd1e..8efa33f 100644 --- a/src/lib/postgres.mjs +++ b/src/lib/postgres.mjs @@ -1143,7 +1143,7 @@ export async function loadCurrentFundingObservations(pool) { export async function loadRecentDepositStatuses(pool, { limit = 20 } = {}) { const normalizedLimit = Math.max(1, Number(limit) || 20); - const fetchLimit = Math.max(normalizedLimit * 4, normalizedLimit); + const fetchLimit = Math.max(normalizedLimit * 50, 500); const result = await pool.query( ` SELECT observed_at, ingested_at, payload @@ -1155,21 +1155,12 @@ export async function loadRecentDepositStatuses(pool, { limit = 20 } = {}) { [fetchLimit], ); - const seen = new Set(); - const uniqueRows = []; - for (const row of result.rows) { - const key = buildDepositStatusRowKey(row.payload); - if (seen.has(key)) continue; - seen.add(key); - uniqueRows.push(row); - if (uniqueRows.length >= normalizedLimit) break; - } - - return uniqueRows.map((row) => ({ - observed_at: toIsoTimestamp(row.observed_at), - ingested_at: toIsoTimestamp(row.ingested_at), - payload: row.payload, - })); + return normalizeDepositStatusRows(result.rows) + .sort((left, right) => ( + timestampValue(right.observed_at || right.ingested_at) + - timestampValue(left.observed_at || left.ingested_at) + )) + .slice(0, normalizedLimit); } export async function loadRecentAlertTransitions(pool, { limit = 20 } = {}) { @@ -1302,30 +1293,17 @@ async function loadExternalAssetFlowsSince(pool, { async function loadCreditedDepositRowsSince(pool, since) { const result = await pool.query( ` - SELECT DISTINCT ON ( - payload->'details'->>'tx_hash', - payload->>'chain', - payload->>'asset_id', - payload->'details'->>'amount' - ) - observed_at, - ingested_at, - payload + SELECT observed_at, ingested_at, payload FROM liquidity_actions - WHERE ingested_at > $1 - AND payload->>'action_type' = 'deposit_status_observed' - AND UPPER(payload->>'status') = ANY($2) + WHERE payload->>'action_type' = 'deposit_status_observed' + AND UPPER(payload->>'status') = ANY($1) AND COALESCE(payload->'details'->>'tx_hash', '') <> '' - ORDER BY - payload->'details'->>'tx_hash', - payload->>'chain', - payload->>'asset_id', - payload->'details'->>'amount', - ingested_at DESC + ORDER BY COALESCE(observed_at, ingested_at) DESC `, - [since, CREDITED_LIQUIDITY_STATUSES], + [CREDITED_LIQUIDITY_STATUSES], ); - return result.rows; + return normalizeDepositStatusRows(result.rows) + .filter((row) => timestampValue(row.observed_at || row.ingested_at) > timestampValue(since)); } async function loadCompletedWithdrawalRowsSince(pool, since) { @@ -1369,7 +1347,7 @@ async function normalizeExternalFlowRow(pool, { if (!assetId) return null; const amount = String(details.amount || '0'); - const effectiveAt = toIsoTimestamp(row.observed_at || row.ingested_at); + const effectiveAt = toIsoTimestamp(details.created_at || row.observed_at || row.ingested_at); const signedUnits = (sign * BigInt(amount)).toString(); let referencePriceAtFlowTime = null; const effectiveBtcAssets = normalizeBtcAssets({ btcAsset, btcAssets }); @@ -1751,6 +1729,68 @@ function buildDepositStatusRowKey(payload) { ].join('|'); } +function normalizeDepositStatusRows(rows = []) { + const byKey = new Map(); + + for (const row of rows) { + const key = buildDepositStatusRowKey(row.payload); + const effectiveAt = depositStatusEffectiveAt(row); + const latestSortAt = timestampValue(row.observed_at || row.ingested_at); + const existing = byKey.get(key); + + if (!existing) { + byKey.set(key, { + latest: row, + latestSortAt, + firstEffectiveAt: effectiveAt, + }); + continue; + } + + if (latestSortAt > existing.latestSortAt) { + existing.latest = row; + existing.latestSortAt = latestSortAt; + } + const candidateEffectiveTs = timestampValue(effectiveAt); + const existingEffectiveTs = timestampValue(existing.firstEffectiveAt); + if ( + Number.isFinite(candidateEffectiveTs) + && (!Number.isFinite(existingEffectiveTs) || candidateEffectiveTs < existingEffectiveTs) + ) { + existing.firstEffectiveAt = effectiveAt; + } + } + + return [...byKey.values()].map(({ latest, firstEffectiveAt }) => { + const effectiveAt = firstEffectiveAt || toIsoTimestamp(latest.observed_at || latest.ingested_at); + return { + observed_at: effectiveAt, + ingested_at: toIsoTimestamp(latest.ingested_at), + payload: withDepositCreatedAt(latest.payload, effectiveAt), + }; + }); +} + +function depositStatusEffectiveAt(row) { + return toIsoTimestamp( + row?.payload?.details?.created_at + || row?.observed_at + || row?.ingested_at, + ); +} + +function withDepositCreatedAt(payload, effectiveAt) { + if (!effectiveAt) return payload; + const details = payload?.details || {}; + return { + ...payload, + details: { + ...details, + created_at: details.created_at || effectiveAt, + }, + }; +} + async function ensureExpressionIndex(pool, { name, table, expression }) { await pool.query(` CREATE INDEX IF NOT EXISTS ${name} diff --git a/test/bridge-assets.test.mjs b/test/bridge-assets.test.mjs index a40a132..0a4d1f0 100644 --- a/test/bridge-assets.test.mjs +++ b/test/bridge-assets.test.mjs @@ -3,6 +3,7 @@ import assert from 'node:assert/strict'; import { bridgeDepositAssetId, + bridgeDepositObservedAt, intentsAssetIdFromNearTokenId, } from '../src/core/bridge-assets.mjs'; @@ -37,3 +38,16 @@ test('bridgeDepositAssetId uses credited bridge near_token_id instead of chain-o fallbackAssetId: LEGACY_BTC, }), NBTC); }); + +test('bridgeDepositObservedAt preserves bridge deposit creation time', () => { + assert.equal(bridgeDepositObservedAt({ + created_at: '2026-04-07T15:20:19.909Z', + }), '2026-04-07T15:20:19.909Z'); + + assert.equal(bridgeDepositObservedAt({ + created_at: 'not-a-date', + observed_at: '2026-04-07T15:20:24.814Z', + }), '2026-04-07T15:20:24.814Z'); + + assert.equal(bridgeDepositObservedAt({}), null); +}); diff --git a/test/history-writer-refresh-policy.test.mjs b/test/history-writer-refresh-policy.test.mjs index 0b0b529..6d95b57 100644 --- a/test/history-writer-refresh-policy.test.mjs +++ b/test/history-writer-refresh-policy.test.mjs @@ -45,3 +45,14 @@ test('history writer derived refresh policy skips events outside request freshne maxEventAgeMs: 30000, }), true); }); + +test('history writer derived refresh policy prefers source observed time over replay ingestion time', () => { + assert.equal(shouldRunDerivedRefreshForEvent({ + event: { + observed_at: '2026-04-07T15:20:19.909Z', + ingested_at: '2026-05-07T14:07:23.119Z', + }, + now: '2026-05-07T14:07:24.000Z', + maxEventAgeMs: 30000, + }), false); +}); diff --git a/test/operator-dashboard.test.mjs b/test/operator-dashboard.test.mjs index 1cd17d7..4411f79 100644 --- a/test/operator-dashboard.test.mjs +++ b/test/operator-dashboard.test.mjs @@ -1399,8 +1399,8 @@ test('funding summary includes credited bridge deposits without observer-backed fundingObservations: [], recentDepositStatuses: [ { - observed_at: '2026-04-07T15:20:00.000Z', - ingested_at: '2026-04-07T15:20:01.000Z', + observed_at: null, + ingested_at: '2026-05-07T14:07:23.107Z', payload: { action_type: 'deposit_status_observed', chain: config.tradingEure.chain, @@ -1442,10 +1442,11 @@ test('funding summary includes credited bridge deposits without observer-backed amount: '24999999800000000000', address: '0xdeposit', status: 'COMPLETED', + created_at: '2026-04-07T15:20:19.909Z', }, }, tracked_withdrawals: {}, - last_refresh_at: '2026-04-07T15:20:10.000Z', + last_refresh_at: '2026-05-07T14:07:23.107Z', }, }, { @@ -1500,10 +1501,11 @@ test('funding summary includes credited bridge deposits without observer-backed ], }); - assert.equal(bootstrap.funds.funding.latest_observed_at, '2026-04-07T15:20:00.000Z'); + assert.equal(bootstrap.funds.funding.latest_observed_at, '2026-04-07T15:20:19.909Z'); assert.equal(bootstrap.funds.funding.credited_deposits[0].asset_id, config.tradingEure.assetId); assert.equal(bootstrap.funds.funding.credited_deposits[0].amount, '24.9999998'); assert.equal(bootstrap.funds.funding.recent_observations[0].tx_hash, 'eth-tx-1'); + assert.equal(bootstrap.funds.funding.recent_observations[0].last_seen_at, '2026-04-07T15:20:19.909Z'); assert.equal(bootstrap.funds.recent_deposits[0].tx_hash, 'eth-tx-1'); }); diff --git a/test/postgres-funding.test.mjs b/test/postgres-funding.test.mjs new file mode 100644 index 0000000..45100b6 --- /dev/null +++ b/test/postgres-funding.test.mjs @@ -0,0 +1,84 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { loadRecentDepositStatuses } from '../src/lib/postgres.mjs'; + +test('recent deposit status loader keeps old bridge deposits at original effective time', async () => { + const txHash = '0xe8755a069e10a5963953c847f48f2db857c7a2dfdf8d21579c4057366126dd3f'; + const rows = [ + depositRow({ + txHash, + status: 'COMPLETED', + ingestedAt: '2026-05-07T14:07:23.107Z', + }), + depositRow({ + txHash, + status: 'COMPLETED', + ingestedAt: '2026-04-07T15:20:54.757Z', + }), + depositRow({ + txHash, + status: 'PENDING', + ingestedAt: '2026-04-07T15:20:24.814Z', + }), + ]; + const pool = { + async query(sql, params) { + assert.match(sql, /deposit_status_observed/); + assert.equal(params[0], 500); + return { rows }; + }, + }; + + const [deposit] = await loadRecentDepositStatuses(pool, { limit: 10 }); + + assert.equal(deposit.payload.status, 'COMPLETED'); + assert.equal(deposit.ingested_at, '2026-05-07T14:07:23.107Z'); + assert.equal(deposit.observed_at, '2026-04-07T15:20:24.814Z'); + assert.equal(deposit.payload.details.created_at, '2026-04-07T15:20:24.814Z'); +}); + +test('recent deposit status loader uses bridge created_at when present', async () => { + const rows = [ + depositRow({ + txHash: 'eth-tx-new', + status: 'COMPLETED', + ingestedAt: '2026-05-07T14:07:23.107Z', + createdAt: '2026-04-08T15:57:14.877Z', + }), + ]; + const pool = { + async query() { + return { rows }; + }, + }; + + const [deposit] = await loadRecentDepositStatuses(pool, { limit: 10 }); + + assert.equal(deposit.observed_at, '2026-04-08T15:57:14.877Z'); + assert.equal(deposit.payload.details.created_at, '2026-04-08T15:57:14.877Z'); +}); + +function depositRow({ + txHash, + status, + ingestedAt, + createdAt = null, +}) { + return { + observed_at: null, + ingested_at: ingestedAt, + payload: { + action_type: 'deposit_status_observed', + chain: 'eth:100', + asset_id: 'nep141:eure.omft.near', + status, + details: { + tx_hash: txHash, + address: '0xdeposit', + amount: '24999999800000000000', + ...(createdAt ? { created_at: createdAt } : {}), + }, + }, + }; +}