Persist armed state across rollout
All checks were successful
deploy / deploy (push) Successful in 21s
All checks were successful
deploy / deploy (push) Successful in 21s
Proof: Strategy and executor arm state now survives pod restarts through durable control-state files mounted in their service state directories. Assumptions: The strategy-state PVC can be seeded before the first rollout restart, and both service state directories remain writable on the cluster. Still fake: Armed-state durability is local to the cluster volumes; there is no cross-cluster or database-backed operator control-state replication.
This commit is contained in:
parent
860471f267
commit
af74c48f29
8 changed files with 125 additions and 7 deletions
|
|
@ -63,6 +63,7 @@ KAFKA_CONSUMER_GROUP_OPS_SENTINEL=ops-sentinel-v1
|
||||||
POSTGRES_URL=postgresql://unrip:unrip@postgres:5432/unrip
|
POSTGRES_URL=postgresql://unrip:unrip@postgres:5432/unrip
|
||||||
|
|
||||||
# Service state
|
# Service state
|
||||||
|
STRATEGY_STATE_DIR=/var/lib/unrip/strategy-state
|
||||||
EXECUTOR_STATE_DIR=/var/lib/unrip/executor-state
|
EXECUTOR_STATE_DIR=/var/lib/unrip/executor-state
|
||||||
LIQUIDITY_STATE_DIR=/var/lib/unrip/liquidity-state
|
LIQUIDITY_STATE_DIR=/var/lib/unrip/liquidity-state
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -116,6 +116,8 @@ services:
|
||||||
redpanda:
|
redpanda:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
volumes:
|
||||||
|
- strategy-state:/var/lib/unrip/strategy-state
|
||||||
|
|
||||||
trade-executor:
|
trade-executor:
|
||||||
build: .
|
build: .
|
||||||
|
|
@ -131,5 +133,6 @@ services:
|
||||||
volumes:
|
volumes:
|
||||||
redpanda-data:
|
redpanda-data:
|
||||||
postgres-data:
|
postgres-data:
|
||||||
|
strategy-state:
|
||||||
executor-state:
|
executor-state:
|
||||||
liquidity-state:
|
liquidity-state:
|
||||||
|
|
|
||||||
|
|
@ -57,6 +57,7 @@ data:
|
||||||
KAFKA_CONSUMER_GROUP_STRATEGY: strategy-engine-v1
|
KAFKA_CONSUMER_GROUP_STRATEGY: strategy-engine-v1
|
||||||
KAFKA_CONSUMER_GROUP_EXECUTOR: trade-executor-v1
|
KAFKA_CONSUMER_GROUP_EXECUTOR: trade-executor-v1
|
||||||
KAFKA_CONSUMER_GROUP_OPS_SENTINEL: ops-sentinel-v1
|
KAFKA_CONSUMER_GROUP_OPS_SENTINEL: ops-sentinel-v1
|
||||||
|
STRATEGY_STATE_DIR: /var/lib/unrip/strategy-state
|
||||||
EXECUTOR_STATE_DIR: /var/lib/unrip/executor-state
|
EXECUTOR_STATE_DIR: /var/lib/unrip/executor-state
|
||||||
LIQUIDITY_STATE_DIR: /var/lib/unrip/liquidity-state
|
LIQUIDITY_STATE_DIR: /var/lib/unrip/liquidity-state
|
||||||
MARKET_REFERENCE_REFRESH_MS: "5000"
|
MARKET_REFERENCE_REFRESH_MS: "5000"
|
||||||
|
|
@ -85,6 +86,17 @@ data:
|
||||||
---
|
---
|
||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: PersistentVolumeClaim
|
kind: PersistentVolumeClaim
|
||||||
|
metadata:
|
||||||
|
name: strategy-state
|
||||||
|
namespace: unrip
|
||||||
|
spec:
|
||||||
|
accessModes: ["ReadWriteOnce"]
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
storage: 5Gi
|
||||||
|
---
|
||||||
|
apiVersion: v1
|
||||||
|
kind: PersistentVolumeClaim
|
||||||
metadata:
|
metadata:
|
||||||
name: executor-state
|
name: executor-state
|
||||||
namespace: unrip
|
namespace: unrip
|
||||||
|
|
@ -335,6 +347,13 @@ spec:
|
||||||
name: unrip-config
|
name: unrip-config
|
||||||
- secretRef:
|
- secretRef:
|
||||||
name: unrip-secrets
|
name: unrip-secrets
|
||||||
|
volumeMounts:
|
||||||
|
- name: strategy-state
|
||||||
|
mountPath: /var/lib/unrip/strategy-state
|
||||||
|
volumes:
|
||||||
|
- name: strategy-state
|
||||||
|
persistentVolumeClaim:
|
||||||
|
claimName: strategy-state
|
||||||
---
|
---
|
||||||
apiVersion: apps/v1
|
apiVersion: apps/v1
|
||||||
kind: Deployment
|
kind: Deployment
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ import process from 'node:process';
|
||||||
|
|
||||||
import { createConsumer } from '../bus/kafka/consumer.mjs';
|
import { createConsumer } from '../bus/kafka/consumer.mjs';
|
||||||
import { createProducer } from '../bus/kafka/producer.mjs';
|
import { createProducer } from '../bus/kafka/producer.mjs';
|
||||||
|
import { createArmedStateStore } from '../core/armed-state-store.mjs';
|
||||||
import { startControlApi } from '../core/control-api.mjs';
|
import { startControlApi } from '../core/control-api.mjs';
|
||||||
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
|
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
|
||||||
import { createLogger, serializeError } from '../core/log.mjs';
|
import { createLogger, serializeError } from '../core/log.mjs';
|
||||||
|
|
@ -28,13 +29,18 @@ const producer = await createProducer({
|
||||||
clientId: config.kafkaClientId,
|
clientId: config.kafkaClientId,
|
||||||
logger,
|
logger,
|
||||||
});
|
});
|
||||||
|
const armedStateStore = createArmedStateStore({
|
||||||
|
stateDir: config.strategyStateDir,
|
||||||
|
fileName: 'strategy-engine-control.json',
|
||||||
|
initialArmed: config.strategyInitialArmed,
|
||||||
|
});
|
||||||
|
|
||||||
await consumer.subscribe({ topic: config.kafkaTopicNormSwapDemand, fromBeginning: false });
|
await consumer.subscribe({ topic: config.kafkaTopicNormSwapDemand, fromBeginning: false });
|
||||||
await consumer.subscribe({ topic: config.kafkaTopicRefMarketPrice, fromBeginning: false });
|
await consumer.subscribe({ topic: config.kafkaTopicRefMarketPrice, fromBeginning: false });
|
||||||
await consumer.subscribe({ topic: config.kafkaTopicStateIntentInventory, fromBeginning: false });
|
await consumer.subscribe({ topic: config.kafkaTopicStateIntentInventory, fromBeginning: false });
|
||||||
|
|
||||||
const state = {
|
const state = {
|
||||||
armed: config.strategyInitialArmed,
|
armed: armedStateStore.isArmed(),
|
||||||
paused: false,
|
paused: false,
|
||||||
threshold_pct: config.strategyGrossThresholdPct,
|
threshold_pct: config.strategyGrossThresholdPct,
|
||||||
max_notional_eure: config.strategyMaxNotionalEure,
|
max_notional_eure: config.strategyMaxNotionalEure,
|
||||||
|
|
@ -147,7 +153,10 @@ const controlApi = startControlApi({
|
||||||
namespace: config.projectNamespace,
|
namespace: config.projectNamespace,
|
||||||
stateProvider: {
|
stateProvider: {
|
||||||
getState() {
|
getState() {
|
||||||
return state;
|
return {
|
||||||
|
...state,
|
||||||
|
durable_control_state: armedStateStore.getState(),
|
||||||
|
};
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
routes: [
|
routes: [
|
||||||
|
|
@ -155,7 +164,7 @@ const controlApi = startControlApi({
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
path: '/arm',
|
path: '/arm',
|
||||||
handler: () => {
|
handler: () => {
|
||||||
state.armed = true;
|
state.armed = armedStateStore.setArmed(true).armed;
|
||||||
logger.warn('strategy_armed', { pair: config.activePair });
|
logger.warn('strategy_armed', { pair: config.activePair });
|
||||||
return { ok: true, armed: true };
|
return { ok: true, armed: true };
|
||||||
},
|
},
|
||||||
|
|
@ -164,7 +173,7 @@ const controlApi = startControlApi({
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
path: '/disarm',
|
path: '/disarm',
|
||||||
handler: () => {
|
handler: () => {
|
||||||
state.armed = false;
|
state.armed = armedStateStore.setArmed(false).armed;
|
||||||
logger.warn('strategy_disarmed', { pair: config.activePair });
|
logger.warn('strategy_disarmed', { pair: config.activePair });
|
||||||
return { ok: true, armed: false };
|
return { ok: true, armed: false };
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ import process from 'node:process';
|
||||||
|
|
||||||
import { createConsumer } from '../bus/kafka/consumer.mjs';
|
import { createConsumer } from '../bus/kafka/consumer.mjs';
|
||||||
import { createProducer } from '../bus/kafka/producer.mjs';
|
import { createProducer } from '../bus/kafka/producer.mjs';
|
||||||
|
import { createArmedStateStore } from '../core/armed-state-store.mjs';
|
||||||
import { startControlApi } from '../core/control-api.mjs';
|
import { startControlApi } from '../core/control-api.mjs';
|
||||||
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
|
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
|
||||||
import { createExecutorStateStore } from '../core/executor-state-store.mjs';
|
import { createExecutorStateStore } from '../core/executor-state-store.mjs';
|
||||||
|
|
@ -61,9 +62,14 @@ const relayClient = await startSolverRelayWs({
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
const stateStore = createExecutorStateStore({ stateDir: config.executorStateDir });
|
const stateStore = createExecutorStateStore({ stateDir: config.executorStateDir });
|
||||||
|
const armedStateStore = createArmedStateStore({
|
||||||
|
stateDir: config.executorStateDir,
|
||||||
|
fileName: 'trade-executor-control.json',
|
||||||
|
initialArmed: config.executorInitialArmed,
|
||||||
|
});
|
||||||
|
|
||||||
const state = {
|
const state = {
|
||||||
armed: config.executorInitialArmed,
|
armed: armedStateStore.isArmed(),
|
||||||
paused: false,
|
paused: false,
|
||||||
draining: false,
|
draining: false,
|
||||||
last_command: null,
|
last_command: null,
|
||||||
|
|
@ -210,6 +216,7 @@ const controlApi = startControlApi({
|
||||||
signer_public_key: signer.getPublicKey().toString(),
|
signer_public_key: signer.getPublicKey().toString(),
|
||||||
signer_registered: signerRegistered,
|
signer_registered: signerRegistered,
|
||||||
...state,
|
...state,
|
||||||
|
durable_control_state: armedStateStore.getState(),
|
||||||
durable_state: stateStore.getState(),
|
durable_state: stateStore.getState(),
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
|
|
@ -219,7 +226,7 @@ const controlApi = startControlApi({
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
path: '/arm',
|
path: '/arm',
|
||||||
handler: () => {
|
handler: () => {
|
||||||
state.armed = true;
|
state.armed = armedStateStore.setArmed(true).armed;
|
||||||
return { ok: true, armed: true };
|
return { ok: true, armed: true };
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
@ -227,7 +234,7 @@ const controlApi = startControlApi({
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
path: '/disarm',
|
path: '/disarm',
|
||||||
handler: () => {
|
handler: () => {
|
||||||
state.armed = false;
|
state.armed = armedStateStore.setArmed(false).armed;
|
||||||
return { ok: true, armed: false };
|
return { ok: true, armed: false };
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
|
||||||
34
src/core/armed-state-store.mjs
Normal file
34
src/core/armed-state-store.mjs
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
import { createJsonStateStore } from './json-state-store.mjs';
|
||||||
|
|
||||||
|
export function createArmedStateStore({
|
||||||
|
stateDir,
|
||||||
|
fileName = 'armed-state.json',
|
||||||
|
initialArmed = false,
|
||||||
|
}) {
|
||||||
|
const store = createJsonStateStore({
|
||||||
|
stateDir,
|
||||||
|
fileName,
|
||||||
|
initialState: {
|
||||||
|
armed: Boolean(initialArmed),
|
||||||
|
updated_at: null,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
getState() {
|
||||||
|
return store.getState();
|
||||||
|
},
|
||||||
|
isArmed() {
|
||||||
|
return Boolean(store.getState().armed);
|
||||||
|
},
|
||||||
|
setArmed(armed) {
|
||||||
|
const nextArmed = Boolean(armed);
|
||||||
|
const nextState = store.update((state) => {
|
||||||
|
state.armed = nextArmed;
|
||||||
|
state.updated_at = new Date().toISOString();
|
||||||
|
return state;
|
||||||
|
});
|
||||||
|
return nextState;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
@ -36,6 +36,7 @@ const DEFAULTS = {
|
||||||
kafkaConsumerGroupStrategy: 'strategy-engine-v1',
|
kafkaConsumerGroupStrategy: 'strategy-engine-v1',
|
||||||
kafkaConsumerGroupExecutor: 'trade-executor-v1',
|
kafkaConsumerGroupExecutor: 'trade-executor-v1',
|
||||||
kafkaConsumerGroupOpsSentinel: 'ops-sentinel-v1',
|
kafkaConsumerGroupOpsSentinel: 'ops-sentinel-v1',
|
||||||
|
strategyStateDir: './var/strategy-state',
|
||||||
executorStateDir: './var/executor-state',
|
executorStateDir: './var/executor-state',
|
||||||
liquidityStateDir: './var/liquidity-state',
|
liquidityStateDir: './var/liquidity-state',
|
||||||
postgresUrl: 'postgresql://unrip:unrip@127.0.0.1:5432/unrip',
|
postgresUrl: 'postgresql://unrip:unrip@127.0.0.1:5432/unrip',
|
||||||
|
|
@ -232,6 +233,7 @@ export function loadConfig({ envPath = '.env' } = {}) {
|
||||||
process.env.KAFKA_CONSUMER_GROUP_EXECUTOR || DEFAULTS.kafkaConsumerGroupExecutor,
|
process.env.KAFKA_CONSUMER_GROUP_EXECUTOR || DEFAULTS.kafkaConsumerGroupExecutor,
|
||||||
kafkaConsumerGroupOpsSentinel:
|
kafkaConsumerGroupOpsSentinel:
|
||||||
process.env.KAFKA_CONSUMER_GROUP_OPS_SENTINEL || DEFAULTS.kafkaConsumerGroupOpsSentinel,
|
process.env.KAFKA_CONSUMER_GROUP_OPS_SENTINEL || DEFAULTS.kafkaConsumerGroupOpsSentinel,
|
||||||
|
strategyStateDir: process.env.STRATEGY_STATE_DIR || DEFAULTS.strategyStateDir,
|
||||||
executorStateDir: process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir,
|
executorStateDir: process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir,
|
||||||
liquidityStateDir: process.env.LIQUIDITY_STATE_DIR || DEFAULTS.liquidityStateDir,
|
liquidityStateDir: process.env.LIQUIDITY_STATE_DIR || DEFAULTS.liquidityStateDir,
|
||||||
postgresUrl: process.env.POSTGRES_URL || DEFAULTS.postgresUrl,
|
postgresUrl: process.env.POSTGRES_URL || DEFAULTS.postgresUrl,
|
||||||
|
|
|
||||||
43
test/armed-state-store.test.mjs
Normal file
43
test/armed-state-store.test.mjs
Normal file
|
|
@ -0,0 +1,43 @@
|
||||||
|
import test from 'node:test';
|
||||||
|
import assert from 'node:assert/strict';
|
||||||
|
import fs from 'node:fs';
|
||||||
|
import os from 'node:os';
|
||||||
|
import path from 'node:path';
|
||||||
|
|
||||||
|
import { createArmedStateStore } from '../src/core/armed-state-store.mjs';
|
||||||
|
|
||||||
|
test('armed state store persists arm changes across reload', () => {
|
||||||
|
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), 'unrip-armed-state-'));
|
||||||
|
const store = createArmedStateStore({
|
||||||
|
stateDir,
|
||||||
|
fileName: 'strategy-engine-control.json',
|
||||||
|
initialArmed: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(store.isArmed(), false);
|
||||||
|
|
||||||
|
const armedState = store.setArmed(true);
|
||||||
|
assert.equal(armedState.armed, true);
|
||||||
|
assert.ok(armedState.updated_at);
|
||||||
|
|
||||||
|
const reloaded = createArmedStateStore({
|
||||||
|
stateDir,
|
||||||
|
fileName: 'strategy-engine-control.json',
|
||||||
|
initialArmed: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(reloaded.isArmed(), true);
|
||||||
|
assert.equal(reloaded.getState().armed, true);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('armed state store falls back to initial state when no persisted file exists', () => {
|
||||||
|
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), 'unrip-armed-state-default-'));
|
||||||
|
const store = createArmedStateStore({
|
||||||
|
stateDir,
|
||||||
|
fileName: 'trade-executor-control.json',
|
||||||
|
initialArmed: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(store.isArmed(), true);
|
||||||
|
assert.equal(store.getState().updated_at, null);
|
||||||
|
});
|
||||||
Loading…
Add table
Reference in a new issue