From d5a7325e484f721cfc49165424517111b61c0374 Mon Sep 17 00:00:00 2001 From: philipp Date: Sun, 12 Apr 2026 17:30:19 +0200 Subject: [PATCH] Reconnect quote ingest after websocket errors Proof: NEAR Intents quote ingest now schedules reconnect on websocket error even when the runtime does not emit close; regression test, dashboard build, and full npm test pass. Assumptions: The observed live ingest outage after the 0.49 percent rollout was caused by the startup socket_error path leaving the websocket disconnected; reconnecting preserves the existing pair filter and topics without changing trading size or funds exposure. Still fake: Venue-native terminal fill events, fee attribution, realized per-trade PnL, and full inventory-skew strategy controls remain incomplete. --- src/venues/near-intents/ws.mjs | 52 ++++++++++++++---- test/near-intents-ws.test.mjs | 98 ++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 9 deletions(-) create mode 100644 test/near-intents-ws.test.mjs diff --git a/src/venues/near-intents/ws.mjs b/src/venues/near-intents/ws.mjs index fdaad8c..4e2bc3f 100644 --- a/src/venues/near-intents/ws.mjs +++ b/src/venues/near-intents/ws.mjs @@ -18,6 +18,7 @@ export async function startNearIntentsWs({ logger, namespace = 'unrip', onPublish = defaultOnPublish, + reconnectDelayMs = 2_000, }) { if (!apiKey) throw new Error('Missing NEAR_INTENTS_API_KEY'); @@ -45,6 +46,7 @@ export async function startNearIntentsWs({ function connect() { if (closed) return; + reconnectTimer = null; reconnectCount += 1; lastReconnectAt = new Date().toISOString(); @@ -54,6 +56,7 @@ export async function startNearIntentsWs({ activeSocket = ws; ws.addEventListener('open', () => { + if (activeSocket !== ws) return; connected = true; lastConnectedAt = new Date().toISOString(); logger?.info('connection_established', { @@ -64,6 +67,7 @@ export async function startNearIntentsWs({ }); ws.addEventListener('message', async (event) => { + if (activeSocket !== ws) return; framesReceived += 1; lastMessageAt = new Date().toISOString(); const text = typeof event.data === 'string' ? event.data : Buffer.from(event.data).toString('utf8'); @@ -138,30 +142,53 @@ export async function startNearIntentsWs({ }); ws.addEventListener('close', () => { + if (activeSocket !== ws) return; connected = false; activeSocket = null; lastDisconnectedAt = new Date().toISOString(); logger?.warn('connection_lost', { namespace, details: { - reconnect_in_ms: 2_000, + reconnect_in_ms: reconnectDelayMs, }, }); - if (!closed) { - reconnectTimer = setTimeout(connect, 2000); - } + scheduleReconnect(); }); ws.addEventListener('error', (err) => { + if (activeSocket !== ws) return; + connected = false; + lastDisconnectedAt = new Date().toISOString(); logger?.error('socket_error', { namespace, details: { error: serializeError(err), }, }); + closeSocket(ws); + scheduleReconnect(); }); } + function closeSocket(ws) { + if (!ws || ws.readyState > WebSocket.OPEN) return; + try { + ws.close(); + } catch (error) { + logger?.warn('socket_close_failed', { + namespace, + details: { + error: serializeError(error), + }, + }); + } + } + + function scheduleReconnect() { + if (closed || reconnectTimer) return; + reconnectTimer = setTimeout(connect, reconnectDelayMs); + } + connect(); return { @@ -187,17 +214,24 @@ export async function startNearIntentsWs({ }; }, reconnect() { - if (activeSocket && activeSocket.readyState <= 1) { - activeSocket.close(); + if (reconnectTimer) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + if (activeSocket && activeSocket.readyState <= WebSocket.OPEN) { + closeSocket(activeSocket); } else { connect(); } }, close() { closed = true; - if (reconnectTimer) clearTimeout(reconnectTimer); - if (activeSocket && activeSocket.readyState <= 1) { - activeSocket.close(); + if (reconnectTimer) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + if (activeSocket && activeSocket.readyState <= WebSocket.OPEN) { + closeSocket(activeSocket); } }, }; diff --git a/test/near-intents-ws.test.mjs b/test/near-intents-ws.test.mjs new file mode 100644 index 0000000..355e544 --- /dev/null +++ b/test/near-intents-ws.test.mjs @@ -0,0 +1,98 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { startNearIntentsWs } from '../src/venues/near-intents/ws.mjs'; + +function delay(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function installMockWebSocket() { + const original = globalThis.WebSocket; + const instances = []; + + class MockWebSocket { + static CONNECTING = 0; + static OPEN = 1; + static CLOSING = 2; + static CLOSED = 3; + + constructor(url, options) { + this.url = url; + this.options = options; + this.readyState = MockWebSocket.CONNECTING; + this.sent = []; + this.listeners = new Map(); + instances.push(this); + } + + addEventListener(type, listener) { + const existing = this.listeners.get(type) || []; + existing.push(listener); + this.listeners.set(type, existing); + } + + send(payload) { + this.sent.push(payload); + } + + close() { + if (this.readyState === MockWebSocket.CLOSED) return; + this.readyState = MockWebSocket.CLOSED; + this.emit('close', {}); + } + + open() { + this.readyState = MockWebSocket.OPEN; + this.emit('open', {}); + } + + error(error = new Error('socket failed')) { + this.emit('error', error); + } + + emit(type, event) { + for (const listener of this.listeners.get(type) || []) listener(event); + } + } + + globalThis.WebSocket = MockWebSocket; + + return { + instances, + restore() { + globalThis.WebSocket = original; + }, + }; +} + +test('near intents ingest reconnects after websocket error before open without runtime close', async () => { + const mock = installMockWebSocket(); + const producer = { sendJson: async () => {} }; + const client = await startNearIntentsWs({ + apiKey: 'api-key', + wsUrl: 'wss://relay.example/ws', + pairFilter: ['btc', 'eure'], + producer, + rawTopic: 'raw.near_intents.quote', + normalizedTopic: 'norm.swap_demand', + reconnectDelayMs: 1, + }); + + try { + assert.equal(mock.instances.length, 1); + mock.instances[0].error(new Error('network failed')); + await delay(10); + + assert.equal(client.getState().connected, false); + assert.ok(client.getState().reconnect_count >= 2); + assert.ok(mock.instances.length >= 2); + + mock.instances[1].open(); + assert.equal(client.getState().connected, true); + assert.equal(mock.instances[1].sent.length, 2); + } finally { + client.close(); + mock.restore(); + } +});