import { matchesPairFilter } from '../../core/pair-filter.mjs';
import { serializeError } from '../../core/log.mjs';
import { assertNormalizedSwapDemand } from '../../core/schemas.mjs';
import { buildNearIntentsQuoteEnvelope, buildNearIntentsRawEnvelope } from './normalize.mjs';

const DEFAULT_WS_URL = 'wss://solver-relay-v2.chaindefuser.com/ws';

// JSON-RPC request ids of the two subscribe calls; incoming frames carrying
// these ids are treated as the corresponding subscribe responses.
const QUOTE_SUB_ID = 1;
const QUOTE_STATUS_SUB_ID = 2;

// Delay between a socket closing and the next connection attempt.
const RECONNECT_DELAY_MS = 2_000;

/**
 * Opens a WebSocket to the solver relay, subscribes to `quote` and
 * `quote_status`, and publishes every quote that passes the pair filter to
 * `rawTopic` (raw envelope) and `normalizedTopic` (normalized envelope).
 * The socket is re-opened RECONNECT_DELAY_MS after it closes until `close()`
 * is called on the returned handle.
 *
 * Only one publish is in flight at a time: quote frames that arrive while a
 * publish is pending are dropped, not queued.
 *
 * @param {object} options
 * @param {string} options.apiKey - Sent as `Authorization: Bearer <apiKey>`.
 * @param {string} [options.wsUrl] - WebSocket endpoint; defaults to DEFAULT_WS_URL.
 * @param {*} [options.pairFilter] - Value handed to `matchesPairFilter` when
 *   `getPairFilter` is not supplied.
 * @param {() => *} [options.getPairFilter] - Called for every candidate quote
 *   to obtain the filter to apply.
 * @param {{ sendJson: Function }} options.producer - Invoked as
 *   `sendJson(topic, value, { key })`; the result is awaited.
 * @param {string} options.rawTopic - Topic for raw envelopes.
 * @param {string} options.normalizedTopic - Topic for normalized envelopes.
 * @param {object} [options.logger] - Optional; `info`, `warn` and `error` are
 *   called as `(eventName, fields)`.
 * @param {string} [options.namespace] - Added to every log record.
 * @param {(envelope: object, publishedCount: number) => void} [options.onPublish]
 *   - Called after both sends of a quote succeed.
 * @returns {Promise<{ getState: () => object, reconnect: () => void, close: () => void }>}
 * @throws {Error} (as a rejection) when `apiKey` is missing.
 */
