Some checks failed
deploy / deploy (push) Failing after 52s
Proof: npm test (209/209) covers stale command expiry, bounded executor state, bounded strategy quote cache, bounded quote outcome refresh, and resource guardrails. Assumptions: current DB pair config and armed state remain the operator-approved live trading controls; stale quote commands are unsafe to submit after their min_deadline_ms. Still fake: quote outcomes still infer fills from inventory deltas rather than a venue-native terminal fill event.
528 lines
17 KiB
JavaScript
528 lines
17 KiB
JavaScript
import process from 'node:process';
|
|
|
|
import { createConsumer } from '../bus/kafka/consumer.mjs';
|
|
import { createProducer } from '../bus/kafka/producer.mjs';
|
|
import { createArmedStateStore } from '../core/armed-state-store.mjs';
|
|
import { startControlApi } from '../core/control-api.mjs';
|
|
import { classifyExecuteCommandExpiry } from '../core/executor-command-expiry.mjs';
|
|
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
|
|
import { createExecutorStateStore } from '../core/executor-state-store.mjs';
|
|
import { createIntentRequestController } from '../core/intent-request-controller.mjs';
|
|
import { createLogger, serializeError } from '../core/log.mjs';
|
|
import {
|
|
assertExecuteTradeCommand,
|
|
assertIntentRequestPreflightEvent,
|
|
assertIntentRequestSubmissionResultEvent,
|
|
assertTradeResult,
|
|
} from '../core/schemas.mjs';
|
|
import { loadConfig } from '../lib/config.mjs';
|
|
import {
|
|
createPostgresPool,
|
|
createTradingConfigStore,
|
|
ensureHistorySchema,
|
|
insertHistoryEvent,
|
|
loadIntentRequestPreflightByIdOrKey,
|
|
loadIntentRequestSubmissionsForStatusRefresh,
|
|
loadLatestIntentRequestSubmission,
|
|
loadLatestInventorySnapshot,
|
|
loadLatestMarketPrice,
|
|
refreshIntentRequestOutcomes,
|
|
seedTradingConfig,
|
|
} from '../lib/postgres.mjs';
|
|
import { buildQuoteResponseSubmission } from '../venues/near-intents/signing.mjs';
|
|
import { startSolverRelayWs } from '../venues/near-intents/solver-relay-ws.mjs';
|
|
import { createSolverRelayRpcClient, createVerifierClient } from '../venues/near-intents/verifier-client.mjs';
|
|
import { ageMs } from '../core/runtime-health.mjs';
|
|
|
|
// Runtime configuration, loaded once; everything below reads from this object.
const config = loadConfig();

// Root structured logger for the executor; components derive child loggers from it.
const logger = createLogger({
  service: 'trade-executor',
  component: 'executor',
  namespace: config.projectNamespace,
  venue: 'near-intents',
});

// Refuse to start unless all three NEAR Intents credentials are configured.
const hasExecutorCredentials = Boolean(
  config.nearIntentsApiKey
    && config.nearIntentsAccountId
    && config.nearIntentsSignerPrivateKey,
);

if (!hasExecutorCredentials) {
  // Only the variable names are logged, never their values.
  logger.error('missing_executor_config', {
    details: {
      required: [
        'NEAR_INTENTS_API_KEY',
        'NEAR_INTENTS_ACCOUNT_ID',
        'NEAR_INTENTS_SIGNER_PRIVATE_KEY',
      ],
    },
  });
  process.exit(1);
}
|
|
|
|
// Connection settings shared by the Kafka consumer and producer.
const kafkaConnection = {
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
  logger,
};

// Consumer for execute-trade commands; producer for trade results.
const consumer = await createConsumer({
  groupId: config.kafkaConsumerGroupExecutor,
  ...kafkaConnection,
});
const producer = await createProducer({ ...kafkaConnection });

// Verifier client; it also supplies the signer used for quote responses.
const verifierClient = createVerifierClient({
  nearRpcUrl: config.nearRpcUrl,
  verifierContract: config.nearVerifierContract,
  signerPrivateKey: config.nearIntentsSignerPrivateKey,
});
const signer = verifierClient.getSigner();

// JSON-RPC client for the solver relay, handed to the intent request controller.
const solverRelayRpcClient = createSolverRelayRpcClient({
  rpcUrl: config.nearIntentsRpcUrl,
  apiKey: config.nearIntentsApiKey,
});

// Postgres pool: schema and seed steps run before the trading config store is built.
const requestPool = createPostgresPool({ connectionString: config.postgresUrl });
await ensureHistorySchema(requestPool);
await seedTradingConfig(requestPool);

