import process from 'node:process'; import { createConsumer } from '../bus/kafka/consumer.mjs'; import { createProducer } from '../bus/kafka/producer.mjs'; import { createArmedStateStore } from '../core/armed-state-store.mjs'; import { startControlApi } from '../core/control-api.mjs'; import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs'; import { createLogger, serializeError } from '../core/log.mjs'; import { assertInventorySnapshotEvent, assertMarketPriceEvent, assertNormalizedSwapDemand } from '../core/schemas.mjs'; import { evaluateTradeOpportunity } from '../core/strategy.mjs'; import { loadConfig } from '../lib/config.mjs'; const config = loadConfig(); const logger = createLogger({ service: 'strategy-engine', component: 'strategy', namespace: config.projectNamespace, venue: 'near-intents', }); const consumer = await createConsumer({ groupId: config.kafkaConsumerGroupStrategy, brokers: config.kafkaBrokers, clientId: config.kafkaClientId, logger, }); const producer = await createProducer({ brokers: config.kafkaBrokers, clientId: config.kafkaClientId, logger, }); const armedStateStore = createArmedStateStore({ stateDir: config.strategyStateDir, fileName: 'strategy-engine-control.json', initialArmed: config.strategyInitialArmed, }); await consumer.subscribe({ topic: config.kafkaTopicNormSwapDemand, fromBeginning: false }); await consumer.subscribe({ topic: config.kafkaTopicRefMarketPrice, fromBeginning: false }); await consumer.subscribe({ topic: config.kafkaTopicStateIntentInventory, fromBeginning: false }); const state = { armed: armedStateStore.isArmed(), paused: false, threshold_pct: config.strategyGrossThresholdPct, max_notional_eure: config.strategyMaxNotionalEure, latest_price_event: null, latest_inventory_event: null, latest_decision: null, recent_decisions: [], skipped_counts: {}, seen_quotes: {}, }; await consumer.run({ eachMessage: async ({ topic, message }) => { if (!message.value) return; try { const event = parseEventMessage(message.value.toString()); if (topic === config.kafkaTopicRefMarketPrice) { assertMarketPriceEvent(event); state.latest_price_event = event; return; } if (topic === config.kafkaTopicStateIntentInventory) { assertInventorySnapshotEvent(event); state.latest_inventory_event = event; return; } assertNormalizedSwapDemand(event); await handleDemand(event); } catch (error) { logger.error('strategy_message_failed', { topic, pair: config.activePair, details: { error: serializeError(error), }, }); } }, }); async function handleDemand(event) { if (state.paused) return; if (state.seen_quotes[event.payload.quote_id]) { await publishDecision({ decision_id: `duplicate-${event.payload.quote_id}`, quote_id: event.payload.quote_id, pair: event.payload.pair || `${event.payload.asset_in}->${event.payload.asset_out}`, direction: 'duplicate', request_kind: event.payload.request_kind, decision: 'rejected', decision_reason: 'duplicate_quote_id', threshold_pct: String(state.threshold_pct), max_notional_eure: String(state.max_notional_eure), strategy_armed: state.armed, }); return; } state.seen_quotes[event.payload.quote_id] = true; const evaluation = evaluateTradeOpportunity({ demandEvent: event, priceEvent: state.latest_price_event, inventoryEvent: state.latest_inventory_event, config, armed: state.armed, thresholdPct: state.threshold_pct, maxNotionalEure: state.max_notional_eure, }); await publishDecision(evaluation.decision); if (evaluation.command) { const commandEvent = buildEventEnvelope({ source: 'strategy-engine', venue: 'near-intents', eventType: 'execute_trade', observedAt: event.observed_at, payload: evaluation.command, }); await producer.sendJson(config.kafkaTopicCmdExecuteTrade, commandEvent, { key: evaluation.command.execution_key }); } } async function publishDecision(decisionPayload) { const decisionAt = decisionPayload.decision_at || new Date().toISOString(); const normalizedDecisionPayload = { ...decisionPayload, decision_at: decisionAt, }; const event = buildEventEnvelope({ source: 'strategy-engine', venue: 'near-intents', eventType: 'trade_decision', observedAt: decisionAt, payload: normalizedDecisionPayload, }); await producer.sendJson(config.kafkaTopicDecisionTradeDecision, event, { key: normalizedDecisionPayload.quote_id }); state.latest_decision = normalizedDecisionPayload; state.recent_decisions.unshift(normalizedDecisionPayload); state.recent_decisions = state.recent_decisions.slice(0, 20); state.skipped_counts[normalizedDecisionPayload.decision_reason] = (state.skipped_counts[normalizedDecisionPayload.decision_reason] || 0) + 1; } const controlApi = startControlApi({ host: config.strategyEngineControlHost, port: config.strategyEngineControlPort, logger: logger.child({ component: 'control-api' }), service: 'strategy-engine', namespace: config.projectNamespace, stateProvider: { getState() { return { ...state, durable_control_state: armedStateStore.getState(), }; }, }, routes: [ { method: 'POST', path: '/arm', handler: () => { state.armed = armedStateStore.setArmed(true).armed; logger.warn('strategy_armed', { pair: config.activePair }); return { ok: true, armed: true }; }, }, { method: 'POST', path: '/disarm', handler: () => { state.armed = armedStateStore.setArmed(false).armed; logger.warn('strategy_disarmed', { pair: config.activePair }); return { ok: true, armed: false }; }, }, { method: 'POST', path: '/pause', handler: () => { state.paused = true; consumer.pause([ { topic: config.kafkaTopicNormSwapDemand }, { topic: config.kafkaTopicRefMarketPrice }, { topic: config.kafkaTopicStateIntentInventory }, ]); return { ok: true, paused: true }; }, }, { method: 'POST', path: '/resume', handler: () => { state.paused = false; consumer.resume([ { topic: config.kafkaTopicNormSwapDemand }, { topic: config.kafkaTopicRefMarketPrice }, { topic: config.kafkaTopicStateIntentInventory }, ]); return { ok: true, paused: false }; }, }, { method: 'PUT', path: '/threshold', handler: ({ body }) => { const next = Number(body.threshold_pct); if (!Number.isFinite(next) || next <= 0) { return { statusCode: 400, payload: { error: 'threshold_pct must be > 0' } }; } state.threshold_pct = next; return { ok: true, threshold_pct: next }; }, }, { method: 'PUT', path: '/limits', handler: ({ body }) => { const next = Number(body.max_notional_eure); if (!Number.isFinite(next) || next <= 0) { return { statusCode: 400, payload: { error: 'max_notional_eure must be > 0' } }; } state.max_notional_eure = next; return { ok: true, max_notional_eure: next }; }, }, ], }); async function shutdown() { await controlApi.close().catch(() => {}); await consumer.disconnect(); await producer.disconnect(); process.exit(0); } process.on('SIGINT', shutdown); process.on('SIGTERM', shutdown);