Guard quote ingest against node OOM
Some checks failed
deploy / deploy (push) Failing after 47s

Proof: Live investigation showed doran-1 entered NodeNotReady with kubelet SystemOOM and TLS/control-plane timeouts while near-intents-ingest, history-writer, and operator-dashboard were the largest Node memory consumers. This commit adds websocket publish backpressure for the raw quote firehose and pod memory guardrails for the affected services.

Assumptions: Dropping quote frames while Kafka publishing is backpressured is safer than allowing unbounded in-flight publishes to take down the single-node cluster; retained Kafka/Postgres history remains best-effort under overload until the platform has enough capacity for full raw retention.

Still fake: This does not add durable queue spillover for skipped raw websocket frames, does not resize the node, and does not prove fee-complete trading PnL.
This commit is contained in:
philipp 2026-05-13 18:25:54 +02:00
parent 3cd88c682e
commit 82017dd301
4 changed files with 149 additions and 37 deletions

View file

@ -293,6 +293,9 @@ spec:
image: ghcr.io/example/unrip:bootstrap image: ghcr.io/example/unrip:bootstrap
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent
command: ["node", "src/apps/near-intents-ingest.mjs"] command: ["node", "src/apps/near-intents-ingest.mjs"]
env:
- name: NODE_OPTIONS
value: "--max-old-space-size=896"
ports: ports:
- name: control-api - name: control-api
containerPort: 8081 containerPort: 8081
@ -301,6 +304,11 @@ spec:
name: unrip-config name: unrip-config
- secretRef: - secretRef:
name: unrip-secrets name: unrip-secrets
resources:
requests:
memory: 256Mi
limits:
memory: 1280Mi
--- ---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
@ -428,6 +436,9 @@ spec:
image: ghcr.io/example/unrip:bootstrap image: ghcr.io/example/unrip:bootstrap
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent
command: ["node", "src/apps/history-writer.mjs"] command: ["node", "src/apps/history-writer.mjs"]
env:
- name: NODE_OPTIONS
value: "--max-old-space-size=896"
ports: ports:
- name: control-api - name: control-api
containerPort: 8085 containerPort: 8085
@ -436,6 +447,11 @@ spec:
name: unrip-config name: unrip-config
- secretRef: - secretRef:
name: unrip-secrets name: unrip-secrets
resources:
requests:
memory: 256Mi
limits:
memory: 1280Mi
--- ---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
@ -570,6 +586,9 @@ spec:
image: ghcr.io/example/unrip:bootstrap image: ghcr.io/example/unrip:bootstrap
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent
command: ["node", "src/apps/operator-dashboard.mjs"] command: ["node", "src/apps/operator-dashboard.mjs"]
env:
- name: NODE_OPTIONS
value: "--max-old-space-size=896"
ports: ports:
- name: http - name: http
containerPort: 8090 containerPort: 8090
@ -578,3 +597,8 @@ spec:
name: unrip-config name: unrip-config
- secretRef: - secretRef:
name: unrip-secrets name: unrip-secrets
resources:
requests:
memory: 256Mi
limits:
memory: 1280Mi

View file