// Trading configuration backed by the pool; refreshed once before startup continues.
const tradingConfigStore = createTradingConfigStore({
  pool: requestPool,
  logger: logger.child({ component: 'trading-config' }),
});
await tradingConfigStore.forceRefresh();
|
|
// Durable per-command execution records; handleCommand consults them to skip
// commands that were already submitted.
const stateStore = createExecutorStateStore({ stateDir: config.executorStateDir });

// Durable armed/disarmed flag, stored next to the executor state.
const armedStateStore = createArmedStateStore({
  stateDir: config.executorStateDir,
  fileName: 'trade-executor-control.json',
  initialArmed: config.executorInitialArmed,
});

// In-memory runtime state, spread into the control API's state response.
//
// This MUST be initialised before the relay connection below is opened: the
// relay's onEvent callback writes to `state`, and a `const` binding cannot be
// touched before its declaration has run (temporal dead zone). Previously the
// relay was awaited first, so an early quote_status event would have thrown a
// ReferenceError inside the websocket callback.
const state = {
  armed: armedStateStore.isArmed(),
  paused: false,
  draining: false,
  last_command: null,
  last_request: null,
  last_venue_response: null,
  last_quote_status: null,
  last_error: null,
  in_flight_count: 0,
  submitted_count: 0,
  // Bookkeeping for the /intent-request/* control routes.
  request_creation: {
    last_preflight: null,
    last_submission_result: null,
    preflight_count: 0,
    accepted_count: 0,
    blocked_count: 0,
    failed_count: 0,
    maker_suppressed: false,
    maker_suppressed_until: null,
  },
};

// Solver relay websocket, subscribed to quote_status. The latest status
// payload is kept on `state` for inspection through the control API.
const relayClient = await startSolverRelayWs({
  apiKey: config.nearIntentsApiKey,
  wsUrl: config.nearIntentsWsUrl,
  logger: logger.child({ component: 'solver-relay' }),
  subscriptions: ['quote_status'],
  onEvent(payload) {
    state.last_quote_status = payload?.params || payload?.result || payload;
  },
});
|
|
|
|
// Persistence adapter for the intent request controller.
const intentRequestStore = createIntentRequestStore();

// Controller behind the /intent-request/* control routes. It reads the live
// armed/paused flags and trading config through the accessors below, and
// receives withMakerSuppressed as an injected dependency.
const requestController = createIntentRequestController({
  config,
  store: intentRequestStore,
  relayRpcClient: solverRelayRpcClient,
  verifierClient,
  signer,
  isArmed() {
    return state.armed;
  },
  isPaused() {
    return state.paused;
  },
  getTradingConfig() {
    return tradingConfigStore.getConfig();
  },
  withMakerSuppressed,
  logger: logger.child({ component: 'intent-request-controller' }),
});
|
|
|
|
// Number of withMakerSuppressed operations currently running.
let makerSuppressionDepth = 0;
// Flag values captured when the outermost operation started; restored when the
// last overlapping operation finishes.
let makerSuppressionBaseline = null;

/**
 * Runs `operation` while maker quote responses are flagged as suppressed.
 *
 * While at least one operation is running, `state.request_creation.maker_suppressed`
 * is `true` and `maker_suppressed_until` holds an ISO timestamp one timeout
 * window ahead of the most recent entry.
 *
 * Overlapping calls are reference counted. The earlier save/restore of
 * "previous" values broke when two calls overlapped: the first to finish
 * lifted the suppression while the second was still running, and the second
 * then restored `true`, leaving the maker suppressed permanently.
 *
 * @param {() => Promise<unknown> | unknown} operation - Work to run under suppression.
 * @returns {Promise<unknown>} Whatever `operation` resolves to.
 * @throws Rethrows anything `operation` throws, after the flags are released.
 */
async function withMakerSuppressed(operation) {
  const requestCreation = state.request_creation;

  if (makerSuppressionDepth === 0) {
    makerSuppressionBaseline = {
      suppressed: requestCreation.maker_suppressed,
      until: requestCreation.maker_suppressed_until,
    };
  }
  makerSuppressionDepth += 1;

  const windowMs = config.intentRequestQuoteTimeoutMs || config.executorResponseTimeoutMs || 10_000;
  requestCreation.maker_suppressed = true;
  requestCreation.maker_suppressed_until = new Date(Date.now() + windowMs).toISOString();

  try {
    return await operation();
  } finally {
    makerSuppressionDepth -= 1;
    // Only the last operation to finish releases the suppression.
    if (makerSuppressionDepth === 0) {
      requestCreation.maker_suppressed = makerSuppressionBaseline.suppressed;
      requestCreation.maker_suppressed_until = makerSuppressionBaseline.until;
      makerSuppressionBaseline = null;
    }
  }
}
|
|
|
|
/**
 * Reports whether maker quote responses are currently flagged as suppressed.
 *
 * @returns {boolean} `true` only when the flag is strictly `true`.
 */
