unrip/test/near-intents-ws.test.mjs
philipp 82017dd301
Some checks failed
deploy / deploy (push) Failing after 47s
Guard quote ingest against node OOM
Proof: Live investigation showed doran-1 entered NodeNotReady with kubelet SystemOOM and TLS/control-plane timeouts while near-intents-ingest, history-writer, and operator-dashboard were the largest Node memory consumers. This commit adds websocket publish backpressure for the raw quote firehose and pod memory guardrails for the affected services.

Assumptions: Dropping quote frames while Kafka publishing is backpressured is safer than allowing unbounded in-flight publishes to take down the single-node cluster; retained Kafka/Postgres history remains best-effort under overload until the platform has enough capacity for full raw retention.

Still fake: This does not add durable queue spillover for skipped raw websocket frames, does not resize the node, and does not prove fee-complete trading PnL.
2026-05-13 18:25:54 +02:00

209 lines
5.5 KiB
JavaScript

import test from 'node:test';
import assert from 'node:assert/strict';
import { startNearIntentsWs } from '../src/venues/near-intents/ws.mjs';
/**
 * Returns a promise that settles once a `setTimeout` of `ms` milliseconds fires.
 *
 * @param {number} ms - Timeout passed straight to `setTimeout`.
 * @returns {Promise<void>} Resolves with no value when the timer fires.
 */
function delay(ms) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
/**
 * Replaces `globalThis.WebSocket` with an in-memory fake that tests drive by hand.
 *
 * Every constructed fake is appended to the returned `instances` array so a test
 * can reach the socket created by the code under test and fire events on it.
 *
 * @returns {{ instances: object[], restore: () => void }} The list of created
 *   sockets and a function that puts the previous `globalThis.WebSocket` back.
 */
