import { matchesPairFilter } from '../../core/pair-filter.mjs'; import { serializeError } from '../../core/log.mjs'; import { assertNormalizedSwapDemand } from '../../core/schemas.mjs'; import { buildNearIntentsQuoteEnvelope, buildNearIntentsRawEnvelope } from './normalize.mjs'; const DEFAULT_WS_URL = 'wss://solver-relay-v2.chaindefuser.com/ws'; const QUOTE_SUB_ID = 1; const QUOTE_STATUS_SUB_ID = 2; export async function startNearIntentsWs({ apiKey, wsUrl = DEFAULT_WS_URL, pairFilter, getPairFilter = () => pairFilter, matchesPair = null, producer, rawTopic, normalizedTopic, logger, namespace = 'unrip', onPublish = defaultOnPublish, reconnectDelayMs = 2_000, }) { if (!apiKey) throw new Error('Missing NEAR_INTENTS_API_KEY'); let quoteSubscriptionId = null; let quoteStatusSubscriptionId = null; let publishedCount = 0; let rawPublishedCount = 0; let publishLocked = false; let closed = false; let reconnectTimer = null; let activeSocket = null; let connected = false; let framesReceived = 0; let quoteFramesReceived = 0; let filteredCount = 0; let publishErrorCount = 0; let invalidJsonCount = 0; let lastMessageAt = null; let lastMatchingQuoteAt = null; let lastPublishedAt = null; let lastPublishedPair = null; let reconnectCount = 0; let lastConnectedAt = null; let lastDisconnectedAt = null; let lastReconnectAt = null; const closingSockets = new WeakSet(); function connect() { if (closed) return; reconnectTimer = null; reconnectCount += 1; lastReconnectAt = new Date().toISOString(); const ws = new WebSocket(wsUrl, { headers: { Authorization: `Bearer ${apiKey}` }, }); activeSocket = ws; ws.addEventListener('open', () => { if (activeSocket !== ws) return; connected = true; lastConnectedAt = new Date().toISOString(); logger?.info('connection_established', { namespace, }); ws.send(JSON.stringify({ jsonrpc: '2.0', id: QUOTE_SUB_ID, method: 'subscribe', params: ['quote'] })); ws.send(JSON.stringify({ jsonrpc: '2.0', id: QUOTE_STATUS_SUB_ID, method: 'subscribe', params: ['quote_status'] })); }); ws.addEventListener('message', async (event) => { if (activeSocket !== ws) return; framesReceived += 1; lastMessageAt = new Date().toISOString(); const text = typeof event.data === 'string' ? event.data : Buffer.from(event.data).toString('utf8'); let payload; try { payload = JSON.parse(text); } catch { invalidJsonCount += 1; return; } if (payload?.id === QUOTE_SUB_ID) { quoteSubscriptionId = extractSubscriptionId(payload.result); return; } if (payload?.id === QUOTE_STATUS_SUB_ID) { quoteStatusSubscriptionId = extractSubscriptionId(payload.result); return; } const eventFrame = extractQuoteEventFrame(payload); if (!eventFrame) return; quoteFramesReceived += 1; const { subscription, merged } = eventFrame; if (quoteStatusSubscriptionId && subscription === quoteStatusSubscriptionId) return; if (quoteSubscriptionId && subscription && subscription !== quoteSubscriptionId) return; if (publishLocked) return; const envelope = buildNearIntentsQuoteEnvelope(merged); const rawEnvelope = buildNearIntentsRawEnvelope(merged); try { await producer.sendJson(rawTopic, rawEnvelope, { key: rawEnvelope.event_id }); rawPublishedCount += 1; } catch (error) { publishErrorCount += 1; logger?.error('raw_publish_failed', { namespace, topic: rawTopic, details: { error: serializeError(error), quote_id: rawEnvelope.payload?.message?.quote_id || rawEnvelope.payload?.message?.quote_hash || null, }, }); } if (!envelope) return; assertNormalizedSwapDemand(envelope); const assetIn = envelope.payload?.asset_in; const assetOut = envelope.payload?.asset_out; if (!assetIn || !assetOut) return; const pairAllowed = matchesPair ? await matchesPair(assetIn, assetOut) : matchesPairFilter(assetIn, assetOut, getPairFilter()); if (!pairAllowed) { filteredCount += 1; return; } lastMatchingQuoteAt = new Date().toISOString(); publishLocked = true; try { await producer.sendJson(normalizedTopic, envelope, { key: envelope.payload.quote_id }); publishedCount += 1; lastPublishedAt = new Date().toISOString(); lastPublishedPair = `${assetIn}->${assetOut}`; onPublish(envelope, publishedCount); } catch (error) { publishErrorCount += 1; logger?.error('publish_failed', { namespace, topic: normalizedTopic, pair: `${assetIn}->${assetOut}`, details: { raw_topic: rawTopic, error: serializeError(error), quote_id: envelope.payload?.quote_id, }, }); } finally { publishLocked = false; } }); ws.addEventListener('close', () => { if (activeSocket !== ws) return; connected = false; activeSocket = null; lastDisconnectedAt = new Date().toISOString(); logger?.warn('connection_lost', { namespace, details: { reconnect_in_ms: reconnectDelayMs, }, }); scheduleReconnect(); }); ws.addEventListener('error', (err) => { if (activeSocket !== ws) return; if (closingSockets.has(ws)) return; connected = false; lastDisconnectedAt = new Date().toISOString(); logger?.error('socket_error', { namespace, details: { error: serializeError(err), }, }); closeSocket(ws); scheduleReconnect(); }); } function closeSocket(ws) { if (!ws || ws.readyState > WebSocket.OPEN || closingSockets.has(ws)) return; closingSockets.add(ws); try { ws.close(); } catch (error) { logger?.warn('socket_close_failed', { namespace, details: { error: serializeError(error), }, }); } } function scheduleReconnect() { if (closed || reconnectTimer) return; reconnectTimer = setTimeout(connect, reconnectDelayMs); } connect(); return { getState() { return { connected, reconnect_count: reconnectCount, frames_received: framesReceived, quote_frames_received: quoteFramesReceived, filtered_count: filteredCount, raw_published_count: rawPublishedCount, published_count: publishedCount, publish_error_count: publishErrorCount, invalid_json_count: invalidJsonCount, last_message_at: lastMessageAt, last_matching_quote_at: lastMatchingQuoteAt, last_published_at: lastPublishedAt, last_published_pair: lastPublishedPair, last_connected_at: lastConnectedAt, last_disconnected_at: lastDisconnectedAt, last_reconnect_at: lastReconnectAt, raw_topic: rawTopic, normalized_topic: normalizedTopic, }; }, reconnect() { if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; } if (activeSocket && activeSocket.readyState <= WebSocket.OPEN) { closeSocket(activeSocket); } else { connect(); } }, close() { closed = true; if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; } if (activeSocket && activeSocket.readyState <= WebSocket.OPEN) { closeSocket(activeSocket); } }, }; } function extractSubscriptionId(result) { if (typeof result === 'string') return result; if (result && typeof result === 'object') { return result.subscription || result.subscription_id || result.subscriber_id || null; } return null; } function extractQuoteEventFrame(payload) { const candidates = []; if (payload?.method === 'event' && payload?.params) { candidates.push(payload.params); } if (payload?.result && typeof payload.result === 'object') { candidates.push(payload.result); } if (payload && typeof payload === 'object') { candidates.push(payload); } for (const candidate of candidates) { const data = candidate?.data; const metadata = candidate?.metadata; const merged = isRecord(data) || isRecord(metadata) ? { ...(isRecord(metadata) ? metadata : {}), ...(isRecord(data) ? data : {}) } : candidate; if (!isRecord(merged)) continue; if (!looksLikeQuotePayload(merged)) continue; return { subscription: candidate?.subscription ?? null, merged, }; } return null; } function looksLikeQuotePayload(payload) { return Boolean( payload.quote_hash || payload.quote_id || payload.defuse_asset_identifier_in || payload.defuse_asset_identifier_out || payload.asset_in || payload.asset_out, ); } function isRecord(value) { return Boolean(value) && typeof value === 'object' && !Array.isArray(value); } function defaultOnPublish() {}