function isMakerSuppressedForOwnRequest() {
  const { maker_suppressed: suppressed } = state.request_creation;
  return suppressed === true;
}
|
|
|
|
// Consume execute-trade commands without replaying the topic from its beginning.
await consumer.subscribe({ topic: config.kafkaTopicCmdExecuteTrade, fromBeginning: false });

await consumer.run({
  // Parse, validate and execute one command. Failures are recorded on `state`
  // and logged here instead of being rethrown to the consumer.
  async eachMessage({ message }) {
    const rawValue = message.value;
    if (!rawValue) return;

    try {
      const event = parseEventMessage(rawValue.toString());
      assertExecuteTradeCommand(event);
      await handleCommand(event);
    } catch (error) {
      state.last_error = serializeError(error);
      logger.error('executor_message_failed', {
        topic: config.kafkaTopicCmdExecuteTrade,
        details: {
          error: serializeError(error),
        },
      });
    }
  },
});
|
|
|
|
/**
 * Executes one validated execute-trade command.
 *
 * Checks run in this order:
 *  1. a command already recorded as `submitted` is skipped with a warning;
 *  2. while an own-request preflight suppresses the maker, the command is rejected;
 *  3. while paused, the command is dropped without publishing a result;
 *  4. while disarmed, the command is rejected;
 *  5. an expired command is marked failed and rejected as stale.
 * Otherwise a signed quote response is built and sent to the solver relay.
 *
 * Once the relay has answered, the command is recorded as `submitted` BEFORE
 * the trade result is published. Previously the publish happened first inside
 * the same try block, so a publish failure after a successful relay submission
 * marked the command as failed and reported it as failed, and a redelivered
 * command would no longer be recognised as a duplicate.
 *
 * @param {{ payload: object }} event - Parsed execute-trade command envelope.
 * @returns {Promise<void>}
 * @throws Propagates errors from publishing results or from the state store;
 *   the Kafka message handler records and logs them.
 */
async function handleCommand(event) {
  const payload = event.payload;
  state.last_command = payload;

  const existing = stateStore.get(payload.command_id);
  if (existing?.status === 'submitted') {
    logger.warn('duplicate_command_skipped', {
      topic: config.kafkaTopicCmdExecuteTrade,
      pair: payload.pair,
      details: {
        command_id: payload.command_id,
      },
    });
    return;
  }

  if (isMakerSuppressedForOwnRequest()) {
    await publishResult(payload, {
      status: 'rejected',
      result_code: 'own_request_preflight_in_progress',
      note: 'Own request preflight is suppressing maker quote responses to avoid self-matching.',
    });
    return;
  }

  // While paused the command is dropped silently: no result is published.
  if (state.paused) return;

  if (!state.armed) {
    await publishResult(payload, {
      status: 'rejected',
      result_code: 'executor_disarmed',
      note: 'executor is disarmed',
    });
    return;
  }

  const expiry = classifyExecuteCommandExpiry(event);
  if (expiry.expired) {
    stateStore.markFailed(payload.command_id, {
      quote_id: payload.quote_id,
      error: {
        name: 'StaleExecuteTradeCommand',
        message: expiry.reason,
      },
    });
    await publishResult(payload, {
      status: 'rejected',
      result_code: 'stale_execute_command',
      note: 'execute command deadline elapsed before relay submission',
      command_age_ms: expiry.age_ms == null ? null : String(expiry.age_ms),
      command_deadline_ms: expiry.deadline_ms == null ? null : String(expiry.deadline_ms),
      command_deadline_at: expiry.deadline_at,
      stale_reason: expiry.reason,
    });
    return;
  }

  stateStore.markProcessing(payload.command_id, {
    quote_id: payload.quote_id,
    idempotency_key: payload.idempotency_key,
    execution_key: payload.execution_key,
  });
  state.in_flight_count += 1;

  try {
    let response;
    try {
      const currentSaltHex = await verifierClient.currentSalt();
      const submission = buildQuoteResponseSubmission({
        command: payload,
        signerAccountId: config.nearIntentsAccountId,
        signer,
        verifierContract: config.nearVerifierContract,
        currentSaltHex,
      });
      state.last_request = submission;

      response = await relayClient.request('quote_response', [submission], {
        timeoutMs: config.executorResponseTimeoutMs,
      });
    } catch (error) {
      // Only failures of the venue submission itself count as `failed`.
      state.last_error = serializeError(error);
      stateStore.markFailed(payload.command_id, {
        quote_id: payload.quote_id,
        error: serializeError(error),
      });
      await publishResult(payload, {
        status: 'failed',
        result_code: 'submission_failed',
        error: serializeError(error),
      });
      return;
    }

    state.last_venue_response = response;
    state.last_error = null;

    // Record the submission durably before publishing, so a publish failure
    // cannot turn an accepted quote response into a `failed` record.
    stateStore.markSubmitted(payload.command_id, {
      quote_id: payload.quote_id,
      result: response,
    });
    state.submitted_count += 1;

    await publishResult(payload, {
      status: 'submitted',
      result_code: response === 'OK' ? 'quote_response_ok' : 'quote_response_ack',
      venue_response: response,
    });
  } finally {
    state.in_flight_count = Math.max(0, state.in_flight_count - 1);
    // A drain request waits for in-flight work; the last one out shuts down.
    if (state.draining && state.in_flight_count === 0) {
      setTimeout(() => shutdown(), 0);
    }
  }
}
|
|
|
|
/**
 * Normalises an optional command field for the trade result payload.
 * A numeric zero is kept (the earlier `value || null` turned it into `null`);
 * every other falsy value still becomes `null`.
 *
 * @param {unknown} value - Optional field taken from the command.
 * @returns {unknown} The value itself, or `null` when it is absent.
 */
