From 82017dd3014318b11d33e2ab29a063e7b67acbb8 Mon Sep 17 00:00:00 2001 From: philipp Date: Wed, 13 May 2026 18:25:54 +0200 Subject: [PATCH] Guard quote ingest against node OOM Proof: Live investigation showed doran-1 entered NodeNotReady with kubelet SystemOOM and TLS/control-plane timeouts while near-intents-ingest, history-writer, and operator-dashboard were the largest Node memory consumers. This commit adds websocket publish backpressure for the raw quote firehose and pod memory guardrails for the affected services. Assumptions: Dropping quote frames while Kafka publishing is backpressured is safer than allowing unbounded in-flight publishes to take down the single-node cluster; retained Kafka/Postgres history remains best-effort under overload until the platform has enough capacity for full raw retention. Still fake: This does not add durable queue spillover for skipped raw websocket frames, does not resize the node, and does not prove fee-complete trading PnL. --- deploy/k8s/base/unrip.yaml | 24 ++++++ src/venues/near-intents/ws.mjs | 75 ++++++++++--------- test/near-intents-ws.test.mjs | 64 ++++++++++++++++ .../unrip-resource-guardrails-static.test.mjs | 23 ++++++ 4 files changed, 149 insertions(+), 37 deletions(-) create mode 100644 test/unrip-resource-guardrails-static.test.mjs diff --git a/deploy/k8s/base/unrip.yaml b/deploy/k8s/base/unrip.yaml index 07bab3e..62d2894 100644 --- a/deploy/k8s/base/unrip.yaml +++ b/deploy/k8s/base/unrip.yaml @@ -293,6 +293,9 @@ spec: image: ghcr.io/example/unrip:bootstrap imagePullPolicy: IfNotPresent command: ["node", "src/apps/near-intents-ingest.mjs"] + env: + - name: NODE_OPTIONS + value: "--max-old-space-size=896" ports: - name: control-api containerPort: 8081 @@ -301,6 +304,11 @@ spec: name: unrip-config - secretRef: name: unrip-secrets + resources: + requests: + memory: 256Mi + limits: + memory: 1280Mi --- apiVersion: apps/v1 kind: Deployment @@ -428,6 +436,9 @@ spec: image: ghcr.io/example/unrip:bootstrap imagePullPolicy: IfNotPresent command: ["node", "src/apps/history-writer.mjs"] + env: + - name: NODE_OPTIONS + value: "--max-old-space-size=896" ports: - name: control-api containerPort: 8085 @@ -436,6 +447,11 @@ spec: name: unrip-config - secretRef: name: unrip-secrets + resources: + requests: + memory: 256Mi + limits: + memory: 1280Mi --- apiVersion: apps/v1 kind: Deployment @@ -570,6 +586,9 @@ spec: image: ghcr.io/example/unrip:bootstrap imagePullPolicy: IfNotPresent command: ["node", "src/apps/operator-dashboard.mjs"] + env: + - name: NODE_OPTIONS + value: "--max-old-space-size=896" ports: - name: http containerPort: 8090 @@ -578,3 +597,8 @@ spec: name: unrip-config - secretRef: name: unrip-secrets + resources: + requests: + memory: 256Mi + limits: + memory: 1280Mi diff --git a/src/venues/near-intents/ws.mjs b/src/venues/near-intents/ws.mjs index cb41096..fb2991a 100644 --- a/src/venues/near-intents/ws.mjs +++ b/src/venues/near-intents/ws.mjs @@ -35,6 +35,7 @@ export async function startNearIntentsWs({ let framesReceived = 0; let quoteFramesReceived = 0; let filteredCount = 0; + let backpressureSkippedCount = 0; let publishErrorCount = 0; let invalidJsonCount = 0; let lastMessageAt = null; @@ -101,44 +102,40 @@ export async function startNearIntentsWs({ if (quoteStatusSubscriptionId && subscription === quoteStatusSubscriptionId) return; if (quoteSubscriptionId && subscription && subscription !== quoteSubscriptionId) return; - if (publishLocked) return; - - const envelope = buildNearIntentsQuoteEnvelope(merged); - const rawEnvelope = buildNearIntentsRawEnvelope(merged); - - try { - await producer.sendJson(rawTopic, rawEnvelope, { key: rawEnvelope.event_id }); - rawPublishedCount += 1; - } catch (error) { - publishErrorCount += 1; - logger?.error('raw_publish_failed', { - namespace, - topic: rawTopic, - details: { - error: serializeError(error), - quote_id: rawEnvelope.payload?.message?.quote_id || rawEnvelope.payload?.message?.quote_hash || null, - }, - }); - } - - if (!envelope) return; - assertNormalizedSwapDemand(envelope); - - const assetIn = envelope.payload?.asset_in; - const assetOut = envelope.payload?.asset_out; - if (!assetIn || !assetOut) return; - - const pairAllowed = matchesPair - ? await matchesPair(assetIn, assetOut) - : matchesPairFilter(assetIn, assetOut, getPairFilter()); - if (!pairAllowed) { - filteredCount += 1; + if (publishLocked) { + backpressureSkippedCount += 1; return; } - lastMatchingQuoteAt = new Date().toISOString(); publishLocked = true; + let envelope = null; + let rawEnvelope = null; + let assetIn = null; + let assetOut = null; + let publishTopic = rawTopic; try { + envelope = buildNearIntentsQuoteEnvelope(merged); + rawEnvelope = buildNearIntentsRawEnvelope(merged); + + await producer.sendJson(rawTopic, rawEnvelope, { key: rawEnvelope.event_id }); + rawPublishedCount += 1; + if (!envelope) return; + assertNormalizedSwapDemand(envelope); + + assetIn = envelope.payload?.asset_in; + assetOut = envelope.payload?.asset_out; + if (!assetIn || !assetOut) return; + + const pairAllowed = matchesPair + ? await matchesPair(assetIn, assetOut) + : matchesPairFilter(assetIn, assetOut, getPairFilter()); + if (!pairAllowed) { + filteredCount += 1; + return; + } + lastMatchingQuoteAt = new Date().toISOString(); + + publishTopic = normalizedTopic; await producer.sendJson(normalizedTopic, envelope, { key: envelope.payload.quote_id }); publishedCount += 1; lastPublishedAt = new Date().toISOString(); @@ -146,14 +143,17 @@ export async function startNearIntentsWs({ onPublish(envelope, publishedCount); } catch (error) { publishErrorCount += 1; - logger?.error('publish_failed', { + logger?.error(publishTopic === rawTopic ? 'raw_publish_failed' : 'publish_failed', { namespace, - topic: normalizedTopic, - pair: `${assetIn}->${assetOut}`, + topic: publishTopic, + pair: assetIn && assetOut ? `${assetIn}->${assetOut}` : null, details: { raw_topic: rawTopic, error: serializeError(error), - quote_id: envelope.payload?.quote_id, + quote_id: envelope?.payload?.quote_id + || rawEnvelope?.payload?.message?.quote_id + || rawEnvelope?.payload?.message?.quote_hash + || null, }, }); } finally { @@ -221,6 +221,7 @@ export async function startNearIntentsWs({ frames_received: framesReceived, quote_frames_received: quoteFramesReceived, filtered_count: filteredCount, + backpressure_skipped_count: backpressureSkippedCount, raw_published_count: rawPublishedCount, published_count: publishedCount, publish_error_count: publishErrorCount, diff --git a/test/near-intents-ws.test.mjs b/test/near-intents-ws.test.mjs index 056e3ac..b70b4d2 100644 --- a/test/near-intents-ws.test.mjs +++ b/test/near-intents-ws.test.mjs @@ -143,3 +143,67 @@ test('near intents websocket close is reentrant-safe when close emits an error', mock.restore(); } }); + +test('near intents websocket skips quote frames while Kafka publish is backpressured', async () => { + const mock = installMockWebSocket(); + let releaseRawPublish; + const sends = []; + const producer = { + async sendJson(topic, event) { + sends.push({ topic, event }); + if (topic === 'raw.near_intents.quote' && !releaseRawPublish) { + await new Promise((resolve) => { + releaseRawPublish = resolve; + }); + } + }, + }; + const client = await startNearIntentsWs({ + apiKey: 'api-key', + wsUrl: 'wss://relay.example/ws', + pairFilter: ['btc', 'eure'], + producer, + rawTopic: 'raw.near_intents.quote', + normalizedTopic: 'norm.swap_demand', + reconnectDelayMs: 1, + }); + + function quote(quoteId) { + return { + method: 'event', + params: { + data: { + quote_id: quoteId, + defuse_asset_identifier_in: 'btc', + defuse_asset_identifier_out: 'eure', + exact_amount_in: '100', + }, + }, + }; + } + + try { + mock.instances[0].open(); + mock.instances[0].emit('message', { data: JSON.stringify(quote('quote-1')) }); + mock.instances[0].emit('message', { data: JSON.stringify(quote('quote-2')) }); + await delay(5); + + assert.equal(client.getState().backpressure_skipped_count, 1); + assert.deepEqual(sends.map((entry) => entry.event.payload?.quote_id || entry.event.payload?.message?.quote_id), [ + 'quote-1', + ]); + + releaseRawPublish(); + await delay(5); + + assert.equal(client.getState().raw_published_count, 1); + assert.equal(client.getState().published_count, 1); + assert.deepEqual(sends.map((entry) => entry.topic), [ + 'raw.near_intents.quote', + 'norm.swap_demand', + ]); + } finally { + client.close(); + mock.restore(); + } +}); diff --git a/test/unrip-resource-guardrails-static.test.mjs b/test/unrip-resource-guardrails-static.test.mjs new file mode 100644 index 0000000..0cc656f --- /dev/null +++ b/test/unrip-resource-guardrails-static.test.mjs @@ -0,0 +1,23 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; +import { readFileSync } from 'node:fs'; + +const manifest = readFileSync(new URL('../deploy/k8s/base/unrip.yaml', import.meta.url), 'utf8'); + +function deploymentBlock(name) { + const pattern = new RegExp( + `kind: Deployment\\nmetadata:\\n name: ${name}\\n[\\s\\S]*?(?=\\n---\\napiVersion:|\\n?$)`, + ); + const match = manifest.match(pattern); + assert.ok(match, `expected deployment ${name}`); + return match[0]; +} + +for (const name of ['near-intents-ingest', 'history-writer', 'operator-dashboard']) { + test(`${name} has memory guardrails for live quote pressure`, () => { + const block = deploymentBlock(name); + + assert.match(block, /name: NODE_OPTIONS\s+value: "--max-old-space-size=896"/); + assert.match(block, /resources:\s+requests:\s+memory: 256Mi\s+limits:\s+memory: 1280Mi/); + }); +}