Reconnect quote ingest after websocket errors
All checks were successful
deploy / deploy (push) Successful in 32s

Proof: NEAR Intents quote ingest now schedules reconnect on websocket error even when the runtime does not emit close; regression test, dashboard build, and full npm test pass.

Assumptions: The observed live ingest outage after the 0.49 percent rollout was caused by the startup socket_error path leaving the websocket disconnected; reconnecting preserves the existing pair filter and topics without changing trading size or funds exposure.

Still fake: Venue-native terminal fill events, fee attribution, realized per-trade PnL, and full inventory-skew strategy controls remain incomplete.
This commit is contained in:
philipp 2026-04-12 17:30:19 +02:00
parent 72b399d91f
commit d5a7325e48
2 changed files with 141 additions and 9 deletions

View file

@ -18,6 +18,7 @@ export async function startNearIntentsWs({
logger,
namespace = 'unrip',
onPublish = defaultOnPublish,
reconnectDelayMs = 2_000,
}) {
if (!apiKey) throw new Error('Missing NEAR_INTENTS_API_KEY');
@ -45,6 +46,7 @@ export async function startNearIntentsWs({
function connect() {
if (closed) return;
reconnectTimer = null;
reconnectCount += 1;
lastReconnectAt = new Date().toISOString();
@ -54,6 +56,7 @@ export async function startNearIntentsWs({
activeSocket = ws;
ws.addEventListener('open', () => {
if (activeSocket !== ws) return;
connected = true;
lastConnectedAt = new Date().toISOString();
logger?.info('connection_established', {
@ -64,6 +67,7 @@ export async function startNearIntentsWs({
});
ws.addEventListener('message', async (event) => {
if (activeSocket !== ws) return;
framesReceived += 1;
lastMessageAt = new Date().toISOString();
const text = typeof event.data === 'string' ? event.data : Buffer.from(event.data).toString('utf8');
@ -138,30 +142,53 @@ export async function startNearIntentsWs({
});
ws.addEventListener('close', () => {
if (activeSocket !== ws) return;
connected = false;
activeSocket = null;
lastDisconnectedAt = new Date().toISOString();
logger?.warn('connection_lost', {
namespace,
details: {
reconnect_in_ms: 2_000,
reconnect_in_ms: reconnectDelayMs,
},
});
if (!closed) {
reconnectTimer = setTimeout(connect, 2000);
}
scheduleReconnect();
});
ws.addEventListener('error', (err) => {
if (activeSocket !== ws) return;
connected = false;
lastDisconnectedAt = new Date().toISOString();
logger?.error('socket_error', {
namespace,
details: {
error: serializeError(err),
},
});
closeSocket(ws);
scheduleReconnect();
});
}
function closeSocket(ws) {
if (!ws || ws.readyState > WebSocket.OPEN) return;
try {
ws.close();
} catch (error) {
logger?.warn('socket_close_failed', {
namespace,
details: {
error: serializeError(error),
},
});
}
}
function scheduleReconnect() {
if (closed || reconnectTimer) return;
reconnectTimer = setTimeout(connect, reconnectDelayMs);
}
connect();
return {
@ -187,17 +214,24 @@ export async function startNearIntentsWs({
};
},
reconnect() {
if (activeSocket && activeSocket.readyState <= 1) {
activeSocket.close();
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
if (activeSocket && activeSocket.readyState <= WebSocket.OPEN) {
closeSocket(activeSocket);
} else {
connect();
}
},
close() {
closed = true;
if (reconnectTimer) clearTimeout(reconnectTimer);
if (activeSocket && activeSocket.readyState <= 1) {
activeSocket.close();
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
if (activeSocket && activeSocket.readyState <= WebSocket.OPEN) {
closeSocket(activeSocket);
}
},
};

View file

@ -0,0 +1,98 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { startNearIntentsWs } from '../src/venues/near-intents/ws.mjs';
function delay(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function installMockWebSocket() {
const original = globalThis.WebSocket;
const instances = [];
class MockWebSocket {
static CONNECTING = 0;
static OPEN = 1;
static CLOSING = 2;
static CLOSED = 3;
constructor(url, options) {
this.url = url;
this.options = options;
this.readyState = MockWebSocket.CONNECTING;
this.sent = [];
this.listeners = new Map();
instances.push(this);
}
addEventListener(type, listener) {
const existing = this.listeners.get(type) || [];
existing.push(listener);
this.listeners.set(type, existing);
}
send(payload) {
this.sent.push(payload);
}
close() {
if (this.readyState === MockWebSocket.CLOSED) return;
this.readyState = MockWebSocket.CLOSED;
this.emit('close', {});
}
open() {
this.readyState = MockWebSocket.OPEN;
this.emit('open', {});
}
error(error = new Error('socket failed')) {
this.emit('error', error);
}
emit(type, event) {
for (const listener of this.listeners.get(type) || []) listener(event);
}
}
globalThis.WebSocket = MockWebSocket;
return {
instances,
restore() {
globalThis.WebSocket = original;
},
};
}
test('near intents ingest reconnects after websocket error before open without runtime close', async () => {
const mock = installMockWebSocket();
const producer = { sendJson: async () => {} };
const client = await startNearIntentsWs({
apiKey: 'api-key',
wsUrl: 'wss://relay.example/ws',
pairFilter: ['btc', 'eure'],
producer,
rawTopic: 'raw.near_intents.quote',
normalizedTopic: 'norm.swap_demand',
reconnectDelayMs: 1,
});
try {
assert.equal(mock.instances.length, 1);
mock.instances[0].error(new Error('network failed'));
await delay(10);
assert.equal(client.getState().connected, false);
assert.ok(client.getState().reconnect_count >= 2);
assert.ok(mock.instances.length >= 2);
mock.instances[1].open();
assert.equal(client.getState().connected, true);
assert.equal(mock.instances[1].sent.length, 2);
} finally {
client.close();
mock.restore();
}
});