function presentOrNull(value) {
  if (value === 0) return value;
  return value || null;
}

/**
 * Builds, validates and publishes a trade_result event for a command.
 *
 * Identification fields are copied from the command. `extraPayload` is spread
 * last, so it can add fields such as status/result_code and override copied ones.
 * The event is keyed by the command's execution_key.
 *
 * @param {object} command - Execute-trade command payload the result refers to.
 * @param {object} extraPayload - Outcome-specific fields merged into the payload.
 * @returns {Promise<void>}
 * @throws When the event fails trade-result validation or the producer send fails.
 */
async function publishResult(command, extraPayload) {
  const event = buildEventEnvelope({
    source: 'trade-executor',
    venue: 'near-intents',
    eventType: 'trade_result',
    payload: {
      command_id: command.command_id,
      decision_id: command.decision_id,
      idempotency_key: command.idempotency_key,
      execution_key: command.execution_key,
      quote_id: command.quote_id,
      pair: command.pair,
      pair_id: presentOrNull(command.pair_id),
      pair_config_id: presentOrNull(command.pair_config_id),
      pair_config_version: presentOrNull(command.pair_config_version),
      edge_bps: presentOrNull(command.edge_bps),
      max_notional: presentOrNull(command.max_notional),
      price_route_id: presentOrNull(command.price_route_id),
      ...extraPayload,
    },
  });
  assertTradeResult(event);
  await producer.sendJson(config.kafkaTopicExecTradeResult, event, { key: command.execution_key });
}
|
|
|
|
// Builds a fresh topic selector for pausing/resuming the command consumer.
const executeTradeTopics = () => [{ topic: config.kafkaTopicCmdExecuteTrade }];