@ -35,6 +35,7 @@ export async function startNearIntentsWs({
let framesReceived = 0; let framesReceived = 0;
let quoteFramesReceived = 0; let quoteFramesReceived = 0;
let filteredCount = 0; let filteredCount = 0;
let backpressureSkippedCount = 0;
let publishErrorCount = 0; let publishErrorCount = 0;
let invalidJsonCount = 0; let invalidJsonCount = 0;
let lastMessageAt = null; let lastMessageAt = null;
@ -101,44 +102,40 @@ export async function startNearIntentsWs({
if (quoteStatusSubscriptionId && subscription === quoteStatusSubscriptionId) return; if (quoteStatusSubscriptionId && subscription === quoteStatusSubscriptionId) return;
if (quoteSubscriptionId && subscription && subscription !== quoteSubscriptionId) return; if (quoteSubscriptionId && subscription && subscription !== quoteSubscriptionId) return;
if (publishLocked) return; if (publishLocked) {
backpressureSkippedCount += 1;
const envelope = buildNearIntentsQuoteEnvelope(merged);
const rawEnvelope = buildNearIntentsRawEnvelope(merged);
try {
await producer.sendJson(rawTopic, rawEnvelope, { key: rawEnvelope.event_id });
rawPublishedCount += 1;
} catch (error) {
publishErrorCount += 1;
logger?.error('raw_publish_failed', {
namespace,
topic: rawTopic,
details: {
error: serializeError(error),
quote_id: rawEnvelope.payload?.message?.quote_id || rawEnvelope.payload?.message?.quote_hash || null,
},
});
}
if (!envelope) return;
assertNormalizedSwapDemand(envelope);
const assetIn = envelope.payload?.asset_in;
const assetOut = envelope.payload?.asset_out;
if (!assetIn || !assetOut) return;
const pairAllowed = matchesPair
? await matchesPair(assetIn, assetOut)
: matchesPairFilter(assetIn, assetOut, getPairFilter());
if (!pairAllowed) {
filteredCount += 1;
return; return;
} }
lastMatchingQuoteAt = new Date().toISOString();
publishLocked = true; publishLocked = true;
let envelope = null;
let rawEnvelope = null;
let assetIn = null;
let assetOut = null;
let publishTopic = rawTopic;
try { try {
envelope = buildNearIntentsQuoteEnvelope(merged);
rawEnvelope = buildNearIntentsRawEnvelope(merged);
await producer.sendJson(rawTopic, rawEnvelope, { key: rawEnvelope.event_id });
rawPublishedCount += 1;
if (!envelope) return;
assertNormalizedSwapDemand(envelope);
assetIn = envelope.payload?.asset_in;
assetOut = envelope.payload?.asset_out;
if (!assetIn || !assetOut) return;
const pairAllowed = matchesPair
? await matchesPair(assetIn, assetOut)
: matchesPairFilter(assetIn, assetOut, getPairFilter());
if (!pairAllowed) {
filteredCount += 1;
return;
}
lastMatchingQuoteAt = new Date().toISOString();
publishTopic = normalizedTopic;
await producer.sendJson(normalizedTopic, envelope, { key: envelope.payload.quote_id }); await producer.sendJson(normalizedTopic, envelope, { key: envelope.payload.quote_id });
publishedCount += 1; publishedCount += 1;
lastPublishedAt = new Date().toISOString(); lastPublishedAt = new Date().toISOString();
@ -146,14 +143,17 @@ export async function startNearIntentsWs({
onPublish(envelope, publishedCount); onPublish(envelope, publishedCount);
} catch (error) { } catch (error) {
publishErrorCount += 1; publishErrorCount += 1;
logger?.error('publish_failed', { logger?.error(publishTopic === rawTopic ? 'raw_publish_failed' : 'publish_failed', {
namespace, namespace,
topic: normalizedTopic, topic: publishTopic,
pair: `${assetIn}->${assetOut}`, pair: assetIn && assetOut ? `${assetIn}->${assetOut}` : null,
details: { details: {
raw_topic: rawTopic, raw_topic: rawTopic,
error: serializeError(error), error: serializeError(error),
quote_id: envelope.payload?.quote_id, quote_id: envelope?.payload?.quote_id
|| rawEnvelope?.payload?.message?.quote_id
|| rawEnvelope?.payload?.message?.quote_hash
|| null,
}, },
}); });
} finally { } finally {
@ -221,6 +221,7 @@ export async function startNearIntentsWs({
frames_received: framesReceived, frames_received: framesReceived,
quote_frames_received: quoteFramesReceived, quote_frames_received: quoteFramesReceived,
filtered_count: filteredCount, filtered_count: filteredCount,
backpressure_skipped_count: backpressureSkippedCount,
raw_published_count: rawPublishedCount, raw_published_count: rawPublishedCount,
published_count: publishedCount, published_count: publishedCount,
publish_error_count: publishErrorCount, publish_error_count: publishErrorCount,

View file

@ -143,3 +143,67 @@ test('near intents websocket close is reentrant-safe when close emits an error',
mock.restore(); mock.restore();
} }
}); });
test('near intents websocket skips quote frames while Kafka publish is backpressured', async () => {
const mock = installMockWebSocket();
let releaseRawPublish;
const sends = [];
const producer = {
async sendJson(topic, event) {
sends.push({ topic, event });
if (topic === 'raw.near_intents.quote' && !releaseRawPublish) {
await new Promise((resolve) => {
releaseRawPublish = resolve;
});
}
},
};
const client = await startNearIntentsWs({
apiKey: 'api-key',
wsUrl: 'wss://relay.example/ws',
pairFilter: ['btc', 'eure'],
producer,
rawTopic: 'raw.near_intents.quote',
normalizedTopic: 'norm.swap_demand',
reconnectDelayMs: 1,
});
function quote(quoteId) {
return {
method: 'event',
params: {
data: {
quote_id: quoteId,
defuse_asset_identifier_in: 'btc',
defuse_asset_identifier_out: 'eure',
exact_amount_in: '100',
},
},
};
}
try {
mock.instances[0].open();
mock.instances[0].emit('message', { data: JSON.stringify(quote('quote-1')) });
mock.instances[0].emit('message', { data: JSON.stringify(quote('quote-2')) });
await delay(5);
assert.equal(client.getState().backpressure_skipped_count, 1);
assert.deepEqual(sends.map((entry) => entry.event.payload?.quote_id || entry.event.payload?.message?.quote_id), [
'quote-1',
]);
releaseRawPublish();
await delay(5);
assert.equal(client.getState().raw_published_count, 1);
assert.equal(client.getState().published_count, 1);
assert.deepEqual(sends.map((entry) => entry.topic), [
'raw.near_intents.quote',
'norm.swap_demand',
]);
} finally {
client.close();
mock.restore();
}
});

View file

@ -0,0 +1,23 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { readFileSync } from 'node:fs';
const manifest = readFileSync(new URL('../deploy/k8s/base/unrip.yaml', import.meta.url), 'utf8');
function deploymentBlock(name) {
const pattern = new RegExp(
`kind: Deployment\\nmetadata:\\n name: ${name}\\n[\\s\\S]*?(?=\\n---\\napiVersion:|\\n?$)`,
);
const match = manifest.match(pattern);
assert.ok(match, `expected deployment ${name}`);
return match[0];
}
for (const name of ['near-intents-ingest', 'history-writer', 'operator-dashboard']) {
test(`${name} has memory guardrails for live quote pressure`, () => {
const block = deploymentBlock(name);
assert.match(block, /name: NODE_OPTIONS\s+value: "--max-old-space-size=896"/);
assert.match(block, /resources:\s+requests:\s+memory: 256Mi\s+limits:\s+memory: 1280Mi/);
});
}