Use bridge deposit time for funding activity
Some checks failed
deploy / deploy (push) Failing after 34s

Proof: npm test (147 passing); npm run operator-dashboard:build; git diff --cached --check.

Assumptions: Bridge recent_deposits created_at is the authoritative source time for deposit activity; rows without created_at must be deduped to their earliest observed status instead of the latest replay ingestion.

Still fake: No fund movement or bridge migration was performed; ntfy messages already sent before this fix cannot be unsent.
This commit is contained in:
philipp 2026-05-07 16:47:52 +02:00
parent d24ac8ee59
commit 4adc705a2b
11 changed files with 252 additions and 48 deletions

View file

@ -205,7 +205,9 @@ await consumer.run({
state.last_environment_status_duplicate_at = handledAt; state.last_environment_status_duplicate_at = handledAt;
} }
} }
await publishLiquidityNotification({ topic, event }); if (shouldRunDerivedRefresh) {
await publishLiquidityNotification({ topic, event });
}
if (shouldRunDerivedRefresh && portfolioMetricTopics.has(topic)) { if (shouldRunDerivedRefresh && portfolioMetricTopics.has(topic)) {
try { try {
await refreshPortfolioMetrics(); await refreshPortfolioMetrics();

View file

@ -4,6 +4,7 @@ import { createConsumer } from '../bus/kafka/consumer.mjs';
import { createProducer } from '../bus/kafka/producer.mjs'; import { createProducer } from '../bus/kafka/producer.mjs';
import { import {
bridgeDepositAssetId, bridgeDepositAssetId,
bridgeDepositObservedAt,
uniqueChainsForAssets, uniqueChainsForAssets,
} from '../core/bridge-assets.mjs'; } from '../core/bridge-assets.mjs';
import { startControlApi } from '../core/control-api.mjs'; import { startControlApi } from '../core/control-api.mjs';
@ -148,9 +149,11 @@ async function refresh() {
address: deposit.address, address: deposit.address,
status: deposit.status, status: deposit.status,
decimals: deposit.decimals, decimals: deposit.decimals,
created_at: bridgeDepositObservedAt(deposit),
near_token_id: deposit.near_token_id || null, near_token_id: deposit.near_token_id || null,
intents_token_id: deposit.intents_token_id || null, intents_token_id: deposit.intents_token_id || null,
defuse_asset_identifier: deposit.defuse_asset_identifier || null, defuse_asset_identifier: deposit.defuse_asset_identifier || null,
mint_tx_hash: deposit.mint_tx_hash || null,
}); });
} }
} }

View file

