diff --git a/src/apps/history-writer.mjs b/src/apps/history-writer.mjs index 22a8d45..e886efc 100644 --- a/src/apps/history-writer.mjs +++ b/src/apps/history-writer.mjs @@ -39,7 +39,7 @@ const topics = [ ]; for (const topic of topics) { - await consumer.subscribe({ topic, fromBeginning: true }); + await consumer.subscribe({ topic, fromBeginning: false }); } const state = { diff --git a/src/apps/liquidity-manager.mjs b/src/apps/liquidity-manager.mjs index 4a0f84d..98ac2c7 100644 --- a/src/apps/liquidity-manager.mjs +++ b/src/apps/liquidity-manager.mjs @@ -4,6 +4,7 @@ import { createProducer } from '../bus/kafka/producer.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { buildEventEnvelope } from '../core/event-envelope.mjs'; import { createJsonStateStore } from '../core/json-state-store.mjs'; +import { normalizeLiquidityState } from '../core/liquidity-state.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; import { assertLiquidityActionEvent } from '../core/schemas.mjs'; import { loadConfig } from '../lib/config.mjs'; @@ -55,7 +56,9 @@ const assetsByChain = new Map([ ]); async function refresh() { - const state = store.getState(); + const state = normalizeLiquidityState(store.getState(), { + withdrawalsFrozen: config.withdrawalsFrozen, + }); if (state.paused) return; try { @@ -212,6 +215,9 @@ const controlApi = startControlApi({ path: '/pause', handler: () => { const state = store.getState(); + normalizeLiquidityState(state, { + withdrawalsFrozen: config.withdrawalsFrozen, + }); state.paused = true; store.setState(state); return { ok: true, paused: true }; @@ -222,6 +228,9 @@ const controlApi = startControlApi({ path: '/resume', handler: async () => { const state = store.getState(); + normalizeLiquidityState(state, { + withdrawalsFrozen: config.withdrawalsFrozen, + }); state.paused = false; store.setState(state); await refresh(); @@ -233,6 +242,9 @@ const controlApi = startControlApi({ path: '/freeze-withdrawals', handler: async ({ body }) => { const state = store.getState(); + normalizeLiquidityState(state, { + withdrawalsFrozen: config.withdrawalsFrozen, + }); state.withdrawals_frozen = body.frozen !== false; store.setState(state); await publishAction({ @@ -260,6 +272,9 @@ const controlApi = startControlApi({ } const state = store.getState(); + normalizeLiquidityState(state, { + withdrawalsFrozen: config.withdrawalsFrozen, + }); state.tracked_withdrawals[body.withdrawal_hash] = { withdrawal_hash: body.withdrawal_hash, asset_id: body.asset_id, diff --git a/src/core/executor-state-store.mjs b/src/core/executor-state-store.mjs index 726126b..851143b 100644 --- a/src/core/executor-state-store.mjs +++ b/src/core/executor-state-store.mjs @@ -4,7 +4,7 @@ const INITIAL_STATE = { commands: {}, }; -export function createExecutorStateStore({ stateDir, fileName = 'commands.json' }) { +export function createExecutorStateStore({ stateDir, fileName = 'trade-executor-commands.json' }) { const store = createJsonStateStore({ stateDir, fileName, diff --git a/src/core/liquidity-state.mjs b/src/core/liquidity-state.mjs new file mode 100644 index 0000000..8d0f8bf --- /dev/null +++ b/src/core/liquidity-state.mjs @@ -0,0 +1,10 @@ +export function normalizeLiquidityState(state, { withdrawalsFrozen }) { + state.deposit_addresses ||= {}; + state.deposits ||= {}; + state.tracked_withdrawals ||= {}; + state.supported_tokens ||= {}; + state.publish_count ||= 0; + state.withdrawals_frozen ??= withdrawalsFrozen; + state.paused ??= false; + return state; +} diff --git a/src/core/schemas.mjs b/src/core/schemas.mjs index 734f5bd..95204c1 100644 --- a/src/core/schemas.mjs +++ b/src/core/schemas.mjs @@ -94,14 +94,14 @@ export function assertExecuteTradeCommand(event) { const payload = event.payload; requireString(payload.command_id, 'payload.command_id'); - requireString(payload.decision_id, 'payload.decision_id'); requireString(payload.idempotency_key, 'payload.idempotency_key'); requireString(payload.execution_key, 'payload.execution_key'); requireString(payload.quote_id, 'payload.quote_id'); requireString(payload.asset_in, 'payload.asset_in'); requireString(payload.asset_out, 'payload.asset_out'); - requireString(payload.request_kind, 'payload.request_kind'); - requireObject(payload.quote_output, 'payload.quote_output'); + if (payload.decision_id != null) requireString(payload.decision_id, 'payload.decision_id'); + if (payload.request_kind != null) requireString(payload.request_kind, 'payload.request_kind'); + if (payload.quote_output != null) requireObject(payload.quote_output, 'payload.quote_output'); if (payload.amount_in != null) requireString(payload.amount_in, 'payload.amount_in'); if (payload.amount_out != null) requireString(payload.amount_out, 'payload.amount_out'); return event; @@ -113,11 +113,11 @@ export function assertTradeResult(event) { const payload = event.payload; requireString(payload.command_id, 'payload.command_id'); - requireString(payload.decision_id, 'payload.decision_id'); requireString(payload.idempotency_key, 'payload.idempotency_key'); requireString(payload.execution_key, 'payload.execution_key'); requireString(payload.quote_id, 'payload.quote_id'); requireString(payload.status, 'payload.status'); + if (payload.decision_id != null) requireString(payload.decision_id, 'payload.decision_id'); if (payload.result_code != null) requireString(payload.result_code, 'payload.result_code'); return event; } diff --git a/test/legacy-schema-compat.test.mjs b/test/legacy-schema-compat.test.mjs new file mode 100644 index 0000000..b92d7e9 --- /dev/null +++ b/test/legacy-schema-compat.test.mjs @@ -0,0 +1,54 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { routeHistoryRecord } from '../src/core/history-records.mjs'; + +test('history routing accepts legacy execute_trade records without decision metadata', () => { + const routed = routeHistoryRecord({ + topic: 'cmd.execute_trade', + event: { + event_id: 'cmd-legacy-1', + event_type: 'execute_trade', + venue: 'near-intents', + schema_version: 1, + ingested_at: '2026-04-02T08:00:00.000Z', + payload: { + command_id: 'cmd-legacy-1', + idempotency_key: 'near-intents:legacy-1', + execution_key: 'near-intents:btc->eure', + quote_id: 'legacy-1', + asset_in: 'nep141:btc.omft.near', + asset_out: 'nep141:eure.omft.near', + amount_in: '100', + amount_out: '200', + }, + }, + }); + + assert.equal(routed.table, 'execute_trade_commands'); + assert.equal(routed.record.decision_key, 'cmd-legacy-1'); +}); + +test('history routing accepts legacy trade_result records without decision metadata', () => { + const routed = routeHistoryRecord({ + topic: 'exec.trade_result', + event: { + event_id: 'result-legacy-1', + event_type: 'trade_result', + venue: 'near-intents', + schema_version: 1, + ingested_at: '2026-04-02T08:00:00.000Z', + payload: { + command_id: 'cmd-legacy-1', + idempotency_key: 'near-intents:legacy-1', + execution_key: 'near-intents:btc->eure', + quote_id: 'legacy-1', + status: 'simulated_sent', + result_code: 'sent', + }, + }, + }); + + assert.equal(routed.table, 'trade_execution_results'); + assert.equal(routed.record.decision_key, 'cmd-legacy-1'); +}); diff --git a/test/liquidity-state.test.mjs b/test/liquidity-state.test.mjs new file mode 100644 index 0000000..8de75d7 --- /dev/null +++ b/test/liquidity-state.test.mjs @@ -0,0 +1,21 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { normalizeLiquidityState } from '../src/core/liquidity-state.mjs'; + +test('normalizeLiquidityState hydrates missing nested maps from persisted partial state', () => { + const state = normalizeLiquidityState( + { + last_refresh_at: null, + publish_count: 0, + }, + { withdrawalsFrozen: true }, + ); + + assert.deepEqual(state.deposit_addresses, {}); + assert.deepEqual(state.deposits, {}); + assert.deepEqual(state.tracked_withdrawals, {}); + assert.deepEqual(state.supported_tokens, {}); + assert.equal(state.withdrawals_frozen, true); + assert.equal(state.paused, false); +});