From 7a059006d25271cd858f0175a223a9f970425bdd Mon Sep 17 00:00:00 2001 From: philipp Date: Sun, 12 Apr 2026 11:57:14 +0200 Subject: [PATCH] Fix solver relay websocket recovery Proof: Live investigation found the executor could stay disconnected from the solver relay after websocket error without close; relay client now reconnects after error and request timeout covers connection wait. Added regression coverage and npm test passes. Assumptions: Historical relay OK responses were technically accepted and mostly not filled because they were not selected or settled, while the current disconnected relay state was a repo-side runtime bug affecting new submissions. Still fake: Venue-native terminal fill events, fee attribution, realized per-trade PnL, and full inventory-skew strategy controls remain incomplete. --- src/venues/near-intents/solver-relay-ws.mjs | 141 +++++++++++++++----- test/solver-relay-ws.test.mjs | 139 +++++++++++++++++++ 2 files changed, 243 insertions(+), 37 deletions(-) create mode 100644 test/solver-relay-ws.test.mjs diff --git a/src/venues/near-intents/solver-relay-ws.mjs b/src/venues/near-intents/solver-relay-ws.mjs index fbb6d5d..2c726d6 100644 --- a/src/venues/near-intents/solver-relay-ws.mjs +++ b/src/venues/near-intents/solver-relay-ws.mjs @@ -6,6 +6,7 @@ export async function startSolverRelayWs({ logger = null, subscriptions = [], onEvent = () => {}, + reconnectDelayMs = 2000, }) { if (!apiKey) throw new Error('Missing NEAR_INTENTS_API_KEY'); @@ -15,7 +16,7 @@ export async function startSolverRelayWs({ let connected = false; let requestId = 1; const pending = new Map(); - let readyResolvers = []; + const readyResolvers = new Set(); let reconnectCount = 0; let lastMessageAt = null; let lastConnectedAt = null; @@ -25,9 +26,7 @@ export async function startSolverRelayWs({ connect(); return { - async request(method, params, { timeoutMs = 10_000 } = {}) { - await waitForConnection(); - + request(method, params, { timeoutMs = 10_000 } = {}) { const id = requestId++; const payload = { jsonrpc: '2.0', @@ -37,23 +36,53 @@ export async function startSolverRelayWs({ }; return new Promise((resolve, reject) => { + let settled = false; + let readyResolver = null; + const timeout = setTimeout(() => { - pending.delete(id); - reject(new Error(`${method} timed out`)); + fail(new Error(`${method} timed out`)); }, timeoutMs); - pending.set(id, { - resolve(result) { - clearTimeout(timeout); - resolve(result); - }, - reject(error) { - clearTimeout(timeout); - reject(error); - }, - }); + function cleanup() { + clearTimeout(timeout); + pending.delete(id); + if (readyResolver) readyResolvers.delete(readyResolver); + } - socket.send(JSON.stringify(payload)); + function succeed(result) { + if (settled) return; + settled = true; + cleanup(); + resolve(result); + } + + function fail(error) { + if (settled) return; + settled = true; + cleanup(); + reject(error); + } + + function sendRequest() { + if (settled) return; + if (!connected || !socket || socket.readyState !== WebSocket.OPEN) { + fail(new Error('Socket not connected')); + return; + } + + pending.set(id, { + resolve: succeed, + reject: fail, + }); + socket.send(JSON.stringify(payload)); + } + + if (connected) { + sendRequest(); + } else { + readyResolver = sendRequest; + readyResolvers.add(readyResolver); + } }); }, getState() { @@ -68,32 +97,37 @@ export async function startSolverRelayWs({ }; }, reconnect() { - if (socket && socket.readyState <= 1) { - socket.close(); - } else { - connect(); - } + connected = false; + closeSocket(); + scheduleReconnect(); }, close() { closed = true; + connected = false; if (reconnectTimer) clearTimeout(reconnectTimer); - if (socket && socket.readyState <= 1) socket.close(); + reconnectTimer = null; + closeSocket(); rejectAllPending(new Error('Socket closed')); + rejectReadyRequests(new Error('Socket closed')); }, }; function connect() { if (closed) return; + reconnectTimer = null; reconnectCount += 1; lastReconnectAt = new Date().toISOString(); - socket = new WebSocket(wsUrl, { + closeSocket(); + const activeSocket = new WebSocket(wsUrl, { headers: { Authorization: `Bearer ${apiKey}`, }, }); + socket = activeSocket; - socket.addEventListener('open', () => { + activeSocket.addEventListener('open', () => { + if (activeSocket !== socket) return; connected = true; lastConnectedAt = new Date().toISOString(); logger?.info('connection_established', { @@ -111,7 +145,8 @@ export async function startSolverRelayWs({ } }); - socket.addEventListener('message', (event) => { + activeSocket.addEventListener('message', (event) => { + if (activeSocket !== socket) return; lastMessageAt = new Date().toISOString(); const text = typeof event.data === 'string' ? event.data : Buffer.from(event.data).toString('utf8'); let payload; @@ -135,40 +170,72 @@ export async function startSolverRelayWs({ onEvent(payload); }); - socket.addEventListener('close', () => { + activeSocket.addEventListener('close', () => { + if (activeSocket !== socket) return; connected = false; lastDisconnectedAt = new Date().toISOString(); rejectAllPending(new Error('Socket disconnected')); logger?.warn('connection_lost', { venue: 'near-intents', details: { - reconnect_in_ms: 2000, + reconnect_in_ms: reconnectDelayMs, }, }); - if (!closed) reconnectTimer = setTimeout(connect, 2000); + scheduleReconnect(); }); - socket.addEventListener('error', (error) => { + activeSocket.addEventListener('error', (error) => { + if (activeSocket !== socket) return; + connected = false; + lastDisconnectedAt = new Date().toISOString(); + rejectAllPending(new Error('Socket error')); logger?.error('socket_error', { venue: 'near-intents', details: { error: serializeError(error), }, }); + closeSocket(); + scheduleReconnect(); }); } - function waitForConnection() { - if (connected) return Promise.resolve(); - return new Promise((resolve) => { - readyResolvers.push(resolve); - }); + function closeSocket() { + if (!socket || socket.readyState > WebSocket.OPEN) return; + try { + socket.close(); + } catch (error) { + logger?.warn('socket_close_failed', { + venue: 'near-intents', + details: { + error: serializeError(error), + }, + }); + } + } + + function scheduleReconnect() { + if (closed || reconnectTimer) return; + reconnectTimer = setTimeout(connect, reconnectDelayMs); } function resolveReady() { - for (const resolve of readyResolvers) resolve(); - readyResolvers = []; + const resolvers = [...readyResolvers]; + readyResolvers.clear(); + for (const resolve of resolvers) resolve(); + } + + function rejectReadyRequests(error) { + const resolvers = [...readyResolvers]; + readyResolvers.clear(); + for (const resolve of resolvers) { + try { + resolve(error); + } catch { + // The resolver owns its promise rejection path. + } + } } function rejectAllPending(error) { diff --git a/test/solver-relay-ws.test.mjs b/test/solver-relay-ws.test.mjs new file mode 100644 index 0000000..76063c5 --- /dev/null +++ b/test/solver-relay-ws.test.mjs @@ -0,0 +1,139 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { startSolverRelayWs } from '../src/venues/near-intents/solver-relay-ws.mjs'; + +function delay(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function installMockWebSocket() { + const original = globalThis.WebSocket; + const instances = []; + + class MockWebSocket { + static CONNECTING = 0; + static OPEN = 1; + static CLOSING = 2; + static CLOSED = 3; + + constructor(url, options) { + this.url = url; + this.options = options; + this.readyState = MockWebSocket.CONNECTING; + this.sent = []; + this.listeners = new Map(); + instances.push(this); + } + + addEventListener(type, listener) { + const existing = this.listeners.get(type) || []; + existing.push(listener); + this.listeners.set(type, existing); + } + + send(payload) { + this.sent.push(payload); + } + + close() { + if (this.readyState === MockWebSocket.CLOSED) return; + this.readyState = MockWebSocket.CLOSED; + this.emit('close', {}); + } + + open() { + this.readyState = MockWebSocket.OPEN; + this.emit('open', {}); + } + + error(error = new Error('socket failed')) { + this.emit('error', error); + } + + message(payload) { + this.emit('message', { data: JSON.stringify(payload) }); + } + + emit(type, event) { + for (const listener of this.listeners.get(type) || []) listener(event); + } + } + + globalThis.WebSocket = MockWebSocket; + + return { + instances, + restore() { + globalThis.WebSocket = original; + }, + }; +} + +test('solver relay request timeout includes time spent waiting for websocket connection', async () => { + const mock = installMockWebSocket(); + const client = await startSolverRelayWs({ + apiKey: 'api-key', + wsUrl: 'wss://relay.example/ws', + reconnectDelayMs: 1000, + }); + + try { + await assert.rejects( + client.request('quote_response', [], { timeoutMs: 5 }), + /quote_response timed out/, + ); + assert.equal(mock.instances[0].sent.length, 0); + } finally { + client.close(); + mock.restore(); + } +}); + +test('solver relay reconnects after websocket error even when no close event is emitted by the runtime', async () => { + const mock = installMockWebSocket(); + const client = await startSolverRelayWs({ + apiKey: 'api-key', + wsUrl: 'wss://relay.example/ws', + reconnectDelayMs: 1, + }); + + try { + assert.equal(mock.instances.length, 1); + mock.instances[0].error(new Error('network failed')); + await delay(10); + + assert.equal(client.getState().connected, false); + assert.ok(client.getState().reconnect_count >= 2); + assert.ok(mock.instances.length >= 2); + } finally { + client.close(); + mock.restore(); + } +}); + +test('solver relay sends queued request after connection and resolves rpc result', async () => { + const mock = installMockWebSocket(); + const client = await startSolverRelayWs({ + apiKey: 'api-key', + wsUrl: 'wss://relay.example/ws', + reconnectDelayMs: 1000, + subscriptions: ['quote_status'], + }); + + try { + const responsePromise = client.request('quote_response', [{ quote_id: 'quote-1' }], { timeoutMs: 50 }); + mock.instances[0].open(); + assert.equal(mock.instances[0].sent.length, 2); + + const sent = mock.instances[0].sent.map((entry) => JSON.parse(entry)); + const request = sent.find((entry) => entry.method === 'quote_response'); + assert.ok(request); + mock.instances[0].message({ id: request.id, result: 'OK' }); + await assert.doesNotReject(responsePromise); + assert.equal(await responsePromise, 'OK'); + } finally { + client.close(); + mock.restore(); + } +});