diff --git a/.env.example b/.env.example index 8547df9..de24d66 100644 --- a/.env.example +++ b/.env.example @@ -63,6 +63,7 @@ KAFKA_CONSUMER_GROUP_OPS_SENTINEL=ops-sentinel-v1 POSTGRES_URL=postgresql://unrip:unrip@postgres:5432/unrip # Service state +STRATEGY_STATE_DIR=/var/lib/unrip/strategy-state EXECUTOR_STATE_DIR=/var/lib/unrip/executor-state LIQUIDITY_STATE_DIR=/var/lib/unrip/liquidity-state diff --git a/compose.yml b/compose.yml index 8852ccf..f8b6681 100644 --- a/compose.yml +++ b/compose.yml @@ -116,6 +116,8 @@ services: redpanda: condition: service_healthy restart: unless-stopped + volumes: + - strategy-state:/var/lib/unrip/strategy-state trade-executor: build: . @@ -131,5 +133,6 @@ services: volumes: redpanda-data: postgres-data: + strategy-state: executor-state: liquidity-state: diff --git a/deploy/k8s/base/unrip.yaml b/deploy/k8s/base/unrip.yaml index eebfa14..a02463e 100644 --- a/deploy/k8s/base/unrip.yaml +++ b/deploy/k8s/base/unrip.yaml @@ -57,6 +57,7 @@ data: KAFKA_CONSUMER_GROUP_STRATEGY: strategy-engine-v1 KAFKA_CONSUMER_GROUP_EXECUTOR: trade-executor-v1 KAFKA_CONSUMER_GROUP_OPS_SENTINEL: ops-sentinel-v1 + STRATEGY_STATE_DIR: /var/lib/unrip/strategy-state EXECUTOR_STATE_DIR: /var/lib/unrip/executor-state LIQUIDITY_STATE_DIR: /var/lib/unrip/liquidity-state MARKET_REFERENCE_REFRESH_MS: "5000" @@ -85,6 +86,17 @@ data: --- apiVersion: v1 kind: PersistentVolumeClaim +metadata: + name: strategy-state + namespace: unrip +spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 5Gi +--- +apiVersion: v1 +kind: PersistentVolumeClaim metadata: name: executor-state namespace: unrip @@ -335,6 +347,13 @@ spec: name: unrip-config - secretRef: name: unrip-secrets + volumeMounts: + - name: strategy-state + mountPath: /var/lib/unrip/strategy-state + volumes: + - name: strategy-state + persistentVolumeClaim: + claimName: strategy-state --- apiVersion: apps/v1 kind: Deployment diff --git a/src/apps/strategy-engine.mjs b/src/apps/strategy-engine.mjs index 90b6e8c..a900e3b 100644 --- a/src/apps/strategy-engine.mjs +++ b/src/apps/strategy-engine.mjs @@ -2,6 +2,7 @@ import process from 'node:process'; import { createConsumer } from '../bus/kafka/consumer.mjs'; import { createProducer } from '../bus/kafka/producer.mjs'; +import { createArmedStateStore } from '../core/armed-state-store.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; @@ -28,13 +29,18 @@ const producer = await createProducer({ clientId: config.kafkaClientId, logger, }); +const armedStateStore = createArmedStateStore({ + stateDir: config.strategyStateDir, + fileName: 'strategy-engine-control.json', + initialArmed: config.strategyInitialArmed, +}); await consumer.subscribe({ topic: config.kafkaTopicNormSwapDemand, fromBeginning: false }); await consumer.subscribe({ topic: config.kafkaTopicRefMarketPrice, fromBeginning: false }); await consumer.subscribe({ topic: config.kafkaTopicStateIntentInventory, fromBeginning: false }); const state = { - armed: config.strategyInitialArmed, + armed: armedStateStore.isArmed(), paused: false, threshold_pct: config.strategyGrossThresholdPct, max_notional_eure: config.strategyMaxNotionalEure, @@ -147,7 +153,10 @@ const controlApi = startControlApi({ namespace: config.projectNamespace, stateProvider: { getState() { - return state; + return { + ...state, + durable_control_state: armedStateStore.getState(), + }; }, }, routes: [ @@ -155,7 +164,7 @@ const controlApi = startControlApi({ method: 'POST', path: '/arm', handler: () => { - state.armed = true; + state.armed = armedStateStore.setArmed(true).armed; logger.warn('strategy_armed', { pair: config.activePair }); return { ok: true, armed: true }; }, @@ -164,7 +173,7 @@ const controlApi = startControlApi({ method: 'POST', path: '/disarm', handler: () => { - state.armed = false; + state.armed = armedStateStore.setArmed(false).armed; logger.warn('strategy_disarmed', { pair: config.activePair }); return { ok: true, armed: false }; }, diff --git a/src/apps/trade-executor.mjs b/src/apps/trade-executor.mjs index 9f4c2f2..2fff22c 100644 --- a/src/apps/trade-executor.mjs +++ b/src/apps/trade-executor.mjs @@ -2,6 +2,7 @@ import process from 'node:process'; import { createConsumer } from '../bus/kafka/consumer.mjs'; import { createProducer } from '../bus/kafka/producer.mjs'; +import { createArmedStateStore } from '../core/armed-state-store.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs'; import { createExecutorStateStore } from '../core/executor-state-store.mjs'; @@ -61,9 +62,14 @@ const relayClient = await startSolverRelayWs({ }, }); const stateStore = createExecutorStateStore({ stateDir: config.executorStateDir }); +const armedStateStore = createArmedStateStore({ + stateDir: config.executorStateDir, + fileName: 'trade-executor-control.json', + initialArmed: config.executorInitialArmed, +}); const state = { - armed: config.executorInitialArmed, + armed: armedStateStore.isArmed(), paused: false, draining: false, last_command: null, @@ -210,6 +216,7 @@ const controlApi = startControlApi({ signer_public_key: signer.getPublicKey().toString(), signer_registered: signerRegistered, ...state, + durable_control_state: armedStateStore.getState(), durable_state: stateStore.getState(), }; }, @@ -219,7 +226,7 @@ const controlApi = startControlApi({ method: 'POST', path: '/arm', handler: () => { - state.armed = true; + state.armed = armedStateStore.setArmed(true).armed; return { ok: true, armed: true }; }, }, @@ -227,7 +234,7 @@ const controlApi = startControlApi({ method: 'POST', path: '/disarm', handler: () => { - state.armed = false; + state.armed = armedStateStore.setArmed(false).armed; return { ok: true, armed: false }; }, }, diff --git a/src/core/armed-state-store.mjs b/src/core/armed-state-store.mjs new file mode 100644 index 0000000..62c009d --- /dev/null +++ b/src/core/armed-state-store.mjs @@ -0,0 +1,34 @@ +import { createJsonStateStore } from './json-state-store.mjs'; + +export function createArmedStateStore({ + stateDir, + fileName = 'armed-state.json', + initialArmed = false, +}) { + const store = createJsonStateStore({ + stateDir, + fileName, + initialState: { + armed: Boolean(initialArmed), + updated_at: null, + }, + }); + + return { + getState() { + return store.getState(); + }, + isArmed() { + return Boolean(store.getState().armed); + }, + setArmed(armed) { + const nextArmed = Boolean(armed); + const nextState = store.update((state) => { + state.armed = nextArmed; + state.updated_at = new Date().toISOString(); + return state; + }); + return nextState; + }, + }; +} diff --git a/src/lib/config.mjs b/src/lib/config.mjs index aa8b84a..6f79d0d 100644 --- a/src/lib/config.mjs +++ b/src/lib/config.mjs @@ -36,6 +36,7 @@ const DEFAULTS = { kafkaConsumerGroupStrategy: 'strategy-engine-v1', kafkaConsumerGroupExecutor: 'trade-executor-v1', kafkaConsumerGroupOpsSentinel: 'ops-sentinel-v1', + strategyStateDir: './var/strategy-state', executorStateDir: './var/executor-state', liquidityStateDir: './var/liquidity-state', postgresUrl: 'postgresql://unrip:unrip@127.0.0.1:5432/unrip', @@ -232,6 +233,7 @@ export function loadConfig({ envPath = '.env' } = {}) { process.env.KAFKA_CONSUMER_GROUP_EXECUTOR || DEFAULTS.kafkaConsumerGroupExecutor, kafkaConsumerGroupOpsSentinel: process.env.KAFKA_CONSUMER_GROUP_OPS_SENTINEL || DEFAULTS.kafkaConsumerGroupOpsSentinel, + strategyStateDir: process.env.STRATEGY_STATE_DIR || DEFAULTS.strategyStateDir, executorStateDir: process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir, liquidityStateDir: process.env.LIQUIDITY_STATE_DIR || DEFAULTS.liquidityStateDir, postgresUrl: process.env.POSTGRES_URL || DEFAULTS.postgresUrl, diff --git a/test/armed-state-store.test.mjs b/test/armed-state-store.test.mjs new file mode 100644 index 0000000..41f7908 --- /dev/null +++ b/test/armed-state-store.test.mjs @@ -0,0 +1,43 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; + +import { createArmedStateStore } from '../src/core/armed-state-store.mjs'; + +test('armed state store persists arm changes across reload', () => { + const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), 'unrip-armed-state-')); + const store = createArmedStateStore({ + stateDir, + fileName: 'strategy-engine-control.json', + initialArmed: false, + }); + + assert.equal(store.isArmed(), false); + + const armedState = store.setArmed(true); + assert.equal(armedState.armed, true); + assert.ok(armedState.updated_at); + + const reloaded = createArmedStateStore({ + stateDir, + fileName: 'strategy-engine-control.json', + initialArmed: false, + }); + + assert.equal(reloaded.isArmed(), true); + assert.equal(reloaded.getState().armed, true); +}); + +test('armed state store falls back to initial state when no persisted file exists', () => { + const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), 'unrip-armed-state-default-')); + const store = createArmedStateStore({ + stateDir, + fileName: 'trade-executor-control.json', + initialArmed: true, + }); + + assert.equal(store.isArmed(), true); + assert.equal(store.getState().updated_at, null); +});