import process from 'node:process';

import { createConsumer } from '../bus/kafka/consumer.mjs';
import { createProducer } from '../bus/kafka/producer.mjs';
import { createArmedStateStore } from '../core/armed-state-store.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
import { createExecutorStateStore } from '../core/executor-state-store.mjs';
import { createIntentRequestController } from '../core/intent-request-controller.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import {
  assertExecuteTradeCommand,
  assertIntentRequestPreflightEvent,
  assertIntentRequestSubmissionResultEvent,
  assertTradeResult,
} from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
import {
  createPostgresPool,
  createTradingConfigStore,
  ensureHistorySchema,
  insertHistoryEvent,
  loadIntentRequestPreflightByIdOrKey,
  loadIntentRequestSubmissionsForStatusRefresh,
  loadLatestIntentRequestSubmission,
  loadLatestInventorySnapshot,
  loadLatestMarketPrice,
  refreshIntentRequestOutcomes,
  seedTradingConfig,
} from '../lib/postgres.mjs';
import { buildQuoteResponseSubmission } from '../venues/near-intents/signing.mjs';
import { startSolverRelayWs } from '../venues/near-intents/solver-relay-ws.mjs';
import { createSolverRelayRpcClient, createVerifierClient } from '../venues/near-intents/verifier-client.mjs';
import { ageMs } from '../core/runtime-health.mjs';

// ---------------------------------------------------------------------------
// Trade executor entry point.
//
// Consumes execute-trade commands from Kafka, signs a quote response for each
// one, sends it through the solver-relay client and publishes a trade_result
// event. A control API exposes state/health plus arm, disarm, pause, resume,
// drain, reconnect and the intent-request routes.
// ---------------------------------------------------------------------------

const config = loadConfig();

const logger = createLogger({
  service: 'trade-executor',
  component: 'executor',
  namespace: config.projectNamespace,
  venue: 'near-intents',
});

// Refuse to start when any of the three NEAR Intents credentials is missing.
if (!config.nearIntentsApiKey || !config.nearIntentsAccountId || !config.nearIntentsSignerPrivateKey) {
  logger.error('missing_executor_config', {
    details: {
      required: [
        'NEAR_INTENTS_API_KEY',
        'NEAR_INTENTS_ACCOUNT_ID',
        'NEAR_INTENTS_SIGNER_PRIVATE_KEY',
      ],
    },
  });
  process.exit(1);
}

const consumer = await createConsumer({
  groupId: config.kafkaConsumerGroupExecutor,
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
  logger,
});

const producer = await createProducer({
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
  logger,
});

const verifierClient = createVerifierClient({
  nearRpcUrl: config.nearRpcUrl,
  verifierContract: config.nearVerifierContract,
  signerPrivateKey: config.nearIntentsSignerPrivateKey,
});
const signer = verifierClient.getSigner();

const solverRelayRpcClient = createSolverRelayRpcClient({
  rpcUrl: config.nearIntentsRpcUrl,
  apiKey: config.nearIntentsApiKey,
});

const requestPool = createPostgresPool({
  connectionString: config.postgresUrl,
});
await ensureHistorySchema(requestPool);
await seedTradingConfig(requestPool);

const tradingConfigStore = createTradingConfigStore({
  pool: requestPool,
  logger: logger.child({ component: 'trading-config' }),
});
await tradingConfigStore.forceRefresh();

// The state stores and the `state` object are created BEFORE the relay client
// is started: the relay's onEvent callback writes to `state`, and a `const`
// that has not been initialised yet throws a ReferenceError when touched.
const stateStore = createExecutorStateStore({ stateDir: config.executorStateDir });

const armedStateStore = createArmedStateStore({
  stateDir: config.executorStateDir,
  fileName: 'trade-executor-control.json',
  initialArmed: config.executorInitialArmed,
});

// In-memory runtime state; spread verbatim into the control API state response.
const state = {
  armed: armedStateStore.isArmed(),
  paused: false,
  draining: false,
  last_command: null,
  last_request: null,
  last_venue_response: null,
  last_quote_status: null,
  last_error: null,
  in_flight_count: 0,
  submitted_count: 0,
  request_creation: {
    last_preflight: null,
    last_submission_result: null,
    preflight_count: 0,
    accepted_count: 0,
    blocked_count: 0,
    failed_count: 0,
    maker_suppressed: false,
    maker_suppressed_until: null,
  },
};

