import test from 'node:test';
import assert from 'node:assert/strict';
import {
  buildKafkaTopicDefinitions,
  ensureKafkaTopics,
  findMissingTopics,
  uniqueTopicNames,
} from '../src/bus/kafka/topics.mjs';

// Covers the pure helpers: name de-duplication, missing-topic detection and
// construction of topic definitions carrying retention settings.
test('Kafka topic helpers dedupe requested topic names and preserve retention config', () => {
  // Empty-string and null entries are expected to vanish, and 'a' to appear once.
  const dedupedNames = uniqueTopicNames(['a', '', 'a', null, 'b']);
  assert.deepStrictEqual(dedupedNames, ['a', 'b']);

  // 'b' is listed twice in the second argument but should be reported once.
  const missingNames = findMissingTopics(['a'], ['a', 'b', 'b']);
  assert.deepStrictEqual(missingNames, ['b']);

  const retention = {
    retentionMs: '1000',
    retentionBytes: '2000',
  };
  const definitions = buildKafkaTopicDefinitions(
    ['ops.environment_status', 'ops.environment_status'],
    retention,
  );

  // The same topic was requested twice; exactly one definition should come back.
  assert.strictEqual(definitions.length, 1);

  const definition = definitions[0];
  assert.strictEqual(definition.topic, 'ops.environment_status');
  // Partition and replication counts were not supplied above, so these pin
  // whatever values the helper fills in on its own.
  assert.strictEqual(definition.numPartitions, 1);
  assert.strictEqual(definition.replicationFactor, 1);
  assert.deepStrictEqual(definition.configEntries, [
    { name: 'retention.ms', value: '1000' },
    { name: 'retention.bytes', value: '2000' },
  ]);
});

// Drives ensureKafkaTopics against a hand-rolled admin stub that records every
// call, then checks which topics were created and which admin calls happened.
test('Kafka topic ensure creates only missing topics and caches them for later calls', async () => {
  const adminCalls = [];
  const brokerTopics = ['raw.near_intents.quote'];

  // Minimal stand-in for the admin client: each method logs its invocation;
  // createTopics also appends the new names to the list returned by listTopics.
  const fakeAdmin = {
    connect: async () => {
      adminCalls.push(['connect']);
    },
    listTopics: async () => {
      adminCalls.push(['listTopics']);
      return brokerTopics;
    },
    createTopics: async (options) => {
      const requestedNames = options.topics.map((entry) => entry.topic);
      adminCalls.push(['createTopics', requestedNames]);
      brokerTopics.push(...requestedNames);
      return true;
    },
    disconnect: async () => {
      adminCalls.push(['disconnect']);
    },
  };
  const kafka = {
    admin() {
      return fakeAdmin;
    },
  };

  // The same Set is handed to both calls so state can carry over between them.
  const ensuredTopics = new Set();
  const sharedOptions = { ensuredTopics };

  const firstResult = await ensureKafkaTopics(
    kafka,
    ['raw.near_intents.quote', 'ops.environment_status'],
    sharedOptions,
  );
  const secondResult = await ensureKafkaTopics(kafka, ['ops.environment_status'], sharedOptions);

  assert.deepStrictEqual(firstResult.created, ['ops.environment_status']);
  assert.deepStrictEqual(secondResult.created, []);

  // Only one connect/list/create/disconnect sequence is expected in total,
  // i.e. the second call must not touch the admin client at all.
  const expectedCalls = [
    ['connect'],
    ['listTopics'],
    ['createTopics', ['ops.environment_status']],
    ['disconnect'],
  ];
  assert.deepStrictEqual(adminCalls, expectedCalls);
});
// Static check: reads the consumer and producer wrapper modules as text and
// verifies each contains an ensureKafkaTopics(...) call of the expected shape.
test('Kafka producer and consumer wrappers ensure topics before use', async () => {
  const { readFile } = await import('node:fs/promises');

  // Resolve a path relative to this test file and return its UTF-8 contents.
  const loadSource = (relativePath) => readFile(new URL(relativePath, import.meta.url), 'utf8');

  const consumerSource = await loadSource('../src/bus/kafka/consumer.mjs');
  const producerSource = await loadSource('../src/bus/kafka/producer.mjs');

  // The consumer is expected to pass `[options?.topic]`, the producer `[topic]`.
  assert.match(consumerSource, /ensureKafkaTopics\(kafka, \[options\?\.topic\]/);
  assert.match(producerSource, /ensureKafkaTopics\(kafka, \[topic\]/);
});