function installMockWebSocket() {
  const previousWebSocket = globalThis.WebSocket;
  const created = [];

  class MockWebSocket {
    // Same numeric ready-state values as the standard WebSocket interface.
    static CONNECTING = 0;
    static OPEN = 1;
    static CLOSING = 2;
    static CLOSED = 3;

    constructor(url, options) {
      this.url = url;
      this.options = options;
      this.readyState = MockWebSocket.CONNECTING;
      this.sent = [];
      this.listeners = new Map();
      this.closeCalls = 0;
      // When set, close() fires an 'error' event before the 'close' event.
      this.emitErrorDuringClose = false;
      created.push(this);
    }

    /** Registers `listener` for events of `type`; listeners run in registration order. */
    addEventListener(type, listener) {
      if (!this.listeners.has(type)) {
        this.listeners.set(type, []);
      }
      this.listeners.get(type).push(listener);
    }

    /** Records the outgoing payload instead of transmitting it. */
    send(payload) {
      this.sent.push(payload);
    }

    /** Counts every call; only the first call on a non-closed socket emits events. */
    close() {
      this.closeCalls += 1;
      const alreadyClosed = this.readyState === MockWebSocket.CLOSED;
      if (alreadyClosed) {
        return;
      }
      // The error is emitted while readyState is still unchanged, so a handler
      // that calls close() again re-enters this method before CLOSED is set.
      if (this.emitErrorDuringClose) {
        this.emit('error', new Error('socket failed while closing'));
      }
      this.readyState = MockWebSocket.CLOSED;
      this.emit('close', {});
    }

    /** Test hook: moves the socket to OPEN and fires 'open'. */
    open() {
      this.readyState = MockWebSocket.OPEN;
      this.emit('open', {});
    }

    /** Test hook: fires 'error' without touching readyState. */
    error(error = new Error('socket failed')) {
      this.emit('error', error);
    }

    /** Synchronously invokes every listener registered for `type` with `event`. */
    emit(type, event) {
      const handlers = this.listeners.get(type);
      if (!handlers) {
        return;
      }
      for (const handler of handlers) {
        handler(event);
      }
    }
  }

  globalThis.WebSocket = MockWebSocket;

  const restore = () => {
    globalThis.WebSocket = previousWebSocket;
  };
  return { instances: created, restore };
}
// Checks that an 'error' on a socket that never opened leads the client to
// create a replacement socket, and that the replacement becomes usable.
test('near intents ingest reconnects after websocket error before open without runtime close', async () => {
  const mock = installMockWebSocket();
  const producer = { sendJson: async () => {} };
  // Declared outside the try so cleanup still runs when startup rejects;
  // otherwise the mocked global WebSocket would leak into later tests.
  let client;
  try {
    client = await startNearIntentsWs({
      apiKey: 'api-key',
      wsUrl: 'wss://relay.example/ws',
      pairFilter: ['btc', 'eure'],
      producer,
      rawTopic: 'raw.near_intents.quote',
      normalizedTopic: 'norm.swap_demand',
      reconnectDelayMs: 1,
    });
    assert.equal(mock.instances.length, 1);
    // Fail the first socket while it is still CONNECTING (open() was never called).
    mock.instances[0].error(new Error('network failed'));
    // Wait well past the configured reconnectDelayMs (1 ms).
    await delay(10);
    assert.equal(client.getState().connected, false);
    assert.ok(client.getState().reconnect_count >= 2);
    assert.ok(mock.instances.length >= 2);
    mock.instances[1].open();
    assert.equal(client.getState().connected, true);
    // Two frames are expected right after open; presumably the subscription
    // requests. NOTE(review): confirm against ws.mjs.
    assert.equal(mock.instances[1].sent.length, 2);
  } finally {
    // Restore the global even if close() itself throws.
    try {
      client?.close();
    } finally {
      mock.restore();
    }
  }
});
// Checks that the client survives a socket whose close() emits another 'error':
// close() is called once, 'socket_error' is logged once, and a reconnect follows.
test('near intents websocket close is reentrant-safe when close emits an error', async () => {
  const mock = installMockWebSocket();
  const errors = [];
  const producer = { sendJson: async () => {} };
  // Only the first argument of logger.error is captured; info/warn are ignored.
  const logger = {
    error: (event) => errors.push(event),
    info: () => {},
    warn: () => {},
  };
  // Declared outside the try so cleanup still runs when startup rejects;
  // otherwise the mocked global WebSocket would leak into later tests.
  let client;
  try {
    client = await startNearIntentsWs({
      apiKey: 'api-key',
      wsUrl: 'wss://relay.example/ws',
      pairFilter: ['btc', 'eure'],
      producer,
      rawTopic: 'raw.near_intents.quote',
      normalizedTopic: 'norm.swap_demand',
      logger,
      reconnectDelayMs: 1,
    });
    assert.equal(mock.instances.length, 1);
    mock.instances[0].open();
    // From now on the mock fires a second 'error' from inside close().
    mock.instances[0].emitErrorDuringClose = true;
    assert.doesNotThrow(() => {
      mock.instances[0].error(new Error('network failed'));
    });
    // Wait well past the configured reconnectDelayMs (1 ms).
    await delay(10);
    // The nested error must not trigger a second close() or a second log entry.
    assert.equal(mock.instances[0].closeCalls, 1);
    assert.equal(errors.filter((event) => event === 'socket_error').length, 1);
    assert.equal(client.getState().connected, false);
    assert.ok(client.getState().reconnect_count >= 2);
    assert.ok(mock.instances.length >= 2);
  } finally {
    // Restore the global even if close() itself throws.
    try {
      client?.close();
    } finally {
      mock.restore();
    }
  }
});
// Checks that a quote frame arriving while the previous raw publish is still
// pending is counted as skipped rather than published.
test('near intents websocket skips quote frames while Kafka publish is backpressured', async () => {
  const mock = installMockWebSocket();
  let releaseRawPublish;
  const sends = [];
  const producer = {
    async sendJson(topic, event) {
      sends.push({ topic, event });
      // Only the first raw publish blocks; it stays pending until the test
      // calls releaseRawPublish().
      if (topic === 'raw.near_intents.quote' && !releaseRawPublish) {
        await new Promise((resolve) => {
          releaseRawPublish = resolve;
        });
      }
    },
  };

  // Builds a quote event frame for the btc -> eure pair with the given id.
  function quote(quoteId) {
    return {
      method: 'event',
      params: {
        data: {
          quote_id: quoteId,
          defuse_asset_identifier_in: 'btc',
          defuse_asset_identifier_out: 'eure',
          exact_amount_in: '100',
        },
      },
    };
  }

  // Declared outside the try so cleanup still runs when startup rejects;
  // otherwise the mocked global WebSocket would leak into later tests.
  let client;
  try {
    client = await startNearIntentsWs({
      apiKey: 'api-key',
      wsUrl: 'wss://relay.example/ws',
      pairFilter: ['btc', 'eure'],
      producer,
      rawTopic: 'raw.near_intents.quote',
      normalizedTopic: 'norm.swap_demand',
      reconnectDelayMs: 1,
    });
    mock.instances[0].open();
    // The second frame arrives while the first raw publish is still blocked.
    mock.instances[0].emit('message', { data: JSON.stringify(quote('quote-1')) });
    mock.instances[0].emit('message', { data: JSON.stringify(quote('quote-2')) });
    await delay(5);
    assert.equal(client.getState().backpressure_skipped_count, 1);
    assert.deepEqual(sends.map((entry) => entry.event.payload?.quote_id || entry.event.payload?.message?.quote_id), [
      'quote-1',
    ]);
    // Unblock the pending raw publish so the rest of the first quote's publishing can finish.
    releaseRawPublish();
    await delay(5);
    assert.equal(client.getState().raw_published_count, 1);
    assert.equal(client.getState().published_count, 1);
    assert.deepEqual(sends.map((entry) => entry.topic), [
      'raw.near_intents.quote',
      'norm.swap_demand',
    ]);
  } finally {
    // Restore the global even if close() itself throws.
    try {
      client?.close();
    } finally {
      mock.restore();
    }
  }
});