const relayClient = await startSolverRelayWs({
  apiKey: config.nearIntentsApiKey,
  wsUrl: config.nearIntentsWsUrl,
  logger: logger.child({ component: 'solver-relay' }),
  subscriptions: ['quote_status'],
  onEvent(payload) {
    state.last_quote_status = payload?.params || payload?.result || payload;
  },
});

// Number of withMakerSuppressed() operations currently running.
let makerSuppressionDepth = 0;

// Memoised shutdown promise so repeated signals / drain triggers run it once.
let shutdownPromise = null;

const requestController = createIntentRequestController({
  config,
  store: createIntentRequestStore(),
  relayRpcClient: solverRelayRpcClient,
  verifierClient,
  signer,
  isArmed: () => state.armed,
  isPaused: () => state.paused,
  getTradingConfig: () => tradingConfigStore.getConfig(),
  withMakerSuppressed,
  logger: logger.child({ component: 'intent-request-controller' }),
});

/**
 * Runs `operation` while maker quote responses are suppressed.
 *
 * Suppression is reference-counted: the flag stays set until the last
 * overlapping operation has settled. Saving and restoring the previous flag
 * value instead would let two overlapping calls leave the flag stuck at `true`.
 *
 * @param {() => Promise<unknown>} operation - Work to run under suppression.
 * @returns {Promise<unknown>} Whatever `operation` resolves to.
 */
async function withMakerSuppressed(operation) {
  const suppressionWindowMs = config.intentRequestQuoteTimeoutMs || config.executorResponseTimeoutMs || 10_000;

  makerSuppressionDepth += 1;
  state.request_creation.maker_suppressed = true;
  state.request_creation.maker_suppressed_until = new Date(Date.now() + suppressionWindowMs).toISOString();

  try {
    return await operation();
  } finally {
    makerSuppressionDepth = Math.max(0, makerSuppressionDepth - 1);
    if (makerSuppressionDepth === 0) {
      state.request_creation.maker_suppressed = false;
      state.request_creation.maker_suppressed_until = null;
    }
  }
}

/**
 * @returns {boolean} True while at least one withMakerSuppressed() operation
 *   is in progress.
 */
function isMakerSuppressedForOwnRequest() {
  return state.request_creation.maker_suppressed === true;
}

await consumer.subscribe({ topic: config.kafkaTopicCmdExecuteTrade, fromBeginning: false });

await consumer.run({
  eachMessage: async ({ message }) => {
    if (!message.value) return;

    try {
      const event = parseEventMessage(message.value.toString());
      assertExecuteTradeCommand(event);
      await handleCommand(event);
    } catch (error) {
      // A bad or failing message is recorded and logged, never rethrown.
      const serialized = serializeError(error);
      state.last_error = serialized;
      logger.error('executor_message_failed', {
        topic: config.kafkaTopicCmdExecuteTrade,
        details: {
          error: serialized,
        },
      });
    }
  },
});

/**
 * Handles one validated execute-trade command.
 *
 * Guard order: already-submitted duplicate -> own-request suppression ->
 * paused -> disarmed. A command that passes all guards is signed, sent to the
 * relay and reported through publishResult().
 *
 * @param {{ payload: object }} event - Parsed execute-trade command envelope.
 * @returns {Promise<void>}
 */
