diff --git a/src/bus/kafka/consumer.mjs b/src/bus/kafka/consumer.mjs index dcd58c1..0f8d8f3 100644 --- a/src/bus/kafka/consumer.mjs +++ b/src/bus/kafka/consumer.mjs @@ -1,5 +1,6 @@ import { Kafka, logLevel } from 'kafkajs'; import { serializeError } from '../../core/log.mjs'; +import { ensureKafkaTopics } from './topics.mjs'; function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) { return new Kafka({ @@ -10,7 +11,9 @@ function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) } export async function createConsumer({ groupId, logger, ...options }) { - const consumer = createKafka(options).consumer({ groupId }); + const kafka = createKafka(options); + const consumer = kafka.consumer({ groupId }); + const ensuredTopics = new Set(); const kafkaLogger = logger ? logger.child({ component: 'kafka' }) : null; consumer.on(consumer.events.CONNECT, () => { @@ -45,7 +48,13 @@ export async function createConsumer({ groupId, logger, ...options }) { await consumer.connect(); return { - subscribe: (options) => consumer.subscribe(options), + async subscribe(options) { + await ensureKafkaTopics(kafka, [options?.topic], { + logger, + ensuredTopics, + }); + return consumer.subscribe(options); + }, run: (options) => consumer.run(options), pause: (topics) => consumer.pause(topics), resume: (topics) => consumer.resume(topics), diff --git a/src/bus/kafka/producer.mjs b/src/bus/kafka/producer.mjs index 3f07de0..c033b46 100644 --- a/src/bus/kafka/producer.mjs +++ b/src/bus/kafka/producer.mjs @@ -1,5 +1,6 @@ import { Kafka, logLevel } from 'kafkajs'; import { serializeError } from '../../core/log.mjs'; +import { ensureKafkaTopics } from './topics.mjs'; process.env.KAFKAJS_NO_PARTITIONER_WARNING ??= '1'; @@ -12,7 +13,9 @@ function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) } export async function createProducer({ logger, ...options } = {}) { - const producer = createKafka(options).producer(); + const kafka = createKafka(options); + const producer = kafka.producer(); + const ensuredTopics = new Set(); const kafkaLogger = logger ? logger.child({ component: 'kafka' }) : null; producer.on(producer.events.CONNECT, () => { @@ -45,6 +48,10 @@ export async function createProducer({ logger, ...options } = {}) { await producer.connect(); return { async sendJson(topic, event, { key = event?.event_id ?? event?.key ?? null } = {}) { + await ensureKafkaTopics(kafka, [topic], { + logger, + ensuredTopics, + }); await producer.send({ topic, messages: [{ key, value: JSON.stringify(event) }], diff --git a/src/bus/kafka/topics.mjs b/src/bus/kafka/topics.mjs new file mode 100644 index 0000000..5e24897 --- /dev/null +++ b/src/bus/kafka/topics.mjs @@ -0,0 +1,89 @@ +import { serializeError } from '../../core/log.mjs'; + +const DEFAULT_RETENTION_MS = '172800000'; +const DEFAULT_RETENTION_BYTES = '268435456'; + +export function uniqueTopicNames(topics = []) { + return [...new Set((topics || []).filter((topic) => typeof topic === 'string' && topic.length > 0))]; +} + +export function buildKafkaTopicDefinitions(topics = [], { + numPartitions = 1, + replicationFactor = 1, + retentionMs = DEFAULT_RETENTION_MS, + retentionBytes = DEFAULT_RETENTION_BYTES, +} = {}) { + return uniqueTopicNames(topics).map((topic) => ({ + topic, + numPartitions, + replicationFactor, + configEntries: [ + { name: 'retention.ms', value: String(retentionMs) }, + { name: 'retention.bytes', value: String(retentionBytes) }, + ], + })); +} + +export function findMissingTopics(existingTopics = [], requestedTopics = []) { + const existing = new Set(existingTopics || []); + return uniqueTopicNames(requestedTopics).filter((topic) => !existing.has(topic)); +} + +export async function ensureKafkaTopics(kafka, topics = [], { + logger = null, + ensuredTopics = null, + topicOptions = {}, +} = {}) { + const requestedTopics = uniqueTopicNames(topics).filter((topic) => !ensuredTopics?.has(topic)); + if (requestedTopics.length === 0) return { created: [], missing: [] }; + + const admin = kafka.admin(); + const kafkaLogger = logger ? logger.child({ component: 'kafka-admin' }) : null; + + try { + await admin.connect(); + const existingTopics = await admin.listTopics(); + const missingTopics = findMissingTopics(existingTopics, requestedTopics); + + if (missingTopics.length > 0) { + const topicDefinitions = buildKafkaTopicDefinitions(missingTopics, topicOptions); + try { + await admin.createTopics({ + waitForLeaders: true, + topics: topicDefinitions, + }); + } catch (error) { + if (!isTopicAlreadyExistsError(error)) { + throw error; + } + } + kafkaLogger?.info('kafka_topics_created', { + details: { + topics: missingTopics, + }, + }); + } + + for (const topic of requestedTopics) { + ensuredTopics?.add(topic); + } + + return { created: missingTopics, missing: missingTopics }; + } catch (error) { + kafkaLogger?.error('kafka_topics_ensure_failed', { + details: { + topics: requestedTopics, + error: serializeError(error), + }, + }); + throw error; + } finally { + await admin.disconnect().catch(() => {}); + } +} + +function isTopicAlreadyExistsError(error) { + return error?.type === 'TOPIC_ALREADY_EXISTS' + || error?.code === 36 + || /already exists/i.test(String(error?.message || '')); +} diff --git a/test/kafka-topics.test.mjs b/test/kafka-topics.test.mjs new file mode 100644 index 0000000..46678e9 --- /dev/null +++ b/test/kafka-topics.test.mjs @@ -0,0 +1,77 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; + +import { + buildKafkaTopicDefinitions, + ensureKafkaTopics, + findMissingTopics, + uniqueTopicNames, +} from '../src/bus/kafka/topics.mjs'; + +test('Kafka topic helpers dedupe requested topic names and preserve retention config', () => { + assert.deepEqual(uniqueTopicNames(['a', '', 'a', null, 'b']), ['a', 'b']); + assert.deepEqual(findMissingTopics(['a'], ['a', 'b', 'b']), ['b']); + + const definitions = buildKafkaTopicDefinitions(['ops.environment_status', 'ops.environment_status'], { + retentionMs: '1000', + retentionBytes: '2000', + }); + + assert.equal(definitions.length, 1); + assert.equal(definitions[0].topic, 'ops.environment_status'); + assert.equal(definitions[0].numPartitions, 1); + assert.equal(definitions[0].replicationFactor, 1); + assert.deepEqual(definitions[0].configEntries, [ + { name: 'retention.ms', value: '1000' }, + { name: 'retention.bytes', value: '2000' }, + ]); +}); + +test('Kafka topic ensure creates only missing topics and caches them for later calls', async () => { + const calls = []; + const existingTopics = ['raw.near_intents.quote']; + const admin = { + async connect() { + calls.push(['connect']); + }, + async listTopics() { + calls.push(['listTopics']); + return existingTopics; + }, + async createTopics(options) { + calls.push(['createTopics', options.topics.map((topic) => topic.topic)]); + existingTopics.push(...options.topics.map((topic) => topic.topic)); + return true; + }, + async disconnect() { + calls.push(['disconnect']); + }, + }; + const kafka = { admin: () => admin }; + const ensuredTopics = new Set(); + + const first = await ensureKafkaTopics(kafka, ['raw.near_intents.quote', 'ops.environment_status'], { + ensuredTopics, + }); + const second = await ensureKafkaTopics(kafka, ['ops.environment_status'], { + ensuredTopics, + }); + + assert.deepEqual(first.created, ['ops.environment_status']); + assert.deepEqual(second.created, []); + assert.deepEqual(calls, [ + ['connect'], + ['listTopics'], + ['createTopics', ['ops.environment_status']], + ['disconnect'], + ]); +}); + +test('Kafka producer and consumer wrappers ensure topics before use', async () => { + const { readFile } = await import('node:fs/promises'); + const consumerSource = await readFile(new URL('../src/bus/kafka/consumer.mjs', import.meta.url), 'utf8'); + const producerSource = await readFile(new URL('../src/bus/kafka/producer.mjs', import.meta.url), 'utf8'); + + assert.match(consumerSource, /ensureKafkaTopics\(kafka, \[options\?\.topic\]/); + assert.match(producerSource, /ensureKafkaTopics\(kafka, \[topic\]/); +});