Achieving Custom Batch Size Consumption in KafkaJS Using the eachMessage Callback
Introduction
Kafka is a powerful distributed streaming platform that enables real-time data processing and consumption. When working with Kafka, you may encounter situations in which you need to consume a certain number of messages as a batch. However, the KafkaJS library does not provide an easy mechanism to accomplish this. In this article, we’ll look at how to use KafkaJS’s eachMessage callback to implement batch consumption with a given batch size. We’ll also make sure that the batch is processed even if it doesn’t reach the required size within a predefined time frame.
Table of Contents
· Introduction
· Table of Contents
· Prerequisites
· Dependencies and Configuration
· Creating the Consumer and Topic
· Running the Consumer
· Batch Processing Logic
· Consuming Each Message
· Conclusion
Prerequisites
- Fundamental knowledge of Kafka concepts like topics, brokers, and consumer groups.
- Node.js and npm are installed on your machine.
- The kafkajs library and the @kafkajs/confluent-schema-registry package are installed:
npm install kafkajs @kafkajs/confluent-schema-registry
Dependencies and Configuration
The code starts by importing the required dependencies and setting up the Kafka and Schema Registry instances.
const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');
// Configure Kafka
const kafka = new Kafka({
clientId: 'batch-consumer',
brokers: ['localhost:9092'], // Replace with your Kafka broker(s) configuration
});
const registry = new SchemaRegistry({
host: 'http://localhost:8081/'
});
Here, we import Kafka and SchemaRegistry from their respective modules. The Kafka class is used to create a Kafka instance with the desired configuration, such as the clientId and brokers (replace them with your specific configuration). The SchemaRegistry class is used to build a schema registry instance for working with Avro schemas.
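The consumer code below assumes the messages on the topic were produced with registry-encoded Avro values. For context, here is a minimal producer-side sketch; the SampleEvent schema, its payload field, and the subject name are hypothetical, chosen only to illustrate the registry.register() and registry.encode() calls:
const { SchemaType } = require('@kafkajs/confluent-schema-registry');
const producer = kafka.producer();
const produceEncoded = async () => {
  await producer.connect();
  // Register a (hypothetical) Avro schema with the registry and get its id.
  const { id } = await registry.register(
    {
      type: SchemaType.AVRO,
      schema: JSON.stringify({
        type: 'record',
        name: 'SampleEvent',
        namespace: 'com.example',
        fields: [{ name: 'payload', type: 'string' }],
      }),
    },
    { subject: 'sample-kafka-topic-value' } // assumed subject naming
  );
  // Encode the value against the registered schema before producing.
  const value = await registry.encode(id, { payload: 'hello' });
  await producer.send({ topic: 'sample-kafka-topic', messages: [{ value }] });
  await producer.disconnect();
};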
Creating the Consumer and Topic
Next, we create the consumer and provide the topic from which we want to consume events.
const consumer = kafka.consumer({
groupId: 'sample-kafka-topic-consumer',
heartbeatInterval: 10000, // should be lower than sessionTimeout
sessionTimeout: 60000,
});
const topic = 'sample-kafka-topic';
Using the kafka.consumer() method, we build a consumer instance. We provide a groupId to identify the consumer group. Adjust the heartbeatInterval and sessionTimeout parameters to meet your needs, keeping heartbeatInterval lower than sessionTimeout so the broker does not consider the consumer dead between heartbeats. In addition, we specify the topic from which we wish to consume, which in this case is sample-kafka-topic.
Running the Consumer
Now, let’s define the runConsumer function, which will be used to run the consumer.
const runConsumer = async () => {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
// ...
};
runConsumer().catch((error) => {
console.error('Error running the consumer:', error);
});
In this step, we define the runConsumer function, which connects the consumer to the Kafka cluster using consumer.connect() and subscribes to the desired topic using consumer.subscribe(). The fromBeginning: true option ensures that the consumer starts from the beginning of the topic when the group has no previously committed offsets. Lastly, we invoke the runConsumer() function and handle any errors encountered during execution.
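Although the original flow stops here, in practice you may also want to disconnect the consumer cleanly when the process is stopped. A minimal sketch, assuming the runConsumer setup above:
// Disconnect on termination signals so the group rebalances promptly
// instead of waiting for the session timeout to expire.
const shutdown = async () => {
  try {
    await consumer.disconnect();
  } finally {
    process.exit(0);
  }
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);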
Batch Processing Logic
This component implements the main functionality of batch consuming with a given batch size.
const messageQueue = [];
const batchSize = 5;
let timer;
const processBatch = async () => {
clearTimeout(timer);
const currBatch = [];
while (messageQueue.length > 0) {
currBatch.push(messageQueue.shift());
}
await Promise.allSettled(
currBatch.map(async (message) => {
const decodedValue = await registry.decode(message.value);
console.log(decodedValue);
})
);
};
// If the batch does not reach the expected size within 10s, process whatever has accumulated
const scheduleBatchProcessing = () => {
clearTimeout(timer);
timer = setTimeout(processBatch, 10000); // 10 seconds
};
In this part, we define the variables and functions required for batch processing.
- messageQueue is an array that acts as a queue, holding messages until the batch size is reached.
- batchSize defines the desired batch size.
- processBatch is an asynchronous function that processes the batch of messages. It drains the messages from messageQueue, decodes their values using the registry.decode() method, and then applies any custom logic (a sketch of handling decode failures follows below).
- scheduleBatchProcessing schedules batch processing. It starts a timer so that if the batch does not reach the target size within a certain amount of time (10 seconds), processBatch is called with whatever messages are available in messageQueue.
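One detail worth noting: Promise.allSettled() never rejects, so a failed registry.decode() call is silently swallowed as written. If you want to surface decode failures, you could inspect the settled results inside processBatch; a sketch of what that might look like:
const results = await Promise.allSettled(
  currBatch.map((message) => registry.decode(message.value))
);
results.forEach((result, index) => {
  if (result.status === 'fulfilled') {
    console.log(result.value); // the decoded value; apply custom logic here
  } else {
    // e.g. log for a retry, or route the raw message to a dead-letter topic
    console.error('Failed to decode message at index', index, result.reason);
  }
});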
Consuming Each Message
Lastly, we consume each message using the consumer’s eachMessage callback.
await consumer.run({
eachMessage: async ({ message }) => {
try {
messageQueue.push(message);
if (messageQueue.length >= batchSize) {
await processBatch();
} else {
scheduleBatchProcessing();
}
} catch (error) {
console.error(error);
}
},
});
Here, we call the run method of the consumer and provide a callback function via the eachMessage property. Inside the callback, we push the message onto the messageQueue and check whether the queue has reached the desired batch size. If it has, we call processBatch() to handle the batch; otherwise, we use scheduleBatchProcessing() to schedule the batch processing.
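One caveat worth noting: with eachMessage, KafkaJS by default commits offsets automatically once the handler resolves, so messages still sitting in messageQueue may have their offsets committed before they are actually processed; a crash at that point would skip them. If you need at-least-once processing, one option (a hypothetical variant, not part of the code above) is to disable auto-commit, queue the partition and offset alongside each message, and commit manually after the batch completes:
await consumer.run({
  autoCommit: false, // we commit manually after each batch is processed
  eachMessage: async ({ topic, partition, message }) => {
    // Keep the coordinates we need in order to commit later.
    messageQueue.push({ topic, partition, message });
    if (messageQueue.length >= batchSize) {
      await processBatch();
    } else {
      scheduleBatchProcessing();
    }
  },
});
// Then, at the end of processBatch, commit the next offset for each
// processed message (processBatch must now decode item.message.value,
// since the queue holds wrapper objects):
await consumer.commitOffsets(
  currBatch.map(({ topic, partition, message }) => ({
    topic,
    partition,
    offset: (Number(message.offset) + 1).toString(), // commit the next offset
  }))
);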
Conclusion
In this article, we looked at how to use the KafkaJS library to create a batch consumer with a predetermined batch size. Even though KafkaJS does not provide a direct way to consume a fixed number of messages at a time, we achieved batch processing by leveraging the eachMessage callback. We ensured effective batch consumption by queuing the messages and processing them once the desired batch size was reached. In addition, we implemented a timeout mechanism so that the batch is processed even if it does not reach the desired size within the time limit. Feel free to modify the code to meet your individual needs and scale it to fit the needs of your application.