Welcome folks,

If you're new to microservices or event-driven architecture, read up on those first.

So let's build a pub/sub program using Kafka and Node.js. Kafka is an enterprise-grade tool for passing messages between microservices.

Kafka is generally used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably get data between systems or applications
  • Building real-time streaming applications that transform or react to the streams of data

To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up.

First a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp (see the keyed-record sketch after this list).
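
To make the record concept concrete, here is a minimal kafka-node sketch (not from the original post; the topic name 'example' and the key/value are illustrative) of publishing one keyed record. Kafka attaches the timestamp when the record is stored.

// Sketch: publish a single keyed record with kafka-node (illustrative values).
const kafka = require('kafka-node');

const client = new kafka.Client('localhost:2181');   // ZooKeeper connection string
const producer = new kafka.Producer(client);

producer.on('ready', () => {
  // A record is key + value; the broker adds the timestamp on append.
  const record = new kafka.KeyedMessage('user-42', 'signed-up');
  producer.send([{ topic: 'example', messages: [record] }], (err, result) => {
    if (err) console.error(err);
    else console.log(result);   // roughly { example: { '0': <offset> } }
    client.close();
  });
});

producer.on('error', console.error);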

Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

In this example we use the Producer and Consumer APIs.

Before we start, let's set up Kafka, and then the project folder and dependencies.

Kafka single node setup

On a single machine, a 3-broker Kafka setup is about the minimum for hassle-free experimentation. We'll also set the replication factor to 2.

Say X, Y and Z are our Kafka brokers. With a replication factor of 2, every partition is stored on two brokers: its leader and one follower. So a partition led by X has its replica on Y or Z, a partition led by Y has its replica on X or Z, and so on, which means any single broker can fail without losing data.
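
If you want to check where the partitions and replicas actually live once the brokers are running, kafka-node can fetch topic metadata for you. This is only a rough sketch: the topic name 'example' is an assumption, and the exact shape of the returned metadata object can vary between kafka-node versions, so just dump it and inspect.

// Sketch: inspect partition leaders and replicas for a topic.
const kafka = require('kafka-node');

const client = new kafka.Client('localhost:2181');

client.on('ready', () => {
  // Ask the cluster which broker leads each partition and where the replicas sit.
  client.loadMetadataForTopics(['example'], (err, results) => {
    if (err) return console.error(err);
    console.dir(results, { depth: null });   // brokers + per-partition leader/replicas/isr
    client.close();
  });
});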

Prerequisites

  • Have Java >= 1.8 installed.
  • Get a binary distribution of Kafka from the Apache Kafka downloads page (https://kafka.apache.org/downloads).

Setup

Extract the contents of the Kafka archive to a convenient place and cd into it. Use a terminal multiplexer (or separate terminals) to run the components that make up the Kafka ecosystem.

Zookeeper

  • Edit the config file config/zookeeper.properties and change the dataDir entry to a location that does not get wiped on reboot.
    Ex: dataDir=/home/user/zookeeper (instead of the default /tmp/zookeeper)
  • Start the zookeeper instance with
    $ bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka brokers

  • In the config folder there is a server.properties file. This is the Kafka broker's config file. We need 3 instances of Kafka brokers.
  • Make a copy: $ cp config/server.properties config/server.b1.properties
  • In the copy make the following changes
broker.id=1                          #unique id for this broker instance
port=9092                            #port where it listens
delete.topic.enable=true             #allows deleting Kafka topics stored in the broker
log.dirs=/home/neoito/kafka-logs/01  #point to a place that's not volatile
advertised.host.name=10.0.0.81       #prevents "leader not found" errors when connecting from a remote machine
  • Make 2 more copies of this file and change broker.id, port and log.dirs in each one (e.g. broker.id=2 / port=9093 and broker.id=3 / port=9094).
  • Run the individual brokers like
$  bin/kafka-server-start.sh config/server.b1.properties
$  bin/kafka-server-start.sh config/server.b2.properties
$  bin/kafka-server-start.sh config/server.b3.properties

**Tip:** Running $ jps in the shell lists all JVM instances. To kill a process, kill -9 <pid> does the trick.

Testing out the install
  • Create a topic with
    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic <topicname>
  • Push data onto it
    $ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --sync --topic <topicname>
  • Fetch data from it
    $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topicname> --from-beginning

Project setup

  1. Make a folder named kafka-node and run npm init in it.
  2. Install kafka-node in the project directory:
npm install kafka-node --save

Now your package.json will look something like this:

{
  "name": "kaas",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start": "node producer.js",
    "dev": "nodemon producer.js"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "kafka-node": "^2.4.1"
  }
}

Now let's dive into the code.

Since this follows a producer-consumer programming model, let's create two consumer files and one producer file.

If the producer publishes a message to the topic 'example', Kafka will deliver it to every consumer that subscribes to the 'example' topic.

Set up a config file (config.js) with the following:

module.exports = {
  kafka_topic: 'example',
  kafka_server: 'localhost:2181',
};
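
Note that kafka_server here is a ZooKeeper connection string, which is what kafka-node's older Client constructor expects. If you prefer to connect straight to the brokers you started earlier, kafka-node also provides KafkaClient, which takes a broker list. A sketch of that alternative (the kafka_broker_list key is my own illustrative name, not part of the original config):

// config.js -- alternative sketch: add a direct broker list next to the ZooKeeper string.
module.exports = {
  kafka_topic: 'example',
  kafka_server: 'localhost:2181',                                    // ZooKeeper (used by kafka.Client)
  kafka_broker_list: 'localhost:9092,localhost:9093,localhost:9094'  // brokers (used by kafka.KafkaClient)
};

// Usage: const client = new kafka.KafkaClient({ kafkaHost: config.kafka_broker_list });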

Producer program (producer.js)

const kafka = require('kafka-node');
const config = require('./config');

try {
  const Producer = kafka.Producer;
  // kafka.Client connects via the ZooKeeper connection string from config.js
  const client = new kafka.Client(config.kafka_server);
  const producer = new Producer(client);
  const kafka_topic = 'example';
  console.log(kafka_topic);

  // Payloads: an array of { topic, messages } objects.
  // Here the message content is simply the topic name from config, used as a placeholder.
  let payloads = [
    {
      topic: kafka_topic,
      messages: config.kafka_topic
    }
  ];

  producer.on('ready', async function() {
    producer.send(payloads, (err, data) => {
      if (err) {
        console.log('[kafka-producer -> ' + kafka_topic + ']: broker update failed');
      } else {
        console.log('[kafka-producer -> ' + kafka_topic + ']: broker update success');
      }
    });
  });

  producer.on('error', function(err) {
    console.log(err);
    console.log('[kafka-producer -> ' + kafka_topic + ']: connection errored');
    throw err;
  });
}
catch(e) {
  console.log(e);
}
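
The payload above carries a single string (the topic name from config, used as a placeholder message). kafka-node payloads can also carry a batch of messages, target a specific partition, and enable compression. Here is an illustrative sketch; all values are assumptions, adjust them to your setup.

// Sketch: a richer producer payload (batch + explicit partition + gzip compression).
const kafka = require('kafka-node');
const config = require('./config');

const client = new kafka.Client(config.kafka_server);
const producer = new kafka.Producer(client);

producer.on('ready', () => {
  const payloads = [
    {
      topic: config.kafka_topic,
      messages: ['first message', 'second message'],  // a batch of plain string messages
      partition: 0,                                   // explicit target partition (kafka.Producer)
      attributes: 1                                   // 0 = none, 1 = gzip, 2 = snappy compression
    }
  ];
  producer.send(payloads, (err, data) => {
    if (err) console.error(err);
    else console.log(data);                           // roughly { example: { '0': <offset> } }
    client.close();
  });
});

producer.on('error', console.error);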

Consumer program (paste the same code in both consumer files)

const kafka = require('kafka-node');
const config = require('./config');

try {
  // HighLevelConsumer balances partitions across consumers via ZooKeeper
  const Consumer = kafka.HighLevelConsumer;
  const client = new kafka.Client(config.kafka_server);
  let consumer = new Consumer(
    client,
    [{ topic: config.kafka_topic, partition: 0 }],
    {
      autoCommit: true,           // commit offsets automatically
      fetchMaxWaitMs: 1000,       // max time the broker waits before answering a fetch
      fetchMaxBytes: 1024 * 1024,
      encoding: 'utf8',
      fromOffset: false           // start from the latest committed offset
    }
  );

  // Fired for every record received on the subscribed topic
  consumer.on('message', async function(message) {
    console.log(
      'kafka-> ',
      message.value
    );
  });

  consumer.on('error', function(err) {
    console.log('error', err);
  });
}
catch(e) {
  console.log(e);
}
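
HighLevelConsumer is kafka-node's older ZooKeeper-based balanced consumer; later kafka-node versions deprecate it in favour of ConsumerGroup, which handles partition rebalancing across consumer instances for you. If you run into rebalance issues, a rough ConsumerGroup equivalent could look like this (the groupId and other option values are illustrative assumptions):

// Sketch: the same consumer using kafka-node's ConsumerGroup instead of HighLevelConsumer.
const kafka = require('kafka-node');
const config = require('./config');

const consumerGroup = new kafka.ConsumerGroup(
  {
    host: config.kafka_server,    // ZooKeeper connection string
    groupId: 'example-group',     // consumers sharing a groupId split the topic's partitions
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'latest'
  },
  [config.kafka_topic]            // topic(s) to subscribe to
);

consumerGroup.on('message', (message) => {
  console.log('kafka-> ', message.value);
});

consumerGroup.on('error', (err) => {
  console.log('error', err);
});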

Run the producer and the consumer programs; the consumers will fetch the messages from the topic and you will see them logged in the terminal.
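
If everything is wired up, the output should look roughly like this (the message value is just the placeholder, i.e. the topic name from config.js):

# producer terminal
example
[kafka-producer -> example]: broker update success

# consumer terminals
kafka->  example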

Github link: https://github.com/that-coder/kafka-example

Happy coding 🙌🏻