// Control API: state and health providers plus the operator routes
// (reconnect, arm/disarm, pause/resume, intent-request actions, drain).
const controlApi = startControlApi({
  host: config.tradeExecutorControlHost,
  port: config.tradeExecutorControlPort,
  logger: logger.child({ component: 'control-api' }),
  service: 'trade-executor',
  namespace: config.projectNamespace,
  stateProvider: {
    // Full state snapshot. A failed signer-registration lookup is reported
    // as `null` instead of failing the whole request.
    async getState() {
      const registrationLookup = verifierClient.isPublicKeyRegistered({
        accountId: config.nearIntentsAccountId,
      });
      const signerRegistered = await registrationLookup.catch(() => null);

      return {
        account_id: config.nearIntentsAccountId,
        signer_public_key: signer.getPublicKey().toString(),
        signer_registered: signerRegistered,
        relay: relayClient.getState(),
        trading_config: tradingConfigStore.getState(),
        ...state,
        durable_control_state: armedStateStore.getState(),
        durable_state: stateStore.getSummary({ limit: 50 }),
      };
    },
  },
  healthProvider: {
    // Healthy when the relay is connected, the trading config is ok, and the
    // relay's last message (if any) is within the configured staleness limit.
    getHealth() {
      const relay = relayClient.getState();
      const freshnessAgeMs = ageMs(relay.last_message_at);
      const tradingConfigState = tradingConfigStore.getState();
      const staleAfterMs = config.opsSentinelExecutorRelayStaleMs;

      // The trading config problem takes precedence, then disconnection, then staleness.
      let reason = null;
      if (tradingConfigState.ok !== true) {
        reason = tradingConfigState.block_reason || 'trading config unavailable';
      } else if (!relay.connected) {
        reason = 'solver relay disconnected';
      } else if (freshnessAgeMs != null && freshnessAgeMs > staleAfterMs) {
        reason = 'solver relay stale';
      }

      return {
        ok: relay.connected
          && tradingConfigState.ok === true
          && (freshnessAgeMs == null || freshnessAgeMs <= staleAfterMs),
        connected: relay.connected,
        trading_config_ok: tradingConfigState.ok,
        trading_config_block_reason: tradingConfigState.block_reason,
        relay_last_message_at: relay.last_message_at,
        relay_freshness_age_ms: freshnessAgeMs,
        paused: state.paused,
        armed: state.armed,
        reason,
      };
    },
  },
  routes: [
    {
      method: 'POST',
      path: '/reconnect',
      handler() {
        relayClient.reconnect();
        return { ok: true, reconnecting: true };
      },
    },
    {
      method: 'POST',
      path: '/arm',
      handler() {
        // The armed flag is persisted first; the in-memory flag mirrors the stored value.
        const stored = armedStateStore.setArmed(true);
        state.armed = stored.armed;
        return { ok: true, armed: true };
      },
    },
    {
      method: 'POST',
      path: '/disarm',
      handler() {
        const stored = armedStateStore.setArmed(false);
        state.armed = stored.armed;
        return { ok: true, armed: false };
      },
    },
    {
      method: 'POST',
      path: '/pause',
      handler() {
        state.paused = true;
        consumer.pause(executeTradeTopics());
        return { ok: true, paused: true };
      },
    },
    {
      method: 'POST',
      path: '/resume',
      handler() {
        state.paused = false;
        consumer.resume(executeTradeTopics());
        return { ok: true, paused: false };
      },
    },
    {
      method: 'POST',
      path: '/intent-request/preflight',
      async handler({ body }) {
        const result = await requestController.preflight(body || {});
        const counters = state.request_creation;
        counters.last_preflight = result;
        counters.preflight_count += 1;
        if (result.state === 'blocked') {
          counters.blocked_count += 1;
        }
        return result;
      },
    },
    {
      method: 'POST',
      path: '/intent-request/submit',
      async handler({ body }) {
        const result = await requestController.submit(body || {});
        // Results that carry a statusCode are returned untouched, without bookkeeping.
        if (result?.statusCode != null) return result;

        const counters = state.request_creation;
        const submissionResult = result.submission_result;
        counters.last_submission_result = submissionResult || null;

        switch (submissionResult?.status) {
          case 'accepted_by_relay':
            counters.accepted_count += 1;
            break;
          case 'blocked':
            counters.blocked_count += 1;
            break;
          case 'failed':
            counters.failed_count += 1;
            break;
          default:
            break;
        }
        return result;
      },
    },
    {
      method: 'POST',
      path: '/intent-request/refresh-outcomes',
      async handler() {
        return requestController.refreshOutcomes();
      },
    },
    {
      method: 'POST',
      path: '/drain',
      handler() {
        // Stop taking new commands; shut down now if nothing is in flight,
        // otherwise handleCommand triggers the shutdown when the last one ends.
        state.draining = true;
        state.paused = true;
        consumer.pause(executeTradeTopics());
        if (state.in_flight_count === 0) {
          setTimeout(() => shutdown(), 0);
        }
        return { ok: true, draining: true };
      },
    },
  ],
});
|
|
|
|
/**
 * Builds the persistence adapter handed to the intent request controller.
 * Every method works against the shared Postgres pool.
 *
 * @returns {object} Store with loaders, history inserts and an outcome refresh.
 */