async function handleCommand(event) {
  const payload = event.payload;
  state.last_command = payload;

  const existing = stateStore.get(payload.command_id);
  if (existing?.status === 'submitted') {
    logger.warn('duplicate_command_skipped', {
      topic: config.kafkaTopicCmdExecuteTrade,
      pair: payload.pair,
      details: {
        command_id: payload.command_id,
      },
    });
    return;
  }

  if (isMakerSuppressedForOwnRequest()) {
    await publishResult(payload, {
      status: 'rejected',
      result_code: 'own_request_preflight_in_progress',
      note: 'Own request preflight is suppressing maker quote responses to avoid self-matching.',
    });
    return;
  }

  if (state.paused) {
    // No result is published for a command received while paused; log it so
    // the drop is at least visible.
    logger.warn('command_dropped_while_paused', {
      topic: config.kafkaTopicCmdExecuteTrade,
      pair: payload.pair,
      details: {
        command_id: payload.command_id,
      },
    });
    return;
  }

  if (!state.armed) {
    await publishResult(payload, {
      status: 'rejected',
      result_code: 'executor_disarmed',
      note: 'executor is disarmed',
    });
    return;
  }

  stateStore.markProcessing(payload.command_id, {
    quote_id: payload.quote_id,
    idempotency_key: payload.idempotency_key,
    execution_key: payload.execution_key,
  });
  state.in_flight_count += 1;

  try {
    let response;

    // Phase 1: sign and submit. Only failures in here count as a failed
    // submission.
    try {
      const currentSaltHex = await verifierClient.currentSalt();
      const submission = buildQuoteResponseSubmission({
        command: payload,
        signerAccountId: config.nearIntentsAccountId,
        signer,
        verifierContract: config.nearVerifierContract,
        currentSaltHex,
      });
      state.last_request = submission;
      response = await relayClient.request('quote_response', [submission], {
        timeoutMs: config.executorResponseTimeoutMs,
      });
    } catch (error) {
      const serialized = serializeError(error);
      state.last_error = serialized;
      stateStore.markFailed(payload.command_id, {
        quote_id: payload.quote_id,
        error: serialized,
      });
      await publishResult(payload, {
        status: 'failed',
        result_code: 'submission_failed',
        error: serialized,
      });
      return;
    }

    // Phase 2: the relay has answered. Record the submission durably BEFORE
    // publishing, so a publish failure cannot relabel an accepted submission
    // as failed and a redelivered command still hits the duplicate guard.
    state.last_venue_response = response;
    state.last_error = null;
    stateStore.markSubmitted(payload.command_id, {
      quote_id: payload.quote_id,
      result: response,
    });
    state.submitted_count += 1;

    await publishResult(payload, {
      status: 'submitted',
      result_code: response === 'OK' ? 'quote_response_ok' : 'quote_response_ack',
      venue_response: response,
    });
  } finally {
    state.in_flight_count = Math.max(0, state.in_flight_count - 1);
    // A drain requested while work was in flight completes once it is done.
    if (state.draining && state.in_flight_count === 0) {
      setTimeout(() => shutdown(), 0);
    }
  }
}

/**
 * Builds, validates and publishes a trade_result event for a command.
 *
 * @param {object} command - Execute-trade command payload being answered.
 * @param {object} extraPayload - Result fields merged over the command fields.
 * @returns {Promise<void>}
 */
async function publishResult(command, extraPayload) {
  const event = buildEventEnvelope({
    source: 'trade-executor',
    venue: 'near-intents',
    eventType: 'trade_result',
    payload: {
      command_id: command.command_id,
      decision_id: command.decision_id,
      idempotency_key: command.idempotency_key,
      execution_key: command.execution_key,
      quote_id: command.quote_id,
      pair: command.pair,
      pair_id: command.pair_id || null,
      pair_config_id: command.pair_config_id || null,
      pair_config_version: command.pair_config_version || null,
      edge_bps: command.edge_bps || null,
      max_notional: command.max_notional || null,
      price_route_id: command.price_route_id || null,
      ...extraPayload,
    },
  });
  assertTradeResult(event);
  await producer.sendJson(config.kafkaTopicExecTradeResult, event, { key: command.execution_key });
}

/**
 * Computes the health summary returned by the control API health provider.
 *
 * @returns {object} Health flags plus a `reason` string when not healthy.
 */
function buildHealthReport() {
  const relay = relayClient.getState();
  const tradingConfigState = tradingConfigStore.getState();
  const freshnessAgeMs = ageMs(relay.last_message_at);
  const relayIsStale = freshnessAgeMs != null && freshnessAgeMs > config.opsSentinelExecutorRelayStaleMs;

  let reason = null;
  if (tradingConfigState.ok !== true) {
    reason = tradingConfigState.block_reason || 'trading config unavailable';
  } else if (!relay.connected) {
    reason = 'solver relay disconnected';
  } else if (relayIsStale) {
    reason = 'solver relay stale';
  }

  return {
    ok: relay.connected
      && tradingConfigState.ok === true
      && (freshnessAgeMs == null || freshnessAgeMs <= config.opsSentinelExecutorRelayStaleMs),
    connected: relay.connected,
    trading_config_ok: tradingConfigState.ok,
    trading_config_block_reason: tradingConfigState.block_reason,
    relay_last_message_at: relay.last_message_at,
    relay_freshness_age_ms: freshnessAgeMs,
    paused: state.paused,
    armed: state.armed,
    reason,
  };
}

