Getting started with Kafka and Node.js - Setup with example


Let's build a pub/sub program using Kafka and Node.js. Kafka is an enterprise-grade tool for sending messages between microservices.

Kafka is generally used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably get data between systems or applications
  • Building real-time streaming applications that transform or react to the streams of data

To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up.

First a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.
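A record can be pictured as a plain object holding those three fields. A minimal sketch (`makeRecord` is illustrative only, not part of any Kafka client API):

```javascript
// A Kafka record modeled as a plain object: key, value, timestamp.
// The (optional) key decides which partition a record lands in;
// records with the same key always go to the same partition.
function makeRecord(key, value) {
  return { key: key, value: value, timestamp: Date.now() };
}

const record = makeRecord('user-42', 'clicked checkout');
console.log(record.key, record.value);
```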

Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
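The Streams API idea, consume an input stream, transform each record, and produce an output stream, can be sketched synchronously. This is an analogy only (`transformStream` is a hypothetical helper, not the Kafka Streams API, which is a Java library):

```javascript
// Analogy for the Streams API: take an input stream of records,
// apply a transformation to each one, emit an output stream.
function transformStream(inputRecords, transformFn) {
  return inputRecords.map(transformFn);
}

const input = ['order placed', 'order shipped'];
const output = transformStream(input, r => r.toUpperCase());
console.log(output); // [ 'ORDER PLACED', 'ORDER SHIPPED' ]
```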

In this example we use the Producer and Consumer APIs.

Before we get to the code, let's set up Kafka itself, then the project folder and dependencies.

Kafka single node setup

On a single machine, a 3-broker Kafka cluster is a sensible minimum for hassle-free operation. We'll also set the replication factor to 2.

Say X, Y and Z are our Kafka brokers. With a replication factor of 2, every partition is stored on 2 of the 3 brokers: for each partition, one broker acts as the leader and one other broker keeps a follower copy, so losing any single broker loses no data.
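The placement described above can be sketched as a round-robin assignment. This is a simplification (real Kafka also randomizes the starting broker and balances leaders); `assignReplicas` is illustrative only:

```javascript
// Simplified round-robin replica placement: partition p gets
// replicationFactor consecutive brokers starting at index p.
function assignReplicas(partition, brokers, replicationFactor) {
  const replicas = [];
  for (let i = 0; i < replicationFactor; i++) {
    replicas.push(brokers[(partition + i) % brokers.length]);
  }
  return replicas; // replicas[0] plays the role of the leader
}

const brokers = ['X', 'Y', 'Z'];
console.log(assignReplicas(0, brokers, 2)); // [ 'X', 'Y' ]
console.log(assignReplicas(1, brokers, 2)); // [ 'Y', 'Z' ]
console.log(assignReplicas(2, brokers, 2)); // [ 'Z', 'X' ]
```

With replication factor 2 and 3 brokers, every broker ends up holding copies of some partitions, which is the fault-tolerance the paragraph above describes.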


  • Have Java >= 1.8 installed.
  • Get a binary distribution of Kafka from the Apache Kafka downloads page.


Extract the contents of the Kafka archive to a convenient place and cd into it. Use a terminal multiplexer to run the components that make up the Kafka ecosystem.


  • Edit the config file config/zookeeper.properties and change the dataDir entry to some place that does not get wiped after a reboot.
    Ex: dataDir=/home/user/zookeeper (instead of the default /tmp/zookeeper)
  • Start the ZooKeeper instance with
    $ bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka brokers

  • In the config folder there is a server.properties file. This is the Kafka broker's config file. We need 3 instances of Kafka brokers.
  • Make a copy: $ cp config/server.properties config/server-1.properties
  • In the copy make the following changes:
broker.id=1                #unique id for our broker instance
port=9092                  #port where it listens
delete.topic.enable=true   #if we want to delete a kafka topic stored in the broker
log.dirs=/home/thatcoder/kafka-logs/01   #to a place that's not volatile
advertised.host.name=<host ip>   #prevents leader not found error when connecting from a remote machine
  • Make 2 more copies of this file (server-2.properties, server-3.properties) and change the fields broker.id, port and log.dirs in each file.
  • Run the individual brokers like
$ bin/kafka-server-start.sh config/server-1.properties
$ bin/kafka-server-start.sh config/server-2.properties
$ bin/kafka-server-start.sh config/server-3.properties

**Tip:** Executing $ jps on the shell lists all JVM instances. To kill a process, kill -9 <pid> will do the trick.

Testing out the install
  • Create a topic with
    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic <topicname>
  • Push data onto it
    $ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --sync --topic <topicname>
  • Fetch data from it
    $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topicname> --from-beginning
Setting up the Node.js project

  1. Make a folder named kafka-node and initialize a package inside it with npm init.
  2. Install kafka-node in the project directory:
npm install kafka-node --save

Now your package.json will look like this:

{
  "name": "kaas",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start": "node producer.js",
    "dev": "nodemon producer.js"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "kafka-node": "^2.4.1"
  }
}
Now let's dive into the code.

Since it's a producer-consumer programming model, let's create two consumer files and one producer file.

If the producer publishes a message under the topic 'example', Kafka delivers it to every consumer subscribed to that 'example' topic.
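That routing behaviour can be sketched with a tiny in-memory broker. This is only an illustration of topic-based delivery, what the real broker does for us, not the kafka-node API:

```javascript
// Minimal in-memory sketch of topic-based routing: a message published
// to a topic reaches only that topic's subscribers.
function createBroker() {
  const subscribers = {}; // topic -> array of handler callbacks
  return {
    subscribe(topic, handler) {
      (subscribers[topic] = subscribers[topic] || []).push(handler);
    },
    publish(topic, message) {
      (subscribers[topic] || []).forEach(handler => handler(message));
    }
  };
}

const broker = createBroker();
const received = [];
broker.subscribe('example', msg => received.push('consumer1: ' + msg));
broker.subscribe('example', msg => received.push('consumer2: ' + msg));
broker.subscribe('other', msg => received.push('other: ' + msg));
broker.publish('example', 'hello');
console.log(received); // [ 'consumer1: hello', 'consumer2: hello' ]
```

Both subscribers of 'example' receive the message; the 'other' subscriber never sees it.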

Set up a config file, config.js, with the following:

module.exports = {
  kafka_topic: 'example',
  kafka_server: 'localhost:2181',
};

producer program

const kafka = require('kafka-node');
const config = require('./config');

try {
  const Producer = kafka.Producer;
  const client = new kafka.Client(config.kafka_server);
  const producer = new Producer(client);
  const kafka_topic = config.kafka_topic;

  // payloads is an array of { topic, messages } objects;
  // here we simply send the topic name as the message
  let payloads = [
    {
      topic: kafka_topic,
      messages: config.kafka_topic
    }
  ];

  producer.on('ready', function() {
    producer.send(payloads, (err, data) => {
      if (err) {
        console.log('[kafka-producer -> ' + kafka_topic + ']: broker update failed');
      } else {
        console.log('[kafka-producer -> ' + kafka_topic + ']: broker update success');
      }
    });
  });

  producer.on('error', function(err) {
    console.log('[kafka-producer -> ' + kafka_topic + ']: connection errored');
    throw err;
  });
} catch (e) {
  console.log(e);
}
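Note the shape producer.send() expects in kafka-node: an array of { topic, messages } objects. A small helper to build such payloads (buildPayloads is a hypothetical convenience function, not part of kafka-node):

```javascript
// Builds the payload array kafka-node's producer.send() accepts:
// each entry names a topic and carries one or more messages.
function buildPayloads(topic, messages) {
  return [
    {
      topic: topic,
      messages: Array.isArray(messages) ? messages : [messages]
    }
  ];
}

console.log(JSON.stringify(buildPayloads('example', 'hi')));
// [{"topic":"example","messages":["hi"]}]
```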

consumer program (paste the same in both consumer files)

const kafka = require('kafka-node');
const config = require('./config');

try {
  const Consumer = kafka.HighLevelConsumer;
  const client = new kafka.Client(config.kafka_server);
  let consumer = new Consumer(
    client,
    [{ topic: config.kafka_topic, partition: 0 }],
    {
      autoCommit: true,
      fetchMaxWaitMs: 1000,
      fetchMaxBytes: 1024 * 1024,
      encoding: 'utf8',
      fromOffset: false
    }
  );

  consumer.on('message', function(message) {
    console.log('kafka-> ', message.value);
  });

  consumer.on('error', function(err) {
    console.log('error', err);
  });
} catch (e) {
  console.log(e);
}

Run the producer and the consumer programs; the consumers fetch the message from the topic and you can see it logged on the terminal.

Github link:

Happy coding 🙌🏻