Achieving Custom Batch Size Consumption in KafkaJS Using the `eachMessage` Callback

Jun 25, 2023

Introduction

Kafka is a powerful distributed streaming platform that enables real-time data processing and consumption. When working with Kafka, you may encounter situations in which you need to consume a specific number of messages as a batch. However, the KafkaJS library does not provide an easy mechanism to accomplish this. In this article, we’ll look at how to use KafkaJS’s eachMessage handler to implement batch consumption with a given batch size. We’ll also make sure that the batch is processed even if it doesn’t reach the required size within a predefined time frame.

Table of Contents

· Introduction
· Table of Contents
· Prerequisites
· Dependencies and Configuration
· Creating the Consumer and Topic
· Running the Consumer
· Batch Processing Logic
· Consuming Each Message
· Conclusion

Prerequisites

  • Fundamental knowledge of Kafka concepts such as topics, brokers, and consumer groups.
  • Node.js and npm installed on your machine.
  • The kafkajs and @kafkajs/confluent-schema-registry packages installed:
npm install kafkajs @kafkajs/confluent-schema-registry

Dependencies and Configuration

The code starts by importing the required dependencies and setting up the Kafka and Schema Registry instances.

const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');

// Configure Kafka
const kafka = new Kafka({
  clientId: 'batch-consumer',
  brokers: ['localhost:9092'], // Replace with your Kafka broker(s) configuration
});

const registry = new SchemaRegistry({
  host: 'http://localhost:8081/'
});

Here, we import Kafka and SchemaRegistry from their respective modules. The Kafka class is used to create a Kafka client with the desired configuration, such as the clientId and brokers (replace these with your own configuration). The SchemaRegistry class is used to create a schema registry client for encoding and decoding Avro messages.
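
To have something to consume, a producer must publish Avro-encoded messages whose schema is known to the registry. The snippet below is a minimal sketch of such a producer (as a separate script), assuming a hypothetical SampleEvent schema and the same broker and registry addresses as above; it registers the schema, encodes a payload with the returned schema id, and sends it to the topic. Adapt the schema and payload to your own data.

const { Kafka } = require('kafkajs');
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry');

const kafka = new Kafka({ clientId: 'batch-producer', brokers: ['localhost:9092'] });
const registry = new SchemaRegistry({ host: 'http://localhost:8081/' });

// Hypothetical Avro schema, used only for this illustration
const sampleSchema = {
  type: 'record',
  name: 'SampleEvent',
  namespace: 'com.example',
  fields: [
    { name: 'id', type: 'string' },
    { name: 'payload', type: 'string' },
  ],
};

const produceSample = async () => {
  const producer = kafka.producer();
  await producer.connect();

  // Register the schema (or look up its existing id) in the schema registry
  const { id } = await registry.register({
    type: SchemaType.AVRO,
    schema: JSON.stringify(sampleSchema),
  });

  // Encode the payload against the registered schema id and send it to the topic
  const value = await registry.encode(id, { id: '1', payload: 'hello' });
  await producer.send({ topic: 'sample-kafka-topic', messages: [{ value }] });

  await producer.disconnect();
};

produceSample().catch(console.error);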

Creating the Consumer and Topic

Next, we create the consumer and provide the topic from which we want to consume events.

const consumer = kafka.consumer({
  groupId: 'sample-kafka-topic-consumer',
  heartbeatInterval: 10000, // should be lower than sessionTimeout
  sessionTimeout: 60000,
});

const topic = 'sample-kafka-topic';

Using the kafka.consumer() method, we create a consumer instance and provide a groupId to identify the consumer group. Adjust the heartbeatInterval and sessionTimeout values to suit your needs; the heartbeat interval should stay well below the session timeout. We also specify the topic we want to consume from, which in this case is sample-kafka-topic.

Running the Consumer

Now, let’s define the runConsumer function, which will be used to run the consumer.

const runConsumer = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: true });

  // ...
};

runConsumer().catch((error) => {
  console.error('Error running the consumer:', error);
});

In this step, we define the runConsumer function, which connects the consumer to the Kafka cluster using consumer.connect() and subscribes to the desired topic using consumer.subscribe(). The fromBeginning: true option ensures that we consume messages from the beginning of the topic. Lastly, we invoke the runConsumer() function and handle any errors encountered during execution.
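
As written, the consumer runs until the process is killed. A small, optional addition (not part of the original code) is to disconnect it cleanly on SIGINT or SIGTERM, so it leaves the consumer group immediately instead of waiting for the session timeout:

// Minimal shutdown sketch (an assumption, not from the article's code):
// disconnect the consumer when the process is interrupted or terminated.
const shutdown = async () => {
  try {
    await consumer.disconnect();
  } finally {
    process.exit(0);
  }
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);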

Batch Processing Logic

This part implements the core functionality: consuming messages in batches of a given size.

const messageQueue = [];
const batchSize = 5;
let timer;

const processBatch = async () => {
  clearTimeout(timer);

  // Drain the queue into the current batch
  const currBatch = [];
  while (messageQueue.length > 0) {
    currBatch.push(messageQueue.shift());
  }

  // Decode and process every message in the batch
  await Promise.allSettled(
    currBatch.map(async (message) => {
      const decodedValue = await registry.decode(message.value);
      console.log(decodedValue);
    })
  );
};

// If the batch does not reach the expected size within 10s, process whatever has been queued
const scheduleBatchProcessing = () => {
  clearTimeout(timer);
  timer = setTimeout(processBatch, 10000); // 10 seconds
};

In this part, we define the variables and functions required for batch processing.

  • messageQueue is an array that acts as a queue, holding messages until the batch size is reached.
  • batchSize defines the desired batch size.
  • processBatch is an asynchronous function that processes a batch of messages. It drains the messageQueue, decodes each message value using registry.decode(), and then applies any custom processing logic (here, simply logging the decoded value).
  • scheduleBatchProcessing schedules batch processing with a timeout. It starts a 10-second timer; if the batch has not reached the target size by the time it fires, processBatch is called with whatever messages are available in messageQueue. A variation of processBatch that adds backpressure is sketched after this list.
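
One thing to keep in mind is that eachMessage keeps delivering new messages while a batch is being processed, so messageQueue can grow under heavy load. The sketch below is an optional variation, assuming that pausing the topic during processing is acceptable for your workload; it uses KafkaJS’s consumer.pause() and consumer.resume() to apply backpressure:

// Optional variation of processBatch (not from the article): pause the topic
// while the current batch is processed so the in-memory queue stays bounded,
// then resume fetching once the batch is done.
const processBatchWithBackpressure = async () => {
  clearTimeout(timer);
  consumer.pause([{ topic }]);

  const currBatch = [];
  while (messageQueue.length > 0) {
    currBatch.push(messageQueue.shift());
  }

  try {
    await Promise.allSettled(
      currBatch.map(async (message) => {
        const decodedValue = await registry.decode(message.value);
        console.log(decodedValue);
      })
    );
  } finally {
    // Resume fetching for the topic once the batch has been handled
    consumer.resume([{ topic }]);
  }
};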

Consuming Each Message

Lastly, we consume each message using the consumer’s eachMessage handler.

// This goes inside runConsumer(), in place of the // ... placeholder above
await consumer.run({
  eachMessage: async ({ message }) => {
    try {
      messageQueue.push(message);
      if (messageQueue.length >= batchSize) {
        await processBatch();
      } else {
        scheduleBatchProcessing();
      }
    } catch (error) {
      console.error(error);
    }
  },
});

Here, we call the consumer’s run method (inside the runConsumer function, replacing the // ... placeholder) and provide a callback via the eachMessage property. Inside the callback, we push the message onto the messageQueue and check whether the queue has reached the desired batch size. If it has, we call processBatch() to handle the batch; otherwise, we call scheduleBatchProcessing() so that the queued messages are still processed once the timer fires.

Conclusion

In this article, we looked at how to use the KafkaJS library to build a batch consumer with a predetermined batch size. Even though KafkaJS does not provide a direct way to consume a specific number of messages at a time, we achieved batch processing by building on top of the eachMessage handler. We ensured effective batch consumption by queuing messages and processing them once the desired batch size was reached. In addition, we implemented a timeout mechanism so that a batch is processed even if it does not reach the desired size within the time limit. Feel free to adapt the code to your own requirements and scale it to fit your application.

Written by Tenusha Guruge

Software Engineer at Sysco LABS. Graduate of Sri Lanka Institute of Information Technology (SLIIT).