Some checks failed
deploy / deploy (push) Failing after 31s
Proof: npm test; npm run operator-dashboard:build; node --test test/kafka-topics.test.mjs test/environment-status-history.test.mjs test/operator-dashboard.test.mjs test/ops-sentinel-static.test.mjs; node --check src/bus/kafka/topics.mjs src/bus/kafka/consumer.mjs src/bus/kafka/producer.mjs src/apps/history-writer.mjs src/apps/operator-dashboard.mjs src/apps/ops-sentinel.mjs; PYTHONPATH=. python3 test/repo_deployments_test.py; kubectl kustomize deploy/k8s/base. Assumptions: Redpanda admin topic creation is allowed for repo-owned app topics and uses the same one-partition retention policy as the bootstrap Job. Still fake: Topic self-creation only repairs repo-owned Kafka topic bootstrap drift; it does not change upstream NEAR Intents availability or prove quote settlement.
77 lines
2.6 KiB
JavaScript
77 lines
2.6 KiB
JavaScript
import test from 'node:test';
|
|
import assert from 'node:assert/strict';
|
|
|
|
import {
|
|
buildKafkaTopicDefinitions,
|
|
ensureKafkaTopics,
|
|
findMissingTopics,
|
|
uniqueTopicNames,
|
|
} from '../src/bus/kafka/topics.mjs';
|
|
|
|
/**
 * Checks the pure topic helpers: name de-duplication, missing-topic
 * detection, and the shape of the topic definitions built from a retention
 * configuration.
 */
test('Kafka topic helpers dedupe requested topic names and preserve retention config', () => {
  // Expected: the empty-string and null entries vanish and the repeated 'a' appears once.
  const dedupedNames = uniqueTopicNames(['a', '', 'a', null, 'b']);
  assert.deepEqual(dedupedNames, ['a', 'b']);

  // Expected: only 'b' is reported as missing, and only once despite being requested twice.
  const missingNames = findMissingTopics(['a'], ['a', 'b', 'b']);
  assert.deepEqual(missingNames, ['b']);

  const statusTopic = 'ops.environment_status';
  const retention = {
    retentionMs: '1000',
    retentionBytes: '2000',
  };
  const definitions = buildKafkaTopicDefinitions([statusTopic, statusTopic], retention);

  // The same topic requested twice must yield a single definition.
  assert.equal(definitions.length, 1);

  const definition = definitions[0];
  assert.equal(definition.topic, statusTopic);
  assert.equal(definition.numPartitions, 1);
  assert.equal(definition.replicationFactor, 1);

  // Retention values are expected to be carried through unchanged as strings.
  const expectedConfigEntries = [
    { name: 'retention.ms', value: '1000' },
    { name: 'retention.bytes', value: '2000' },
  ];
  assert.deepEqual(definition.configEntries, expectedConfigEntries);
});
|
|
|
|
/**
 * Drives ensureKafkaTopics against a recording fake admin client and checks
 * that only the absent topic is created, and that a second call sharing the
 * same `ensuredTopics` set issues no further admin calls.
 */
test('Kafka topic ensure creates only missing topics and caches them for later calls', async () => {
  // Ordered log of every admin method the code under test invokes.
  const calls = [];
  const record = (...entry) => {
    calls.push(entry);
  };

  // Topics the fake broker already knows about; createTopics appends to it.
  const existingTopics = ['raw.near_intents.quote'];

  const admin = {
    connect: async () => {
      record('connect');
    },
    listTopics: async () => {
      record('listTopics');
      return existingTopics;
    },
    createTopics: async ({ topics }) => {
      const createdNames = topics.map(({ topic }) => topic);
      record('createTopics', createdNames);
      existingTopics.push(...createdNames);
      return true;
    },
    disconnect: async () => {
      record('disconnect');
    },
  };

  const kafka = {
    admin() {
      return admin;
    },
  };

  // Shared between both calls so the second one can observe the first one's work.
  const ensuredTopics = new Set();
  const sharedOptions = { ensuredTopics };

  const first = await ensureKafkaTopics(
    kafka,
    ['raw.near_intents.quote', 'ops.environment_status'],
    sharedOptions,
  );
  const second = await ensureKafkaTopics(kafka, ['ops.environment_status'], sharedOptions);

  assert.deepEqual(first.created, ['ops.environment_status']);
  assert.deepEqual(second.created, []);

  // Exactly one admin round-trip overall: the second call must not touch the admin client.
  const expectedCalls = [
    ['connect'],
    ['listTopics'],
    ['createTopics', ['ops.environment_status']],
    ['disconnect'],
  ];
  assert.deepEqual(calls, expectedCalls);
});
|
|
|
|
/**
 * Static check: reads the consumer and producer wrapper sources as text and
 * asserts that each one contains a call to ensureKafkaTopics for its topic.
 */
test('Kafka producer and consumer wrappers ensure topics before use', async () => {
  const { readFile } = await import('node:fs/promises');

  // Resolve paths against this test file so the check is independent of the working directory.
  const readSource = (relativePath) => readFile(new URL(relativePath, import.meta.url), 'utf8');

  const consumerSource = await readSource('../src/bus/kafka/consumer.mjs');
  const producerSource = await readSource('../src/bus/kafka/producer.mjs');

  const consumerEnsurePattern = /ensureKafkaTopics\(kafka, \[options\?\.topic\]/;
  const producerEnsurePattern = /ensureKafkaTopics\(kafka, \[topic\]/;

  assert.match(consumerSource, consumerEnsurePattern);
  assert.match(producerSource, producerEnsurePattern);
});
|