212 lines
5.8 KiB
JavaScript
212 lines
5.8 KiB
JavaScript
const {
|
|
createLogger,
|
|
LEVELS: { INFO },
|
|
} = require('./loggers')
|
|
|
|
const InstrumentationEventEmitter = require('./instrumentation/emitter')
|
|
const LoggerConsole = require('./loggers/console')
|
|
const Cluster = require('./cluster')
|
|
const createProducer = require('./producer')
|
|
const createConsumer = require('./consumer')
|
|
const createAdmin = require('./admin')
|
|
const ISOLATION_LEVEL = require('./protocol/isolationLevel')
|
|
const defaultSocketFactory = require('./network/socketFactory')
|
|
const once = require('./utils/once')
|
|
const websiteUrl = require('./utils/websiteUrl')
|
|
|
|
const PRIVATE = {
|
|
CREATE_CLUSTER: Symbol('private:Kafka:createCluster'),
|
|
CLUSTER_RETRY: Symbol('private:Kafka:clusterRetry'),
|
|
LOGGER: Symbol('private:Kafka:logger'),
|
|
OFFSETS: Symbol('private:Kafka:offsets'),
|
|
}
|
|
|
|
const DEFAULT_METADATA_MAX_AGE = 300000
|
|
const warnOfDefaultPartitioner = once(logger => {
|
|
if (process.env.KAFKAJS_NO_PARTITIONER_WARNING == null) {
|
|
logger.warn(
|
|
`KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at ${websiteUrl(
|
|
'docs/migration-guide-v2.0.0',
|
|
'producer-new-default-partitioner'
|
|
)} for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1"`
|
|
)
|
|
}
|
|
})
|
|
|
|
module.exports = class Client {
|
|
/**
|
|
* @param {Object} options
|
|
* @param {Array<string>} options.brokers example: ['127.0.0.1:9092', '127.0.0.1:9094']
|
|
* @param {Object} options.ssl
|
|
* @param {Object} options.sasl
|
|
* @param {string} options.clientId
|
|
* @param {number} [options.connectionTimeout=1000] - in milliseconds
|
|
* @param {number} options.authenticationTimeout - in milliseconds
|
|
* @param {number} options.reauthenticationThreshold - in milliseconds
|
|
* @param {number} [options.requestTimeout=30000] - in milliseconds
|
|
* @param {boolean} [options.enforceRequestTimeout]
|
|
* @param {import("../types").RetryOptions} [options.retry]
|
|
* @param {import("../types").ISocketFactory} [options.socketFactory]
|
|
*/
|
|
constructor({
|
|
brokers,
|
|
ssl,
|
|
sasl,
|
|
clientId,
|
|
connectionTimeout = 1000,
|
|
authenticationTimeout,
|
|
reauthenticationThreshold,
|
|
requestTimeout,
|
|
enforceRequestTimeout = true,
|
|
retry,
|
|
socketFactory = defaultSocketFactory(),
|
|
logLevel = INFO,
|
|
logCreator = LoggerConsole,
|
|
}) {
|
|
this[PRIVATE.OFFSETS] = new Map()
|
|
this[PRIVATE.LOGGER] = createLogger({ level: logLevel, logCreator })
|
|
this[PRIVATE.CLUSTER_RETRY] = retry
|
|
this[PRIVATE.CREATE_CLUSTER] = ({
|
|
metadataMaxAge,
|
|
allowAutoTopicCreation = true,
|
|
maxInFlightRequests = null,
|
|
instrumentationEmitter = null,
|
|
isolationLevel,
|
|
}) =>
|
|
new Cluster({
|
|
logger: this[PRIVATE.LOGGER],
|
|
retry: this[PRIVATE.CLUSTER_RETRY],
|
|
offsets: this[PRIVATE.OFFSETS],
|
|
socketFactory,
|
|
brokers,
|
|
ssl,
|
|
sasl,
|
|
clientId,
|
|
connectionTimeout,
|
|
authenticationTimeout,
|
|
reauthenticationThreshold,
|
|
requestTimeout,
|
|
enforceRequestTimeout,
|
|
metadataMaxAge,
|
|
instrumentationEmitter,
|
|
allowAutoTopicCreation,
|
|
maxInFlightRequests,
|
|
isolationLevel,
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
*/
|
|
producer({
|
|
createPartitioner,
|
|
retry,
|
|
metadataMaxAge = DEFAULT_METADATA_MAX_AGE,
|
|
allowAutoTopicCreation,
|
|
idempotent,
|
|
transactionalId,
|
|
transactionTimeout,
|
|
maxInFlightRequests,
|
|
} = {}) {
|
|
const instrumentationEmitter = new InstrumentationEventEmitter()
|
|
const cluster = this[PRIVATE.CREATE_CLUSTER]({
|
|
metadataMaxAge,
|
|
allowAutoTopicCreation,
|
|
maxInFlightRequests,
|
|
instrumentationEmitter,
|
|
})
|
|
|
|
if (createPartitioner == null) {
|
|
warnOfDefaultPartitioner(this[PRIVATE.LOGGER])
|
|
}
|
|
|
|
return createProducer({
|
|
retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
|
|
logger: this[PRIVATE.LOGGER],
|
|
cluster,
|
|
createPartitioner,
|
|
idempotent,
|
|
transactionalId,
|
|
transactionTimeout,
|
|
instrumentationEmitter,
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
*/
|
|
consumer({
|
|
groupId,
|
|
partitionAssigners,
|
|
metadataMaxAge = DEFAULT_METADATA_MAX_AGE,
|
|
sessionTimeout,
|
|
rebalanceTimeout,
|
|
heartbeatInterval,
|
|
maxBytesPerPartition,
|
|
minBytes,
|
|
maxBytes,
|
|
maxWaitTimeInMs,
|
|
retry = { retries: 5 },
|
|
allowAutoTopicCreation,
|
|
maxInFlightRequests,
|
|
readUncommitted = false,
|
|
rackId = '',
|
|
} = {}) {
|
|
const isolationLevel = readUncommitted
|
|
? ISOLATION_LEVEL.READ_UNCOMMITTED
|
|
: ISOLATION_LEVEL.READ_COMMITTED
|
|
|
|
const instrumentationEmitter = new InstrumentationEventEmitter()
|
|
const cluster = this[PRIVATE.CREATE_CLUSTER]({
|
|
metadataMaxAge,
|
|
allowAutoTopicCreation,
|
|
maxInFlightRequests,
|
|
isolationLevel,
|
|
instrumentationEmitter,
|
|
})
|
|
|
|
return createConsumer({
|
|
retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
|
|
logger: this[PRIVATE.LOGGER],
|
|
cluster,
|
|
groupId,
|
|
partitionAssigners,
|
|
sessionTimeout,
|
|
rebalanceTimeout,
|
|
heartbeatInterval,
|
|
maxBytesPerPartition,
|
|
minBytes,
|
|
maxBytes,
|
|
maxWaitTimeInMs,
|
|
isolationLevel,
|
|
instrumentationEmitter,
|
|
rackId,
|
|
metadataMaxAge,
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
*/
|
|
admin({ retry } = {}) {
|
|
const instrumentationEmitter = new InstrumentationEventEmitter()
|
|
const cluster = this[PRIVATE.CREATE_CLUSTER]({
|
|
allowAutoTopicCreation: false,
|
|
instrumentationEmitter,
|
|
})
|
|
|
|
return createAdmin({
|
|
retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
|
|
logger: this[PRIVATE.LOGGER],
|
|
instrumentationEmitter,
|
|
cluster,
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
*/
|
|
logger() {
|
|
return this[PRIVATE.LOGGER]
|
|
}
|
|
}
|