@ -4,6 +4,7 @@ import { createProducer } from '../bus/kafka/producer.mjs';
import { import {
assetsByChain as groupAssetsByChain, assetsByChain as groupAssetsByChain,
bridgeDepositAssetId, bridgeDepositAssetId,
bridgeDepositObservedAt,
uniqueChainsForAssets, uniqueChainsForAssets,
} from '../core/bridge-assets.mjs'; } from '../core/bridge-assets.mjs';
import { startControlApi } from '../core/control-api.mjs'; import { startControlApi } from '../core/control-api.mjs';
@ -180,9 +181,11 @@ async function refreshChain(chain, state) {
address: deposit.address, address: deposit.address,
status: deposit.status, status: deposit.status,
decimals: deposit.decimals, decimals: deposit.decimals,
created_at: bridgeDepositObservedAt(deposit),
near_token_id: deposit.near_token_id || null, near_token_id: deposit.near_token_id || null,
intents_token_id: deposit.intents_token_id || null, intents_token_id: deposit.intents_token_id || null,
defuse_asset_identifier: deposit.defuse_asset_identifier || null, defuse_asset_identifier: deposit.defuse_asset_identifier || null,
mint_tx_hash: deposit.mint_tx_hash || null,
}; };
const previous = state.deposits[key]; const previous = state.deposits[key];
state.deposits[key] = normalized; state.deposits[key] = normalized;
@ -193,7 +196,7 @@ async function refreshChain(chain, state) {
chain, chain,
asset_id: assetId, asset_id: assetId,
details: normalized, details: normalized,
}, state); }, state, { observedAt: normalized.created_at });
} }
} }
@ -491,11 +494,18 @@ async function submitWithdrawal({ assetId, amount, destinationAddress, chain = n
}; };
} }
async function publishAction(payload, state) { async function publishAction(payload, state, { observedAt = null } = {}) {
const event = buildEventEnvelope({ const event = buildEventEnvelope({
source: 'liquidity-manager', source: 'liquidity-manager',
venue: 'near-intents', venue: 'near-intents',
eventType: 'liquidity_action', eventType: 'liquidity_action',
observedAt:
observedAt
|| payload.observed_at
|| payload.details?.created_at
|| payload.details?.last_checked_at
|| payload.details?.submitted_at
|| null,
payload: { payload: {
liquidity_action_id: `${payload.action_type}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`, liquidity_action_id: `${payload.action_type}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`,
account_id: config.nearIntentsAccountId, account_id: config.nearIntentsAccountId,

View file

@ -23,6 +23,22 @@ export function bridgeDepositAssetId(deposit, {
return fallbackAssetId || candidates[0] || null; return fallbackAssetId || candidates[0] || null;
} }
export function bridgeDepositObservedAt(deposit) {
const candidates = [
deposit?.created_at,
deposit?.updated_at,
deposit?.observed_at,
deposit?.timestamp,
];
for (const candidate of candidates) {
const timestamp = toIsoTimestamp(candidate);
if (timestamp) return timestamp;
}
return null;
}
export function uniqueChainsForAssets(assets = []) { export function uniqueChainsForAssets(assets = []) {
return [...new Set( return [...new Set(
assets assets
@ -41,3 +57,9 @@ export function assetsByChain(assets = []) {
} }
return byChain; return byChain;
} }
function toIsoTimestamp(value) {
if (!value) return null;
const date = new Date(value);
return Number.isNaN(date.getTime()) ? null : date.toISOString();
}

View file

@ -6,7 +6,7 @@ export function shouldRunDerivedRefreshForEvent({
const maxAge = Number(maxEventAgeMs); const maxAge = Number(maxEventAgeMs);
if (!Number.isFinite(maxAge) || maxAge < 0) return true; if (!Number.isFinite(maxAge) || maxAge < 0) return true;
const eventAt = event?.ingested_at || event?.observed_at || null; const eventAt = event?.observed_at || event?.ingested_at || null;
if (!eventAt) return true; if (!eventAt) return true;
const age = Date.parse(now) - Date.parse(eventAt); const age = Date.parse(now) - Date.parse(eventAt);

View file

@ -1,4 +1,5 @@
import { unitsToNumber } from './assets.mjs'; import { unitsToNumber } from './assets.mjs';
import { bridgeDepositObservedAt } from './bridge-assets.mjs';
import { summarizeFundingObservations } from './funding-observations.mjs'; import { summarizeFundingObservations } from './funding-observations.mjs';
import { resolveDashboardRequestAuth } from './operator-dashboard-auth.mjs'; import { resolveDashboardRequestAuth } from './operator-dashboard-auth.mjs';
import { TERMINAL_SETTLEMENT_ATTRIBUTION_STATUSES } from './quote-outcomes.mjs'; import { TERMINAL_SETTLEMENT_ATTRIBUTION_STATUSES } from './quote-outcomes.mjs';
@ -1816,10 +1817,11 @@ function buildRecentDepositItems({ config, recentDepositStatuses, liquidityState
const fallbackItem = normalizeLiquidityDepositForUi({ const fallbackItem = normalizeLiquidityDepositForUi({
config, config,
deposit, deposit,
observedAt: liquidityState?.last_refresh_at || null, observedAt: null,
}); });
const key = buildFundingActivityKey(fallbackItem); const key = buildFundingActivityKey(fallbackItem);
if (!itemsByKey.has(key)) { const existing = itemsByKey.get(key);
if (!existing || shouldReplaceFundingActivityItem(existing, fallbackItem)) {
itemsByKey.set(key, fallbackItem); itemsByKey.set(key, fallbackItem);
} }
} }
@ -1854,6 +1856,17 @@ function fundingActivityTimestamp(item) {
return item?.last_seen_at || item?.credited_at || item?.first_seen_at || null; return item?.last_seen_at || item?.credited_at || item?.first_seen_at || null;
} }
function shouldReplaceFundingActivityItem(existing, candidate) {
if (!candidate) return false;
if (!existing) return true;
if (candidate.bridge_created_at && !existing.bridge_created_at) return true;
const existingTs = timestampValue(fundingActivityTimestamp(existing));
const candidateTs = timestampValue(fundingActivityTimestamp(candidate));
return Number.isFinite(candidateTs)
&& (!Number.isFinite(existingTs) || candidateTs < existingTs);
}
function buildFundingActivityKey(item) { function buildFundingActivityKey(item) {
return [ return [
item?.tx_hash || 'no-tx', item?.tx_hash || 'no-tx',
@ -1897,6 +1910,7 @@ function normalizeDepositStatusForUi({ config, depositStatus }) {
amount: details.amount || '0', amount: details.amount || '0',
address: details.address || details.deposit_address || null, address: details.address || details.deposit_address || null,
status: payload.status || details.status || null, status: payload.status || details.status || null,
created_at: details.created_at || payload.created_at || null,
}, },
observedAt: depositStatus?.observed_at || depositStatus?.ingested_at || null, observedAt: depositStatus?.observed_at || depositStatus?.ingested_at || null,
}); });
@ -1905,7 +1919,8 @@ function normalizeDepositStatusForUi({ config, depositStatus }) {
function normalizeLiquidityDepositForUi({ config, deposit, observedAt }) { function normalizeLiquidityDepositForUi({ config, deposit, observedAt }) {
const asset = config.assetRegistry.get(deposit?.asset_id); const asset = config.assetRegistry.get(deposit?.asset_id);
const status = deposit?.status || null; const status = deposit?.status || null;
const timestamp = observedAt || null; const bridgeCreatedAt = bridgeDepositObservedAt(deposit);
const timestamp = bridgeCreatedAt || observedAt || null;
return { return {
funding_observation_id: null, funding_observation_id: null,
@ -1923,6 +1938,7 @@ function normalizeLiquidityDepositForUi({ config, deposit, observedAt }) {
first_seen_at: timestamp, first_seen_at: timestamp,
last_seen_at: timestamp, last_seen_at: timestamp,
credited_at: CREDITED_FUNDING_STATUSES.has(String(status || '').toUpperCase()) ? timestamp : null, credited_at: CREDITED_FUNDING_STATUSES.has(String(status || '').toUpperCase()) ? timestamp : null,
bridge_created_at: bridgeCreatedAt,
}; };
} }

View file

@ -1143,7 +1143,7 @@ export async function loadCurrentFundingObservations(pool) {
export async function loadRecentDepositStatuses(pool, { limit = 20 } = {}) { export async function loadRecentDepositStatuses(pool, { limit = 20 } = {}) {
const normalizedLimit = Math.max(1, Number(limit) || 20); const normalizedLimit = Math.max(1, Number(limit) || 20);
const fetchLimit = Math.max(normalizedLimit * 4, normalizedLimit); const fetchLimit = Math.max(normalizedLimit * 50, 500);
const result = await pool.query( const result = await pool.query(
` `
SELECT observed_at, ingested_at, payload SELECT observed_at, ingested_at, payload
@ -1155,21 +1155,12 @@ export async function loadRecentDepositStatuses(pool, { limit = 20 } = {}) {
[fetchLimit], [fetchLimit],
); );
const seen = new Set(); return normalizeDepositStatusRows(result.rows)
const uniqueRows = []; .sort((left, right) => (
for (const row of result.rows) { timestampValue(right.observed_at || right.ingested_at)
const key = buildDepositStatusRowKey(row.payload); - timestampValue(left.observed_at || left.ingested_at)
if (seen.has(key)) continue; ))
seen.add(key); .slice(0, normalizedLimit);
uniqueRows.push(row);
if (uniqueRows.length >= normalizedLimit) break;
}
return uniqueRows.map((row) => ({
observed_at: toIsoTimestamp(row.observed_at),
ingested_at: toIsoTimestamp(row.ingested_at),
payload: row.payload,
}));
} }
export async function loadRecentAlertTransitions(pool, { limit = 20 } = {}) { export async function loadRecentAlertTransitions(pool, { limit = 20 } = {}) {
@ -1302,30 +1293,17 @@ async function loadExternalAssetFlowsSince(pool, {
async function loadCreditedDepositRowsSince(pool, since) { async function loadCreditedDepositRowsSince(pool, since) {
const result = await pool.query( const result = await pool.query(
` `
SELECT DISTINCT ON ( SELECT observed_at, ingested_at, payload
payload->'details'->>'tx_hash',
payload->>'chain',
payload->>'asset_id',
payload->'details'->>'amount'
)
observed_at,
ingested_at,
payload
FROM liquidity_actions FROM liquidity_actions
WHERE ingested_at > $1 WHERE payload->>'action_type' = 'deposit_status_observed'
AND payload->>'action_type' = 'deposit_status_observed' AND UPPER(payload->>'status') = ANY($1)
AND UPPER(payload->>'status') = ANY($2)
AND COALESCE(payload->'details'->>'tx_hash', '') <> '' AND COALESCE(payload->'details'->>'tx_hash', '') <> ''
ORDER BY ORDER BY COALESCE(observed_at, ingested_at) DESC
payload->'details'->>'tx_hash',
payload->>'chain',
payload->>'asset_id',
payload->'details'->>'amount',
ingested_at DESC
`, `,
[since, CREDITED_LIQUIDITY_STATUSES], [CREDITED_LIQUIDITY_STATUSES],
); );
return result.rows; return normalizeDepositStatusRows(result.rows)
.filter((row) => timestampValue(row.observed_at || row.ingested_at) > timestampValue(since));
} }
async function loadCompletedWithdrawalRowsSince(pool, since) { async function loadCompletedWithdrawalRowsSince(pool, since) {
@ -1369,7 +1347,7 @@ async function normalizeExternalFlowRow(pool, {
if (!assetId) return null; if (!assetId) return null;
const amount = String(details.amount || '0'); const amount = String(details.amount || '0');
const effectiveAt = toIsoTimestamp(row.observed_at || row.ingested_at); const effectiveAt = toIsoTimestamp(details.created_at || row.observed_at || row.ingested_at);
const signedUnits = (sign * BigInt(amount)).toString(); const signedUnits = (sign * BigInt(amount)).toString();
let referencePriceAtFlowTime = null; let referencePriceAtFlowTime = null;
const effectiveBtcAssets = normalizeBtcAssets({ btcAsset, btcAssets }); const effectiveBtcAssets = normalizeBtcAssets({ btcAsset, btcAssets });
@ -1751,6 +1729,68 @@ function buildDepositStatusRowKey(payload) {
].join('|'); ].join('|');
} }
function normalizeDepositStatusRows(rows = []) {
const byKey = new Map();
for (const row of rows) {
const key = buildDepositStatusRowKey(row.payload);
const effectiveAt = depositStatusEffectiveAt(row);
const latestSortAt = timestampValue(row.observed_at || row.ingested_at);
const existing = byKey.get(key);
if (!existing) {
byKey.set(key, {
latest: row,
latestSortAt,
firstEffectiveAt: effectiveAt,
});
continue;
}
if (latestSortAt > existing.latestSortAt) {
existing.latest = row;
existing.latestSortAt = latestSortAt;
}
const candidateEffectiveTs = timestampValue(effectiveAt);
const existingEffectiveTs = timestampValue(existing.firstEffectiveAt);
if (
Number.isFinite(candidateEffectiveTs)
&& (!Number.isFinite(existingEffectiveTs) || candidateEffectiveTs < existingEffectiveTs)
) {
existing.firstEffectiveAt = effectiveAt;
}
}
return [...byKey.values()].map(({ latest, firstEffectiveAt }) => {
const effectiveAt = firstEffectiveAt || toIsoTimestamp(latest.observed_at || latest.ingested_at);
return {
observed_at: effectiveAt,
ingested_at: toIsoTimestamp(latest.ingested_at),
payload: withDepositCreatedAt(latest.payload, effectiveAt),
};
});
}
function depositStatusEffectiveAt(row) {
return toIsoTimestamp(
row?.payload?.details?.created_at
|| row?.observed_at
|| row?.ingested_at,
);
}
function withDepositCreatedAt(payload, effectiveAt) {
if (!effectiveAt) return payload;
const details = payload?.details || {};
return {
...payload,
details: {
...details,
created_at: details.created_at || effectiveAt,
},
};
}
async function ensureExpressionIndex(pool, { name, table, expression }) { async function ensureExpressionIndex(pool, { name, table, expression }) {
await pool.query(` await pool.query(`
CREATE INDEX IF NOT EXISTS ${name} CREATE INDEX IF NOT EXISTS ${name}

View file

@ -3,6 +3,7 @@ import assert from 'node:assert/strict';
import { import {
bridgeDepositAssetId, bridgeDepositAssetId,
bridgeDepositObservedAt,
intentsAssetIdFromNearTokenId, intentsAssetIdFromNearTokenId,
} from '../src/core/bridge-assets.mjs'; } from '../src/core/bridge-assets.mjs';
@ -37,3 +38,16 @@ test('bridgeDepositAssetId uses credited bridge near_token_id instead of chain-o
fallbackAssetId: LEGACY_BTC, fallbackAssetId: LEGACY_BTC,
}), NBTC); }), NBTC);
}); });
test('bridgeDepositObservedAt preserves bridge deposit creation time', () => {
assert.equal(bridgeDepositObservedAt({
created_at: '2026-04-07T15:20:19.909Z',
}), '2026-04-07T15:20:19.909Z');
assert.equal(bridgeDepositObservedAt({
created_at: 'not-a-date',
observed_at: '2026-04-07T15:20:24.814Z',
}), '2026-04-07T15:20:24.814Z');
assert.equal(bridgeDepositObservedAt({}), null);
});

View file

@ -45,3 +45,14 @@ test('history writer derived refresh policy skips events outside request freshne
maxEventAgeMs: 30000, maxEventAgeMs: 30000,
}), true); }), true);
}); });
test('history writer derived refresh policy prefers source observed time over replay ingestion time', () => {
assert.equal(shouldRunDerivedRefreshForEvent({
event: {
observed_at: '2026-04-07T15:20:19.909Z',
ingested_at: '2026-05-07T14:07:23.119Z',
},
now: '2026-05-07T14:07:24.000Z',
maxEventAgeMs: 30000,
}), false);
});

View file

@ -1399,8 +1399,8 @@ test('funding summary includes credited bridge deposits without observer-backed
fundingObservations: [], fundingObservations: [],
recentDepositStatuses: [ recentDepositStatuses: [
{ {
observed_at: '2026-04-07T15:20:00.000Z', observed_at: null,
ingested_at: '2026-04-07T15:20:01.000Z', ingested_at: '2026-05-07T14:07:23.107Z',
payload: { payload: {
action_type: 'deposit_status_observed', action_type: 'deposit_status_observed',
chain: config.tradingEure.chain, chain: config.tradingEure.chain,
@ -1442,10 +1442,11 @@ test('funding summary includes credited bridge deposits without observer-backed
amount: '24999999800000000000', amount: '24999999800000000000',
address: '0xdeposit', address: '0xdeposit',
status: 'COMPLETED', status: 'COMPLETED',
created_at: '2026-04-07T15:20:19.909Z',
}, },
}, },
tracked_withdrawals: {}, tracked_withdrawals: {},
last_refresh_at: '2026-04-07T15:20:10.000Z', last_refresh_at: '2026-05-07T14:07:23.107Z',
}, },
}, },
{ {
@ -1500,10 +1501,11 @@ test('funding summary includes credited bridge deposits without observer-backed
], ],
}); });
assert.equal(bootstrap.funds.funding.latest_observed_at, '2026-04-07T15:20:00.000Z'); assert.equal(bootstrap.funds.funding.latest_observed_at, '2026-04-07T15:20:19.909Z');
assert.equal(bootstrap.funds.funding.credited_deposits[0].asset_id, config.tradingEure.assetId); assert.equal(bootstrap.funds.funding.credited_deposits[0].asset_id, config.tradingEure.assetId);
assert.equal(bootstrap.funds.funding.credited_deposits[0].amount, '24.9999998'); assert.equal(bootstrap.funds.funding.credited_deposits[0].amount, '24.9999998');
assert.equal(bootstrap.funds.funding.recent_observations[0].tx_hash, 'eth-tx-1'); assert.equal(bootstrap.funds.funding.recent_observations[0].tx_hash, 'eth-tx-1');
assert.equal(bootstrap.funds.funding.recent_observations[0].last_seen_at, '2026-04-07T15:20:19.909Z');
assert.equal(bootstrap.funds.recent_deposits[0].tx_hash, 'eth-tx-1'); assert.equal(bootstrap.funds.recent_deposits[0].tx_hash, 'eth-tx-1');
}); });

