Ensure Kafka topics before use
Some checks failed
deploy / deploy (push) Failing after 31s

Proof: npm test; npm run operator-dashboard:build; node --test test/kafka-topics.test.mjs test/environment-status-history.test.mjs test/operator-dashboard.test.mjs test/ops-sentinel-static.test.mjs; node --check src/bus/kafka/topics.mjs src/bus/kafka/consumer.mjs src/bus/kafka/producer.mjs src/apps/history-writer.mjs src/apps/operator-dashboard.mjs src/apps/ops-sentinel.mjs; PYTHONPATH=. python3 test/repo_deployments_test.py; kubectl kustomize deploy/k8s/base.

Assumptions: Redpanda admin topic creation is allowed for repo-owned app topics and uses the same one-partition retention policy as the bootstrap Job.

Still fake: Topic self-creation only repairs repo-owned Kafka topic bootstrap drift; it does not change upstream NEAR Intents availability or prove quote settlement.
This commit is contained in:
philipp 2026-04-17 14:39:14 +02:00
parent 601450c664
commit eb81f892e2
4 changed files with 185 additions and 3 deletions

View file

@ -1,5 +1,6 @@
import { Kafka, logLevel } from 'kafkajs'; import { Kafka, logLevel } from 'kafkajs';
import { serializeError } from '../../core/log.mjs'; import { serializeError } from '../../core/log.mjs';
import { ensureKafkaTopics } from './topics.mjs';
function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) { function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) {
return new Kafka({ return new Kafka({
@ -10,7 +11,9 @@ function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {})
} }
export async function createConsumer({ groupId, logger, ...options }) { export async function createConsumer({ groupId, logger, ...options }) {
const consumer = createKafka(options).consumer({ groupId }); const kafka = createKafka(options);
const consumer = kafka.consumer({ groupId });
const ensuredTopics = new Set();
const kafkaLogger = logger ? logger.child({ component: 'kafka' }) : null; const kafkaLogger = logger ? logger.child({ component: 'kafka' }) : null;
consumer.on(consumer.events.CONNECT, () => { consumer.on(consumer.events.CONNECT, () => {
@ -45,7 +48,13 @@ export async function createConsumer({ groupId, logger, ...options }) {
await consumer.connect(); await consumer.connect();
return { return {
subscribe: (options) => consumer.subscribe(options), async subscribe(options) {
await ensureKafkaTopics(kafka, [options?.topic], {
logger,
ensuredTopics,
});
return consumer.subscribe(options);
},
run: (options) => consumer.run(options), run: (options) => consumer.run(options),
pause: (topics) => consumer.pause(topics), pause: (topics) => consumer.pause(topics),
resume: (topics) => consumer.resume(topics), resume: (topics) => consumer.resume(topics),

View file

@ -1,5 +1,6 @@
import { Kafka, logLevel } from 'kafkajs'; import { Kafka, logLevel } from 'kafkajs';
import { serializeError } from '../../core/log.mjs'; import { serializeError } from '../../core/log.mjs';
import { ensureKafkaTopics } from './topics.mjs';
process.env.KAFKAJS_NO_PARTITIONER_WARNING ??= '1'; process.env.KAFKAJS_NO_PARTITIONER_WARNING ??= '1';
@ -12,7 +13,9 @@ function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {})
} }
export async function createProducer({ logger, ...options } = {}) { export async function createProducer({ logger, ...options } = {}) {
const producer = createKafka(options).producer(); const kafka = createKafka(options);
const producer = kafka.producer();
const ensuredTopics = new Set();
const kafkaLogger = logger ? logger.child({ component: 'kafka' }) : null; const kafkaLogger = logger ? logger.child({ component: 'kafka' }) : null;
producer.on(producer.events.CONNECT, () => { producer.on(producer.events.CONNECT, () => {
@ -45,6 +48,10 @@ export async function createProducer({ logger, ...options } = {}) {
await producer.connect(); await producer.connect();
return { return {
async sendJson(topic, event, { key = event?.event_id ?? event?.key ?? null } = {}) { async sendJson(topic, event, { key = event?.event_id ?? event?.key ?? null } = {}) {
await ensureKafkaTopics(kafka, [topic], {
logger,
ensuredTopics,
});
await producer.send({ await producer.send({
topic, topic,
messages: [{ key, value: JSON.stringify(event) }], messages: [{ key, value: JSON.stringify(event) }],

89
src/bus/kafka/topics.mjs Normal file
View file

@ -0,0 +1,89 @@
import { serializeError } from '../../core/log.mjs';
const DEFAULT_RETENTION_MS = '172800000';
const DEFAULT_RETENTION_BYTES = '268435456';
export function uniqueTopicNames(topics = []) {
return [...new Set((topics || []).filter((topic) => typeof topic === 'string' && topic.length > 0))];
}
export function buildKafkaTopicDefinitions(topics = [], {
numPartitions = 1,
replicationFactor = 1,
retentionMs = DEFAULT_RETENTION_MS,
retentionBytes = DEFAULT_RETENTION_BYTES,
} = {}) {
return uniqueTopicNames(topics).map((topic) => ({
topic,
numPartitions,
replicationFactor,
configEntries: [
{ name: 'retention.ms', value: String(retentionMs) },
{ name: 'retention.bytes', value: String(retentionBytes) },
],
}));
}
export function findMissingTopics(existingTopics = [], requestedTopics = []) {
const existing = new Set(existingTopics || []);
return uniqueTopicNames(requestedTopics).filter((topic) => !existing.has(topic));
}
export async function ensureKafkaTopics(kafka, topics = [], {
logger = null,
ensuredTopics = null,
topicOptions = {},
} = {}) {
const requestedTopics = uniqueTopicNames(topics).filter((topic) => !ensuredTopics?.has(topic));
if (requestedTopics.length === 0) return { created: [], missing: [] };
const admin = kafka.admin();
const kafkaLogger = logger ? logger.child({ component: 'kafka-admin' }) : null;
try {
await admin.connect();
const existingTopics = await admin.listTopics();
const missingTopics = findMissingTopics(existingTopics, requestedTopics);
if (missingTopics.length > 0) {
const topicDefinitions = buildKafkaTopicDefinitions(missingTopics, topicOptions);
try {
await admin.createTopics({
waitForLeaders: true,
topics: topicDefinitions,
});
} catch (error) {
if (!isTopicAlreadyExistsError(error)) {
throw error;
}
}
kafkaLogger?.info('kafka_topics_created', {
details: {
topics: missingTopics,
},
});
}
for (const topic of requestedTopics) {
ensuredTopics?.add(topic);
}
return { created: missingTopics, missing: missingTopics };
} catch (error) {
kafkaLogger?.error('kafka_topics_ensure_failed', {
details: {
topics: requestedTopics,
error: serializeError(error),
},
});
throw error;
} finally {
await admin.disconnect().catch(() => {});
}
}
function isTopicAlreadyExistsError(error) {
return error?.type === 'TOPIC_ALREADY_EXISTS'
|| error?.code === 36
|| /already exists/i.test(String(error?.message || ''));
}

View file

@ -0,0 +1,77 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import {
buildKafkaTopicDefinitions,
ensureKafkaTopics,
findMissingTopics,
uniqueTopicNames,
} from '../src/bus/kafka/topics.mjs';
test('Kafka topic helpers dedupe requested topic names and preserve retention config', () => {
assert.deepEqual(uniqueTopicNames(['a', '', 'a', null, 'b']), ['a', 'b']);
assert.deepEqual(findMissingTopics(['a'], ['a', 'b', 'b']), ['b']);
const definitions = buildKafkaTopicDefinitions(['ops.environment_status', 'ops.environment_status'], {
retentionMs: '1000',
retentionBytes: '2000',
});
assert.equal(definitions.length, 1);
assert.equal(definitions[0].topic, 'ops.environment_status');
assert.equal(definitions[0].numPartitions, 1);
assert.equal(definitions[0].replicationFactor, 1);
assert.deepEqual(definitions[0].configEntries, [
{ name: 'retention.ms', value: '1000' },
{ name: 'retention.bytes', value: '2000' },
]);
});
test('Kafka topic ensure creates only missing topics and caches them for later calls', async () => {
const calls = [];
const existingTopics = ['raw.near_intents.quote'];
const admin = {
async connect() {
calls.push(['connect']);
},
async listTopics() {
calls.push(['listTopics']);
return existingTopics;
},
async createTopics(options) {
calls.push(['createTopics', options.topics.map((topic) => topic.topic)]);
existingTopics.push(...options.topics.map((topic) => topic.topic));
return true;
},
async disconnect() {
calls.push(['disconnect']);
},
};
const kafka = { admin: () => admin };
const ensuredTopics = new Set();
const first = await ensureKafkaTopics(kafka, ['raw.near_intents.quote', 'ops.environment_status'], {
ensuredTopics,
});
const second = await ensureKafkaTopics(kafka, ['ops.environment_status'], {
ensuredTopics,
});
assert.deepEqual(first.created, ['ops.environment_status']);
assert.deepEqual(second.created, []);
assert.deepEqual(calls, [
['connect'],
['listTopics'],
['createTopics', ['ops.environment_status']],
['disconnect'],
]);
});
test('Kafka producer and consumer wrappers ensure topics before use', async () => {
const { readFile } = await import('node:fs/promises');
const consumerSource = await readFile(new URL('../src/bus/kafka/consumer.mjs', import.meta.url), 'utf8');
const producerSource = await readFile(new URL('../src/bus/kafka/producer.mjs', import.meta.url), 'utf8');
assert.match(consumerSource, /ensureKafkaTopics\(kafka, \[options\?\.topic\]/);
assert.match(producerSource, /ensureKafkaTopics\(kafka, \[topic\]/);
});