doran/projects/unrip/src/apps/dummy-consumer.mjs

42 lines
1.4 KiB
JavaScript

import process from 'node:process';
import { createConsumer } from '../bus/kafka/consumer.mjs';
import { logStatus } from '../core/log.mjs';
import { parseEventMessage } from '../core/event-envelope.mjs';
import { assertTradeResult } from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
// Runtime settings (Kafka brokers, client id, group and topic names) for this viewer.
const config = loadConfig();

// The group id is the executor's group name plus a `-results-view` suffix.
// NOTE(review): presumably this keeps the viewer's offsets separate from the
// executor's own consumer group — confirm against the bus configuration.
const consumerOptions = {
  groupId: `${config.kafkaConsumerGroupExecutor}-results-view`,
  brokers: config.kafkaBrokers,
  clientId: config.kafkaClientId,
};
const consumer = await createConsumer(consumerOptions);

// Subscribe to the trade-result topic with `fromBeginning: false`.
const resultTopic = config.kafkaTopicExecTradeResult;
await consumer.subscribe({ topic: resultTopic, fromBeginning: false });
logStatus(`result consumer subscribed to ${resultTopic}`);
/**
 * Disconnects the Kafka consumer and then terminates the process.
 *
 * Exits with code 0 when the disconnect succeeds. If the disconnect rejects,
 * the failure is logged and the process exits with code 1 instead of leaving
 * an unhandled promise rejection behind.
 *
 * @param {string} signal - Name of the signal that triggered the shutdown.
 * @returns {Promise<void>} Settles after `process.exit` has been called.
 */
async function shutdown(signal) {
  let exitCode = 0;
  try {
    await consumer.disconnect();
  } catch (err) {
    console.error(`result consumer failed to disconnect on ${signal}`, err);
    exitCode = 1;
  }
  process.exit(exitCode);
}

// Both termination signals share the same shutdown path.
for (const signal of ['SIGINT', 'SIGTERM']) {
  process.on(signal, () => {
    // shutdown() handles its own errors, so the promise is intentionally not awaited.
    void shutdown(signal);
  });
}
// Start consuming. Each message is parsed, validated as a trade result, and
// printed as a one-line summary. Messages that cannot be parsed or validated
// are logged and skipped.
await consumer.run({
  eachMessage: async ({ message }) => {
    // Messages without a value carry nothing to display.
    if (!message.value) return;

    let event;
    try {
      event = parseEventMessage(message.value.toString());
    } catch {
      logStatus('result consumer received non-JSON message; skipping');
      return;
    }

    // assertTradeResult's return value is ignored, so it is assumed to signal
    // a schema violation by throwing. Catch that here so one malformed event
    // does not throw out of the handler.
    // NOTE(review): with a KafkaJS-style runner an escaping error would cause
    // the same message to be retried — confirm against createConsumer.
    try {
      assertTradeResult(event);
    } catch (err) {
      logStatus(`result consumer received invalid trade result; skipping: ${err?.message ?? err}`);
      return;
    }

    const payload = event.payload;
    console.log(`[result] command_id=${payload.command_id} quote_id=${payload.quote_id} status=${payload.status} result_code=${payload.result_code || 'n/a'}`);
  },
});