/**
 * Trade executor service entry point (venue: near-intents).
 *
 * Consumes execute-trade commands from Kafka, builds a signed quote response
 * for each one, submits it over the solver-relay websocket and publishes a
 * `trade_result` event describing the outcome. A control API exposes runtime
 * state plus arm / disarm / pause / resume / drain switches.
 */
import process from 'node:process';

import { createConsumer } from '../bus/kafka/consumer.mjs';
import { createProducer } from '../bus/kafka/producer.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
import { createExecutorStateStore } from '../core/executor-state-store.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import { assertExecuteTradeCommand, assertTradeResult } from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
import { buildQuoteResponseSubmission } from '../venues/near-intents/signing.mjs';
import { startSolverRelayWs } from '../venues/near-intents/solver-relay-ws.mjs';
import { createVerifierClient } from '../venues/near-intents/verifier-client.mjs';

const config = loadConfig();

const logger = createLogger({
  service: 'trade-executor',
  component: 'executor',
  namespace: config.projectNamespace,
  venue: 'near-intents',
});

// The executor cannot sign or submit anything without these credentials, so
// refuse to start rather than fail on the first command.
if (
  !config.nearIntentsApiKey ||
  !config.nearIntentsAccountId ||
  !config.nearIntentsSignerPrivateKey
) {
  logger.error('missing_executor_config', {
    details: {
      required: [
        'NEAR_INTENTS_API_KEY',
        'NEAR_INTENTS_ACCOUNT_ID',
        'NEAR_INTENTS_SIGNER_PRIVATE_KEY',
      ],
    },
  });
  process.exit(1);
}

const consumer = await createConsumer({
  groupId: config.kafkaConsumerGroupExecutor,
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
  logger,
});

const producer = await createProducer({
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
  logger,
});

const verifierClient = createVerifierClient({
  nearRpcUrl: config.nearRpcUrl,
  verifierContract: config.nearVerifierContract,
  signerPrivateKey: config.nearIntentsSignerPrivateKey,
});

const signer = verifierClient.getSigner();

// In-memory runtime state, also spread into the control API state payload.
// Declared BEFORE the relay client is started: the relay's `onEvent` callback
// writes to it, and an event delivered while `startSolverRelayWs` is still
// being awaited would otherwise hit the temporal dead zone of `state`.
const state = {
  armed: config.executorInitialArmed,
  paused: false,
  draining: false,
  last_command: null,
  last_request: null,
  last_venue_response: null,
  last_quote_status: null,
  last_error: null,
  in_flight_count: 0,
  completed_count: 0,
};

// Run-once guard for shutdown(). Kept out of `state` so the control API state
// payload keeps its existing shape.
let shuttingDown = false;

const relayClient = await startSolverRelayWs({
  apiKey: config.nearIntentsApiKey,
  wsUrl: config.nearIntentsWsUrl,
  logger: logger.child({ component: 'solver-relay' }),
  subscriptions: ['quote_status'],
  onEvent(payload) {
    // Keep only the most recent relay event, preferring `params`, then
    // `result`, then the raw payload.
    state.last_quote_status = payload?.params || payload?.result || payload;
  },
});

const stateStore = createExecutorStateStore({ stateDir: config.executorStateDir });

await consumer.subscribe({ topic: config.kafkaTopicCmdExecuteTrade, fromBeginning: false });

await consumer.run({
  eachMessage: async ({ message }) => {
    if (!message.value) return;

    try {
      const event = parseEventMessage(message.value.toString());
      assertExecuteTradeCommand(event);
      await handleCommand(event);
    } catch (error) {
      // Parse, validation and handler failures are recorded and logged here
      // instead of being rethrown to the consumer.
      state.last_error = serializeError(error);
      logger.error('executor_message_failed', {
        topic: config.kafkaTopicCmdExecuteTrade,
        details: {
          error: serializeError(error),
        },
      });
    }
  },
});

/**
 * Processes one validated execute-trade command.
 *
 * Skips commands already recorded as completed, drops commands while paused,
 * rejects commands while disarmed, and otherwise signs and submits a quote
 * response, then publishes a `submitted` or `failed` trade result.
 *
 * @param {object} event - Parsed event envelope; `event.payload` is the command.
 * @returns {Promise<void>}
 */