const controlApi = startControlApi({
  host: config.tradeExecutorControlHost,
  port: config.tradeExecutorControlPort,
  logger: logger.child({ component: 'control-api' }),
  service: 'trade-executor',
  namespace: config.projectNamespace,
  stateProvider: {
    async getState() {
      // Best effort: a failed registration lookup is reported as null.
      const signerRegistered = await verifierClient.isPublicKeyRegistered({
        accountId: config.nearIntentsAccountId,
      }).catch(() => null);

      return {
        account_id: config.nearIntentsAccountId,
        signer_public_key: signer.getPublicKey().toString(),
        signer_registered: signerRegistered,
        relay: relayClient.getState(),
        trading_config: tradingConfigStore.getState(),
        ...state,
        durable_control_state: armedStateStore.getState(),
        durable_state: stateStore.getState(),
      };
    },
  },
  healthProvider: {
    getHealth() {
      return buildHealthReport();
    },
  },
  routes: [
    {
      method: 'POST',
      path: '/reconnect',
      handler: () => {
        relayClient.reconnect();
        return { ok: true, reconnecting: true };
      },
    },
    {
      method: 'POST',
      path: '/arm',
      handler: () => {
        state.armed = armedStateStore.setArmed(true).armed;
        // Report the value the store returned rather than a constant.
        return { ok: true, armed: state.armed };
      },
    },
    {
      method: 'POST',
      path: '/disarm',
      handler: () => {
        state.armed = armedStateStore.setArmed(false).armed;
        return { ok: true, armed: state.armed };
      },
    },
    {
      method: 'POST',
      path: '/pause',
      handler: () => {
        state.paused = true;
        consumer.pause([{ topic: config.kafkaTopicCmdExecuteTrade }]);
        return { ok: true, paused: true };
      },
    },
    {
      method: 'POST',
      path: '/resume',
      handler: () => {
        state.paused = false;
        consumer.resume([{ topic: config.kafkaTopicCmdExecuteTrade }]);
        return { ok: true, paused: false };
      },
    },
    {
      method: 'POST',
      path: '/intent-request/preflight',
      handler: async ({ body }) => {
        const result = await requestController.preflight(body || {});
        state.request_creation.last_preflight = result;
        state.request_creation.preflight_count += 1;
        if (result.state === 'blocked') state.request_creation.blocked_count += 1;
        return result;
      },
    },
    {
      method: 'POST',
      path: '/intent-request/submit',
      handler: async ({ body }) => {
        const result = await requestController.submit(body || {});
        // A result carrying statusCode is returned as-is and not counted.
        if (result?.statusCode != null) return result;

        state.request_creation.last_submission_result = result.submission_result || null;
        const status = result.submission_result?.status;
        if (status === 'accepted_by_relay') state.request_creation.accepted_count += 1;
        if (status === 'blocked') state.request_creation.blocked_count += 1;
        if (status === 'failed') state.request_creation.failed_count += 1;
        return result;
      },
    },
    {
      method: 'POST',
      path: '/intent-request/refresh-outcomes',
      handler: async () => requestController.refreshOutcomes(),
    },
    {
      method: 'POST',
      path: '/drain',
      handler: () => {
        state.draining = true;
        state.paused = true;
        consumer.pause([{ topic: config.kafkaTopicCmdExecuteTrade }]);
        // With nothing in flight, shut down right after responding; otherwise
        // handleCommand() triggers shutdown when the last command finishes.
        if (state.in_flight_count === 0) {
          setTimeout(() => shutdown(), 0);
        }
        return { ok: true, draining: true };
      },
    },
  ],
});

/**
 * Returns the `pair` label stored with an intent-request history record,
 * falling back to "<source>-><destination>" when the payload has no pair.
 *
 * @param {object} payload - Intent-request preflight or submission payload.
 * @returns {string}
 */