export async function startNearIntentsWs({
  apiKey,
  wsUrl = DEFAULT_WS_URL,
  pairFilter,
  getPairFilter = () => pairFilter,
  producer,
  rawTopic,
  normalizedTopic,
  logger,
  namespace = 'unrip',
  onPublish = defaultOnPublish,
}) {
  if (!apiKey) throw new Error('Missing NEAR_INTENTS_API_KEY');

  let quoteSubscriptionId = null;
  let quoteStatusSubscriptionId = null;
  let publishedCount = 0;
  let publishLocked = false;
  let closed = false;
  let reconnectTimer = null;
  let activeSocket = null;
  let connected = false;
  let framesReceived = 0;
  let quoteFramesReceived = 0;
  let filteredCount = 0;
  let publishErrorCount = 0;
  let invalidJsonCount = 0;
  let messageErrorCount = 0;
  let lastMessageAt = null;
  let lastMatchingQuoteAt = null;
  let lastPublishedAt = null;
  let lastPublishedPair = null;
  let reconnectCount = 0;
  let lastConnectedAt = null;
  let lastDisconnectedAt = null;
  let lastReconnectAt = null;

  const nowIso = () => new Date().toISOString();

  // Processes one incoming frame. Rejections are handled by the caller.
  async function handleMessage(event) {
    framesReceived += 1;
    lastMessageAt = nowIso();

    const text =
      typeof event.data === 'string' ? event.data : Buffer.from(event.data).toString('utf8');

    let payload;
    try {
      payload = JSON.parse(text);
    } catch {
      // Non-JSON frames are counted and otherwise ignored.
      invalidJsonCount += 1;
      return;
    }

    // Subscribe responses: remember the ids assigned to our subscriptions.
    if (payload?.id === QUOTE_SUB_ID) {
      quoteSubscriptionId = extractSubscriptionId(payload.result);
      return;
    }
    if (payload?.id === QUOTE_STATUS_SUB_ID) {
      quoteStatusSubscriptionId = extractSubscriptionId(payload.result);
      return;
    }

    const eventFrame = extractQuoteEventFrame(payload);
    if (!eventFrame) return;
    quoteFramesReceived += 1;

    const { subscription, merged } = eventFrame;
    // Drop quote_status events, and events tagged with a subscription other
    // than our quote subscription (only once its id is known).
    if (quoteStatusSubscriptionId && subscription === quoteStatusSubscriptionId) return;
    if (quoteSubscriptionId && subscription && subscription !== quoteSubscriptionId) return;

    // A publish is already in flight: this quote is dropped, not queued.
    if (publishLocked) return;

    const rawEnvelope = buildNearIntentsRawEnvelope(merged);
    const envelope = buildNearIntentsQuoteEnvelope(merged);
    if (!envelope) return;
    assertNormalizedSwapDemand(envelope);

    const assetIn = envelope.payload?.asset_in;
    const assetOut = envelope.payload?.asset_out;
    if (!assetIn || !assetOut) return;

    const activePairFilter = getPairFilter();
    if (!matchesPairFilter(assetIn, assetOut, activePairFilter)) {
      filteredCount += 1;
      return;
    }

    lastMatchingQuoteAt = nowIso();
    publishLocked = true;
    try {
      await producer.sendJson(rawTopic, rawEnvelope, { key: rawEnvelope.event_id });
      await producer.sendJson(normalizedTopic, envelope, { key: envelope.payload.quote_id });
      publishedCount += 1;
      lastPublishedAt = nowIso();
      lastPublishedPair = `${assetIn}->${assetOut}`;
      onPublish(envelope, publishedCount);
    } catch (error) {
      publishErrorCount += 1;
      logger?.error('publish_failed', {
        namespace,
        topic: normalizedTopic,
        pair: `${assetIn}->${assetOut}`,
        details: {
          raw_topic: rawTopic,
          error: serializeError(error),
          quote_id: envelope.payload?.quote_id,
        },
      });
    } finally {
      publishLocked = false;
    }
  }

  function connect() {
    if (closed) return;

    // Cancel any scheduled reconnect so a manual reconnect() cannot end up
    // with two live sockets once the timer fires.
    if (reconnectTimer) {
      clearTimeout(reconnectTimer);
      reconnectTimer = null;
    }

    // Subscription ids belong to a single connection; stale ids from the
    // previous socket must not filter frames on the new one.
    quoteSubscriptionId = null;
    quoteStatusSubscriptionId = null;

    // Note: the initial connection is counted here as well.
    reconnectCount += 1;
    lastReconnectAt = nowIso();

    const ws = new WebSocket(wsUrl, {
      headers: { Authorization: `Bearer ${apiKey}` },
    });
    // Deliver binary frames as ArrayBuffer so Buffer.from() can decode them
    // (the WHATWG default is Blob, which Buffer.from() rejects).
    ws.binaryType = 'arraybuffer';
    activeSocket = ws;

    ws.addEventListener('open', () => {
      connected = true;
      lastConnectedAt = nowIso();
      logger?.info('connection_established', { namespace });
      ws.send(
        JSON.stringify({ jsonrpc: '2.0', id: QUOTE_SUB_ID, method: 'subscribe', params: ['quote'] }),
      );
      ws.send(
        JSON.stringify({
          jsonrpc: '2.0',
          id: QUOTE_STATUS_SUB_ID,
          method: 'subscribe',
          params: ['quote_status'],
        }),
      );
    });

    ws.addEventListener('message', (event) => {
      // The event dispatcher ignores the listener's promise, so a throw from
      // decoding, envelope building, validation or filtering would otherwise
      // surface as an unhandled rejection.
      handleMessage(event).catch((error) => {
        messageErrorCount += 1;
        logger?.error('message_handling_failed', {
          namespace,
          details: { error: serializeError(error) },
        });
      });
    });

    ws.addEventListener('close', () => {
      // A socket that has already been replaced must neither clobber the
      // state of its successor nor schedule another connection.
      if (activeSocket !== ws) return;

      connected = false;
      activeSocket = null;
      lastDisconnectedAt = nowIso();
      logger?.warn('connection_lost', {
        namespace,
        details: { reconnect_in_ms: RECONNECT_DELAY_MS },
      });
      if (!closed) {
        reconnectTimer = setTimeout(connect, RECONNECT_DELAY_MS);
      }
    });

    ws.addEventListener('error', (err) => {
      logger?.error('socket_error', {
        namespace,
        details: { error: serializeError(err) },
      });
    });
  }

  connect();

  return {
    /** Returns a snapshot of the connection state and counters. */
    getState() {
      return {
        connected,
        reconnect_count: reconnectCount,
        frames_received: framesReceived,
        quote_frames_received: quoteFramesReceived,
        filtered_count: filteredCount,
        published_count: publishedCount,
        publish_error_count: publishErrorCount,
        invalid_json_count: invalidJsonCount,
        message_error_count: messageErrorCount,
        last_message_at: lastMessageAt,
        last_matching_quote_at: lastMatchingQuoteAt,
        last_published_at: lastPublishedAt,
        last_published_pair: lastPublishedPair,
        last_connected_at: lastConnectedAt,
        last_disconnected_at: lastDisconnectedAt,
        last_reconnect_at: lastReconnectAt,
        raw_topic: rawTopic,
        normalized_topic: normalizedTopic,
      };
    },

    /**
     * Forces a new connection. A connecting/open socket is closed and its
     * close handler schedules the reconnect; otherwise a connection is
     * opened immediately.
     */
    reconnect() {
      if (activeSocket && activeSocket.readyState <= 1) {
        activeSocket.close();
      } else {
        connect();
      }
    },

    /** Stops reconnecting and closes the current socket, if any. */
    close() {
      closed = true;
      if (reconnectTimer) {
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
      }
      if (activeSocket && activeSocket.readyState <= 1) {
        activeSocket.close();
      }
    },
  };
}

