doran/src/bus/kafka/producer.mjs
2026-03-28 20:53:29 +01:00

21 lines
588 B
JavaScript

import { Kafka } from 'kafkajs';
/**
 * Build a kafkajs client from connection settings.
 *
 * Only `brokers` and `clientId` are read from the argument; any other
 * properties on it are ignored by this function.
 *
 * @param {object} [settings] - Connection settings.
 * @param {string[]} [settings.brokers] - Broker list handed to the client;
 *   defaults to a single local broker, `127.0.0.1:9092`.
 * @param {string} [settings.clientId] - Client identifier; defaults to `unrip`.
 * @returns {Kafka} A newly constructed client instance.
 */
function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) {
  const clientConfig = { clientId, brokers };
  return new Kafka(clientConfig);
}
/**
 * Create a Kafka producer, connect it, and expose a small JSON-oriented facade.
 *
 * `options` is forwarded unchanged to `createKafka`. The returned promise
 * settles only after `producer.connect()` has completed; a rejection from
 * `connect()` propagates to the caller.
 *
 * @param {object} [options] - Client settings passed through to `createKafka`.
 * @returns {Promise<{sendJson: Function, disconnect: Function}>} Facade over
 *   the connected producer.
 */
export async function createProducer(options = {}) {
  const client = createKafka(options);
  const producer = client.producer();
  await producer.connect();

  /**
   * Publish `event` as one message whose value is `JSON.stringify(event)`.
   *
   * @param {string} topic - Topic passed to `producer.send`.
   * @param {*} event - Payload to serialize.
   * @param {object} [sendOptions] - Per-call overrides.
   * @param {*} [sendOptions.key] - Message key; when omitted it falls back to
   *   `event.event_id`, then `event.key`, then `null`.
   * @returns {Promise<void>} Settles once `producer.send` has settled.
   */
  const sendJson = async (topic, event, { key = event?.event_id ?? event?.key ?? null } = {}) => {
    const message = { key, value: JSON.stringify(event) };
    await producer.send({ topic, messages: [message] });
  };

  /**
   * Disconnect the underlying producer.
   *
   * @returns {Promise<void>} Settles once `producer.disconnect` has settled.
   */
  const disconnect = async () => {
    await producer.disconnect();
  };

  return { sendJson, disconnect };
}