function resolveHistoryPair(payload) {
  return payload.pair || `${payload.source_asset_id}->${payload.destination_asset_id}`;
}

/**
 * Builds the persistence adapter handed to the intent-request controller.
 * Every method works against `requestPool`.
 *
 * @returns {object} Store with loaders, history inserts and refreshOutcomes.
 */
function createIntentRequestStore() {
  return {
    loadLatestInventorySnapshot: () => loadLatestInventorySnapshot(requestPool),
    loadLatestMarketPrice: () => loadLatestMarketPrice(requestPool),
    findPreflight: ({ requestId = null, idempotencyKey = null } = {}) => (
      loadIntentRequestPreflightByIdOrKey(requestPool, { requestId, idempotencyKey })
    ),
    findSubmissionByRequest: ({ requestId } = {}) => (
      loadLatestIntentRequestSubmission(requestPool, { requestId })
    ),
    loadSubmissionsForStatusRefresh: () => (
      loadIntentRequestSubmissionsForStatusRefresh(requestPool, { limit: 20 })
    ),

    async insertPreflight(payload) {
      const event = buildEventEnvelope({
        source: 'trade-executor',
        venue: 'near-intents',
        eventType: 'intent_request_preflight',
        observedAt: payload.created_at,
        payload,
      });
      assertIntentRequestPreflightEvent(event);
      await insertHistoryEvent(requestPool, {
        table: 'intent_request_preflights',
        topic: 'intent.request.preflight',
        event,
        record: {
          quote_id: null,
          pair: resolveHistoryPair(payload),
          decision_key: payload.request_id,
        },
      });
    },

    async insertSubmissionResult(payload) {
      const event = buildEventEnvelope({
        source: 'trade-executor',
        venue: 'near-intents',
        eventType: 'intent_request_submission_result',
        observedAt: payload.status_checked_at || payload.submitted_at,
        payload,
      });
      assertIntentRequestSubmissionResultEvent(event);
      await insertHistoryEvent(requestPool, {
        table: 'intent_request_submission_results',
        topic: 'intent.request.submission_result',
        event,
        record: {
          quote_id: null,
          pair: resolveHistoryPair(payload),
          decision_key: payload.request_id,
        },
      });
    },

    refreshOutcomes: async () => {
      const tradingConfig = await tradingConfigStore.getConfig();
      if (!tradingConfig.ok || !tradingConfig.tradingBtc || !tradingConfig.tradingEure) {
        throw new Error(`trading config unavailable: ${tradingConfig.blockReason || 'missing current assets'}`);
      }
      return refreshIntentRequestOutcomes(requestPool, {
        btcAsset: tradingConfig.tradingBtc,
        eureAsset: tradingConfig.tradingEure,
      });
    },
  };
}

/**
 * Closes every resource in order and exits the process.
 *
 * Each step is attempted even if an earlier one fails, so a rejected Kafka
 * disconnect can no longer skip the remaining cleanup or the final exit.
 * Control API and Postgres failures stay best-effort (exit code 0); a failure
 * closing the relay or Kafka clients is logged and yields exit code 1.
 *
 * @returns {Promise<void>}
 */
async function runShutdown() {
  const steps = [
    { name: 'control_api', critical: false, run: () => controlApi.close() },
    { name: 'relay_client', critical: true, run: () => relayClient.close() },
    { name: 'consumer', critical: true, run: () => consumer.disconnect() },
    { name: 'producer', critical: true, run: () => producer.disconnect() },
    { name: 'postgres_pool', critical: false, run: () => requestPool.end() },
  ];

  let exitCode = 0;
  for (const step of steps) {
    try {
      await step.run();
    } catch (error) {
      if (step.critical) {
        exitCode = 1;
        logger.error('shutdown_step_failed', {
          details: {
            step: step.name,
            error: serializeError(error),
          },
        });
      }
    }
  }

  process.exit(exitCode);
}

/**
 * Idempotent shutdown entry point used by the signal handlers, the /drain
 * route and handleCommand(). Repeated calls share one shutdown run.
 *
 * @returns {Promise<void>}
 */
function shutdown() {
  shutdownPromise ??= runShutdown();
  return shutdownPromise;
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);