View file

@ -0,0 +1,84 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { loadRecentDepositStatuses } from '../src/lib/postgres.mjs';
test('recent deposit status loader keeps old bridge deposits at original effective time', async () => {
const txHash = '0xe8755a069e10a5963953c847f48f2db857c7a2dfdf8d21579c4057366126dd3f';
const rows = [
depositRow({
txHash,
status: 'COMPLETED',
ingestedAt: '2026-05-07T14:07:23.107Z',
}),
depositRow({
txHash,
status: 'COMPLETED',
ingestedAt: '2026-04-07T15:20:54.757Z',
}),
depositRow({
txHash,
status: 'PENDING',
ingestedAt: '2026-04-07T15:20:24.814Z',
}),
];
const pool = {
async query(sql, params) {
assert.match(sql, /deposit_status_observed/);
assert.equal(params[0], 500);
return { rows };
},
};
const [deposit] = await loadRecentDepositStatuses(pool, { limit: 10 });
assert.equal(deposit.payload.status, 'COMPLETED');
assert.equal(deposit.ingested_at, '2026-05-07T14:07:23.107Z');
assert.equal(deposit.observed_at, '2026-04-07T15:20:24.814Z');
assert.equal(deposit.payload.details.created_at, '2026-04-07T15:20:24.814Z');
});
test('recent deposit status loader uses bridge created_at when present', async () => {
const rows = [
depositRow({
txHash: 'eth-tx-new',
status: 'COMPLETED',
ingestedAt: '2026-05-07T14:07:23.107Z',
createdAt: '2026-04-08T15:57:14.877Z',
}),
];
const pool = {
async query() {
return { rows };
},
};
const [deposit] = await loadRecentDepositStatuses(pool, { limit: 10 });
assert.equal(deposit.observed_at, '2026-04-08T15:57:14.877Z');
assert.equal(deposit.payload.details.created_at, '2026-04-08T15:57:14.877Z');
});
function depositRow({
txHash,
status,
ingestedAt,
createdAt = null,
}) {
return {
observed_at: null,
ingested_at: ingestedAt,
payload: {
action_type: 'deposit_status_observed',
chain: 'eth:100',
asset_id: 'nep141:eure.omft.near',
status,
details: {
tx_hash: txHash,
address: '0xdeposit',
amount: '24999999800000000000',
...(createdAt ? { created_at: createdAt } : {}),
},
},
};
}