async function handleCommand(event) {
  const payload = event.payload;
  state.last_command = payload;

  const existing = stateStore.get(payload.command_id);
  if (existing?.status === 'completed') {
    logger.warn('duplicate_command_skipped', {
      topic: config.kafkaTopicCmdExecuteTrade,
      pair: payload.pair,
      details: {
        command_id: payload.command_id,
      },
    });
    return;
  }

  if (state.paused) {
    // The command is dropped without a trade result being published; log it
    // so the drop is at least visible.
    // NOTE(review): confirm whether dropped commands should instead be
    // rejected with a result or left for redelivery.
    logger.warn('command_skipped_paused', {
      topic: config.kafkaTopicCmdExecuteTrade,
      pair: payload.pair,
      details: {
        command_id: payload.command_id,
      },
    });
    return;
  }

  if (!state.armed) {
    await publishResult(payload, {
      status: 'rejected',
      result_code: 'executor_disarmed',
      note: 'executor is disarmed',
    });
    return;
  }

  stateStore.markProcessing(payload.command_id, {
    quote_id: payload.quote_id,
    idempotency_key: payload.idempotency_key,
    execution_key: payload.execution_key,
  });
  state.in_flight_count += 1;

  try {
    const currentSaltHex = await verifierClient.currentSalt();
    const submission = buildQuoteResponseSubmission({
      command: payload,
      signerAccountId: config.nearIntentsAccountId,
      signer,
      verifierContract: config.nearVerifierContract,
      currentSaltHex,
    });
    state.last_request = submission;

    const response = await relayClient.request('quote_response', [submission], {
      timeoutMs: config.executorResponseTimeoutMs,
    });
    state.last_venue_response = response;
    state.last_error = null;

    // NOTE(review): if this publish throws, the catch below records the
    // command as failed even though the relay request already succeeded.
    await publishResult(payload, {
      status: 'submitted',
      result_code: response === 'OK' ? 'quote_response_ok' : 'quote_response_ack',
      venue_response: response,
    });

    stateStore.markCompleted(payload.command_id, {
      quote_id: payload.quote_id,
      result: response,
    });
    state.completed_count += 1;
  } catch (error) {
    state.last_error = serializeError(error);
    stateStore.markFailed(payload.command_id, {
      quote_id: payload.quote_id,
      error: serializeError(error),
    });
    await publishResult(payload, {
      status: 'failed',
      result_code: 'submission_failed',
      error: serializeError(error),
    });
  } finally {
    state.in_flight_count = Math.max(0, state.in_flight_count - 1);
    // A drain request waits for in-flight work; the last command to finish
    // triggers the deferred shutdown.
    if (state.draining && state.in_flight_count === 0) {
      setTimeout(() => shutdown(), 0);
    }
  }
}

/**
 * Builds, validates and publishes a `trade_result` event for a command.
 *
 * @param {object} command - The execute-trade command payload being answered.
 * @param {object} extraPayload - Outcome fields merged over the command
 *   identifiers (e.g. `status`, `result_code`).
 * @returns {Promise<void>}
 */
async function publishResult(command, extraPayload) {
  const event = buildEventEnvelope({
    source: 'trade-executor',
    venue: 'near-intents',
    eventType: 'trade_result',
    payload: {
      command_id: command.command_id,
      decision_id: command.decision_id,
      idempotency_key: command.idempotency_key,
      execution_key: command.execution_key,
      quote_id: command.quote_id,
      pair: command.pair,
      ...extraPayload,
    },
  });

  assertTradeResult(event);
  await producer.sendJson(config.kafkaTopicExecTradeResult, event, {
    key: command.execution_key,
  });
}

const controlApi = startControlApi({
  host: config.tradeExecutorControlHost,
  port: config.tradeExecutorControlPort,
  logger: logger.child({ component: 'control-api' }),
  service: 'trade-executor',
  namespace: config.projectNamespace,
  stateProvider: {
    async getState() {
      // Best effort: a failed registration lookup is reported as `null`
      // rather than failing the whole state request.
      const signerRegistered = await verifierClient
        .isPublicKeyRegistered({
          accountId: config.nearIntentsAccountId,
        })
        .catch(() => null);

      return {
        account_id: config.nearIntentsAccountId,
        signer_public_key: signer.getPublicKey().toString(),
        signer_registered: signerRegistered,
        ...state,
        durable_state: stateStore.getState(),
      };
    },
  },
  routes: [
    {
      method: 'POST',
      path: '/arm',
      handler: () => {
        state.armed = true;
        return { ok: true, armed: true };
      },
    },
    {
      method: 'POST',
      path: '/disarm',
      handler: () => {
        state.armed = false;
        return { ok: true, armed: false };
      },
    },
    {
      method: 'POST',
      path: '/pause',
      handler: () => {
        state.paused = true;
        consumer.pause([{ topic: config.kafkaTopicCmdExecuteTrade }]);
        return { ok: true, paused: true };
      },
    },
    {
      method: 'POST',
      path: '/resume',
      handler: () => {
        state.paused = false;
        consumer.resume([{ topic: config.kafkaTopicCmdExecuteTrade }]);
        return { ok: true, paused: false };
      },
    },
    {
      method: 'POST',
      path: '/drain',
      handler: () => {
        state.draining = true;
        state.paused = true;
        consumer.pause([{ topic: config.kafkaTopicCmdExecuteTrade }]);
        // With nothing in flight, shut down right away; otherwise
        // handleCommand triggers it when the last command finishes.
        if (state.in_flight_count === 0) {
          setTimeout(() => shutdown(), 0);
        }
        return { ok: true, draining: true };
      },
    },
  ],
});

/**
 * Closes the control API, relay client, consumer and producer, then exits.
 *
 * Safe to call more than once: SIGINT, SIGTERM, `/drain` and the end of the
 * last in-flight command can all trigger it, but only the first call runs.
 * Exits with code 0 on a clean teardown and 1 if teardown throws.
 *
 * @returns {Promise<void>}
 */
async function shutdown() {
  if (shuttingDown) return;
  shuttingDown = true;

  let exitCode = 0;
  try {
    await controlApi.close().catch(() => {});
    relayClient.close();
    await consumer.disconnect();
    await producer.disconnect();
  } catch (error) {
    exitCode = 1;
    logger.error('executor_shutdown_failed', {
      details: {
        error: serializeError(error),
      },
    });
  } finally {
    // Always terminate, even when a disconnect failed, so the process does
    // not linger half torn down.
    process.exit(exitCode);
  }
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);