unrip/test/kafka-topics.test.mjs
philipp eb81f892e2
Some checks failed
deploy / deploy (push) Failing after 31s
Ensure Kafka topics before use
Proof: npm test; npm run operator-dashboard:build; node --test test/kafka-topics.test.mjs test/environment-status-history.test.mjs test/operator-dashboard.test.mjs test/ops-sentinel-static.test.mjs; node --check src/bus/kafka/topics.mjs src/bus/kafka/consumer.mjs src/bus/kafka/producer.mjs src/apps/history-writer.mjs src/apps/operator-dashboard.mjs src/apps/ops-sentinel.mjs; PYTHONPATH=. python3 test/repo_deployments_test.py; kubectl kustomize deploy/k8s/base.

Assumptions: Redpanda admin topic creation is allowed for repo-owned app topics and uses the same one-partition retention policy as the bootstrap Job.

Still fake: Topic self-creation only repairs repo-owned Kafka topic bootstrap drift; it does not change upstream NEAR Intents availability or prove quote settlement.
2026-04-17 14:39:14 +02:00

77 lines
2.6 KiB
JavaScript

import test from 'node:test';
import assert from 'node:assert/strict';
import {
buildKafkaTopicDefinitions,
ensureKafkaTopics,
findMissingTopics,
uniqueTopicNames,
} from '../src/bus/kafka/topics.mjs';
test('Kafka topic helpers dedupe requested topic names and preserve retention config', () => {
assert.deepEqual(uniqueTopicNames(['a', '', 'a', null, 'b']), ['a', 'b']);
assert.deepEqual(findMissingTopics(['a'], ['a', 'b', 'b']), ['b']);
const definitions = buildKafkaTopicDefinitions(['ops.environment_status', 'ops.environment_status'], {
retentionMs: '1000',
retentionBytes: '2000',
});
assert.equal(definitions.length, 1);
assert.equal(definitions[0].topic, 'ops.environment_status');
assert.equal(definitions[0].numPartitions, 1);
assert.equal(definitions[0].replicationFactor, 1);
assert.deepEqual(definitions[0].configEntries, [
{ name: 'retention.ms', value: '1000' },
{ name: 'retention.bytes', value: '2000' },
]);
});
test('Kafka topic ensure creates only missing topics and caches them for later calls', async () => {
const calls = [];
const existingTopics = ['raw.near_intents.quote'];
const admin = {
async connect() {
calls.push(['connect']);
},
async listTopics() {
calls.push(['listTopics']);
return existingTopics;
},
async createTopics(options) {
calls.push(['createTopics', options.topics.map((topic) => topic.topic)]);
existingTopics.push(...options.topics.map((topic) => topic.topic));
return true;
},
async disconnect() {
calls.push(['disconnect']);
},
};
const kafka = { admin: () => admin };
const ensuredTopics = new Set();
const first = await ensureKafkaTopics(kafka, ['raw.near_intents.quote', 'ops.environment_status'], {
ensuredTopics,
});
const second = await ensureKafkaTopics(kafka, ['ops.environment_status'], {
ensuredTopics,
});
assert.deepEqual(first.created, ['ops.environment_status']);
assert.deepEqual(second.created, []);
assert.deepEqual(calls, [
['connect'],
['listTopics'],
['createTopics', ['ops.environment_status']],
['disconnect'],
]);
});
test('Kafka producer and consumer wrappers ensure topics before use', async () => {
const { readFile } = await import('node:fs/promises');
const consumerSource = await readFile(new URL('../src/bus/kafka/consumer.mjs', import.meta.url), 'utf8');
const producerSource = await readFile(new URL('../src/bus/kafka/producer.mjs', import.meta.url), 'utf8');
assert.match(consumerSource, /ensureKafkaTopics\(kafka, \[options\?\.topic\]/);
assert.match(producerSource, /ensureKafkaTopics\(kafka, \[topic\]/);
});