function createIntentRequestStore() {
  // Wraps a payload in an event envelope, validates it and writes it to a history table.
  async function recordHistoryEvent({ eventType, observedAt, payload, assertEvent, table, topic }) {
    const event = buildEventEnvelope({
      source: 'trade-executor',
      venue: 'near-intents',
      eventType,
      observedAt,
      payload,
    });
    assertEvent(event);
    await insertHistoryEvent(requestPool, {
      table,
      topic,
      event,
      record: {
        quote_id: null,
        // Fall back to a "source->destination" label when no pair name is given.
        pair: payload.pair || `${payload.source_asset_id}->${payload.destination_asset_id}`,
        decision_key: payload.request_id,
      },
    });
  }

  return {
    loadLatestInventorySnapshot() {
      return loadLatestInventorySnapshot(requestPool);
    },
    loadLatestMarketPrice() {
      return loadLatestMarketPrice(requestPool);
    },
    findPreflight({ requestId = null, idempotencyKey = null } = {}) {
      return loadIntentRequestPreflightByIdOrKey(requestPool, { requestId, idempotencyKey });
    },
    findSubmissionByRequest({ requestId } = {}) {
      return loadLatestIntentRequestSubmission(requestPool, { requestId });
    },
    // At most 20 submissions are loaded per status refresh.
    loadSubmissionsForStatusRefresh() {
      return loadIntentRequestSubmissionsForStatusRefresh(requestPool, { limit: 20 });
    },
    async insertPreflight(payload) {
      await recordHistoryEvent({
        eventType: 'intent_request_preflight',
        observedAt: payload.created_at,
        payload,
        assertEvent: assertIntentRequestPreflightEvent,
        table: 'intent_request_preflights',
        topic: 'intent.request.preflight',
      });
    },
    async insertSubmissionResult(payload) {
      await recordHistoryEvent({
        eventType: 'intent_request_submission_result',
        observedAt: payload.status_checked_at || payload.submitted_at,
        payload,
        assertEvent: assertIntentRequestSubmissionResultEvent,
        table: 'intent_request_submission_results',
        topic: 'intent.request.submission_result',
      });
    },
    // Refreshes outcomes for the currently configured assets; throws when the
    // trading config is not ok or either asset is missing.
    async refreshOutcomes() {
      const tradingConfig = await tradingConfigStore.getConfig();
      const usable = tradingConfig.ok && tradingConfig.tradingBtc && tradingConfig.tradingEure;
      if (!usable) {
        throw new Error(`trading config unavailable: ${tradingConfig.blockReason || 'missing current assets'}`);
      }
      return refreshIntentRequestOutcomes(requestPool, {
        btcAsset: tradingConfig.tradingBtc,
        eureAsset: tradingConfig.tradingEure,
      });
    },
  };
}
|
|
|
|
// Memoised shutdown run: repeated triggers (SIGINT followed by SIGTERM, or a
// drain followed by a signal) share one teardown instead of racing.
let shutdownPromise = null;

/**
 * Stops the control API, relay, Kafka clients and Postgres pool, then exits.
 *
 * Safe to call more than once: the teardown runs a single time and later
 * callers receive the same promise.
 *
 * @returns {Promise<void>} Settles once the teardown has run and exit was requested.
 */
async function shutdown() {
  shutdownPromise ??= runShutdownSteps();
  return shutdownPromise;
}

/**
 * Runs every teardown step in order, isolating failures so that one failing
 * step cannot skip the remaining steps or the final process exit.
 *
 * Control API and Postgres teardown stay best-effort and never affect the exit
 * code. A relay or Kafka teardown failure is logged and yields exit code 1.
 *
 * @returns {Promise<void>}
 */
async function runShutdownSteps() {
  const steps = [
    { name: 'control_api', bestEffort: true, stop: () => controlApi.close() },
    { name: 'solver_relay', bestEffort: false, stop: () => relayClient.close() },
    { name: 'kafka_consumer', bestEffort: false, stop: () => consumer.disconnect() },
    { name: 'kafka_producer', bestEffort: false, stop: () => producer.disconnect() },
    { name: 'postgres', bestEffort: true, stop: () => requestPool.end() },
  ];

  let exitCode = 0;
  try {
    for (const { name, bestEffort, stop } of steps) {
      try {
        await stop();
      } catch (error) {
        if (bestEffort) continue;
        exitCode = 1;
        logger.error('executor_shutdown_step_failed', {
          details: {
            step: name,
            error: serializeError(error),
          },
        });
      }
    }
  } finally {
    process.exit(exitCode);
  }
}
|
|
|
|
// Shut down on either termination signal.
for (const signal of ['SIGINT', 'SIGTERM']) {
  process.on(signal, shutdown);
}
|