Fix live ops regressions
All checks were successful
deploy / deploy (push) Successful in 43s

Proof: active BTC/EURe tradeable loop on k3s must expose funding state cleanly, preserve durable history, and avoid inheriting dummy executor state.

Assumptions: retained Kafka topics may contain legacy dummy records; those should be tolerated in history ingestion without weakening the current live command shape.

Still fake: internal inventory is still unfunded, strategy and executor remain disarmed, and no live quote response has been submitted yet.
This commit is contained in:
philipp 2026-04-02 10:07:54 +02:00
parent 41b9ec680b
commit d6fc99dc60
7 changed files with 107 additions and 7 deletions

View file

@ -39,7 +39,7 @@ const topics = [
];
for (const topic of topics) {
await consumer.subscribe({ topic, fromBeginning: true });
await consumer.subscribe({ topic, fromBeginning: false });
}
const state = {

View file

@ -4,6 +4,7 @@ import { createProducer } from '../bus/kafka/producer.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { buildEventEnvelope } from '../core/event-envelope.mjs';
import { createJsonStateStore } from '../core/json-state-store.mjs';
import { normalizeLiquidityState } from '../core/liquidity-state.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import { assertLiquidityActionEvent } from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
@ -55,7 +56,9 @@ const assetsByChain = new Map([
]);
async function refresh() {
const state = store.getState();
const state = normalizeLiquidityState(store.getState(), {
withdrawalsFrozen: config.withdrawalsFrozen,
});
if (state.paused) return;
try {
@ -212,6 +215,9 @@ const controlApi = startControlApi({
path: '/pause',
handler: () => {
const state = store.getState();
normalizeLiquidityState(state, {
withdrawalsFrozen: config.withdrawalsFrozen,
});
state.paused = true;
store.setState(state);
return { ok: true, paused: true };
@ -222,6 +228,9 @@ const controlApi = startControlApi({
path: '/resume',
handler: async () => {
const state = store.getState();
normalizeLiquidityState(state, {
withdrawalsFrozen: config.withdrawalsFrozen,
});
state.paused = false;
store.setState(state);
await refresh();
@ -233,6 +242,9 @@ const controlApi = startControlApi({
path: '/freeze-withdrawals',
handler: async ({ body }) => {
const state = store.getState();
normalizeLiquidityState(state, {
withdrawalsFrozen: config.withdrawalsFrozen,
});
state.withdrawals_frozen = body.frozen !== false;
store.setState(state);
await publishAction({
@ -260,6 +272,9 @@ const controlApi = startControlApi({
}
const state = store.getState();
normalizeLiquidityState(state, {
withdrawalsFrozen: config.withdrawalsFrozen,
});
state.tracked_withdrawals[body.withdrawal_hash] = {
withdrawal_hash: body.withdrawal_hash,
asset_id: body.asset_id,

View file

@ -4,7 +4,7 @@ const INITIAL_STATE = {
commands: {},
};
export function createExecutorStateStore({ stateDir, fileName = 'commands.json' }) {
export function createExecutorStateStore({ stateDir, fileName = 'trade-executor-commands.json' }) {
const store = createJsonStateStore({
stateDir,
fileName,

View file

@ -0,0 +1,10 @@
export function normalizeLiquidityState(state, { withdrawalsFrozen }) {
state.deposit_addresses ||= {};
state.deposits ||= {};
state.tracked_withdrawals ||= {};
state.supported_tokens ||= {};
state.publish_count ||= 0;
state.withdrawals_frozen ??= withdrawalsFrozen;
state.paused ??= false;
return state;
}

View file

@ -94,14 +94,14 @@ export function assertExecuteTradeCommand(event) {
const payload = event.payload;
requireString(payload.command_id, 'payload.command_id');
requireString(payload.decision_id, 'payload.decision_id');
requireString(payload.idempotency_key, 'payload.idempotency_key');
requireString(payload.execution_key, 'payload.execution_key');
requireString(payload.quote_id, 'payload.quote_id');
requireString(payload.asset_in, 'payload.asset_in');
requireString(payload.asset_out, 'payload.asset_out');
requireString(payload.request_kind, 'payload.request_kind');
requireObject(payload.quote_output, 'payload.quote_output');
if (payload.decision_id != null) requireString(payload.decision_id, 'payload.decision_id');
if (payload.request_kind != null) requireString(payload.request_kind, 'payload.request_kind');
if (payload.quote_output != null) requireObject(payload.quote_output, 'payload.quote_output');
if (payload.amount_in != null) requireString(payload.amount_in, 'payload.amount_in');
if (payload.amount_out != null) requireString(payload.amount_out, 'payload.amount_out');
return event;
@ -113,11 +113,11 @@ export function assertTradeResult(event) {
const payload = event.payload;
requireString(payload.command_id, 'payload.command_id');
requireString(payload.decision_id, 'payload.decision_id');
requireString(payload.idempotency_key, 'payload.idempotency_key');
requireString(payload.execution_key, 'payload.execution_key');
requireString(payload.quote_id, 'payload.quote_id');
requireString(payload.status, 'payload.status');
if (payload.decision_id != null) requireString(payload.decision_id, 'payload.decision_id');
if (payload.result_code != null) requireString(payload.result_code, 'payload.result_code');
return event;
}

View file

@ -0,0 +1,54 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { routeHistoryRecord } from '../src/core/history-records.mjs';
test('history routing accepts legacy execute_trade records without decision metadata', () => {
const routed = routeHistoryRecord({
topic: 'cmd.execute_trade',
event: {
event_id: 'cmd-legacy-1',
event_type: 'execute_trade',
venue: 'near-intents',
schema_version: 1,
ingested_at: '2026-04-02T08:00:00.000Z',
payload: {
command_id: 'cmd-legacy-1',
idempotency_key: 'near-intents:legacy-1',
execution_key: 'near-intents:btc->eure',
quote_id: 'legacy-1',
asset_in: 'nep141:btc.omft.near',
asset_out: 'nep141:eure.omft.near',
amount_in: '100',
amount_out: '200',
},
},
});
assert.equal(routed.table, 'execute_trade_commands');
assert.equal(routed.record.decision_key, 'cmd-legacy-1');
});
test('history routing accepts legacy trade_result records without decision metadata', () => {
const routed = routeHistoryRecord({
topic: 'exec.trade_result',
event: {
event_id: 'result-legacy-1',
event_type: 'trade_result',
venue: 'near-intents',
schema_version: 1,
ingested_at: '2026-04-02T08:00:00.000Z',
payload: {
command_id: 'cmd-legacy-1',
idempotency_key: 'near-intents:legacy-1',
execution_key: 'near-intents:btc->eure',
quote_id: 'legacy-1',
status: 'simulated_sent',
result_code: 'sent',
},
},
});
assert.equal(routed.table, 'trade_execution_results');
assert.equal(routed.record.decision_key, 'cmd-legacy-1');
});

View file

@ -0,0 +1,21 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { normalizeLiquidityState } from '../src/core/liquidity-state.mjs';
test('normalizeLiquidityState hydrates missing nested maps from persisted partial state', () => {
const state = normalizeLiquidityState(
{
last_refresh_at: null,
publish_count: 0,
},
{ withdrawalsFrozen: true },
);
assert.deepEqual(state.deposit_addresses, {});
assert.deepEqual(state.deposits, {});
assert.deepEqual(state.tracked_withdrawals, {});
assert.deepEqual(state.supported_tokens, {});
assert.equal(state.withdrawals_frozen, true);
assert.equal(state.paused, false);
});