/**
 * Pulls a subscription id out of a subscribe response's `result`.
 * Accepts a bare string, or an object carrying `subscription`,
 * `subscription_id` or `subscriber_id` (first truthy one wins).
 *
 * @param {*} result
 * @returns {*} The id, or null when none is found.
 */
function extractSubscriptionId(result) {
  if (typeof result === 'string') return result;
  if (result && typeof result === 'object') {
    return result.subscription || result.subscription_id || result.subscriber_id || null;
  }
  return null;
}

/**
 * Finds the first quote-looking record in a frame. Candidates are tried in
 * order: `params` of a `method: 'event'` frame, an object `result`, then the
 * payload itself. When a candidate has record-valued `data` and/or
 * `metadata`, the two are merged with `data` keys taking precedence;
 * otherwise the candidate itself is inspected.
 *
 * @param {*} payload - Parsed frame.
 * @returns {{ subscription: *, merged: object } | null}
 */
function extractQuoteEventFrame(payload) {
  const candidates = [];
  if (payload?.method === 'event' && payload?.params) {
    candidates.push(payload.params);
  }
  if (payload?.result && typeof payload.result === 'object') {
    candidates.push(payload.result);
  }
  if (payload && typeof payload === 'object') {
    candidates.push(payload);
  }

  for (const candidate of candidates) {
    const data = candidate?.data;
    const metadata = candidate?.metadata;
    const merged =
      isRecord(data) || isRecord(metadata)
        ? { ...(isRecord(metadata) ? metadata : {}), ...(isRecord(data) ? data : {}) }
        : candidate;
    if (!isRecord(merged)) continue;
    if (!looksLikeQuotePayload(merged)) continue;
    return {
      subscription: candidate?.subscription ?? null,
      merged,
    };
  }
  return null;
}

/**
 * True when the record has at least one truthy quote-identifying field.
 *
 * @param {object} payload
 * @returns {boolean}
 */
function looksLikeQuotePayload(payload) {
  return Boolean(
    payload.quote_hash ||
      payload.quote_id ||
      payload.defuse_asset_identifier_in ||
      payload.defuse_asset_identifier_out ||
      payload.asset_in ||
      payload.asset_out,
  );
}

/**
 * True for non-null, non-array objects.
 *
 * @param {*} value
 * @returns {boolean}
 */
function isRecord(value) {
  return Boolean(value) && typeof value === 'object' && !Array.isArray(value);
}

/** No-op default for the `onPublish` option. */
function defaultOnPublish() {}