Getting started with Kafka and Node.js - Setup with example


Let's build a pub/sub program using Kafka and Node.js. Kafka is an enterprise-grade tool for sending messages between microservices.

Kafka is generally used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably get data between systems or applications
  • Building real-time streaming applications that transform or react to the streams of data

To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up.

First a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.
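A record can be pictured as a plain object holding those three fields. A minimal sketch (`makeRecord` is illustrative only, not part of any Kafka client API):

```javascript
// A Kafka record modeled as a plain object: key, value, timestamp.
// The (optional) key decides which partition a record lands in;
// records with the same key always go to the same partition.
function makeRecord(key, value) {
  return { key: key, value: value, timestamp: Date.now() };
}

const record = makeRecord('user-42', 'clicked checkout');
console.log(record.key, record.value);
```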

Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
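The Streams API idea, consume an input stream, transform each record, and produce an output stream, can be sketched synchronously. This is an analogy only (`transformStream` is a hypothetical helper, not the Kafka Streams API, which is a Java library):

```javascript
// Analogy for the Streams API: take an input stream of records,
// apply a transformation to each one, emit an output stream.
function transformStream(inputRecords, transformFn) {
  return inputRecords.map(transformFn);
}

const input = ['order placed', 'order shipped'];
const output = transformStream(input, r => r.toUpperCase());
console.log(output); // [ 'ORDER PLACED', 'ORDER SHIPPED' ]
```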

In this example we use the Producer and Consumer APIs.

Before we get to the code, let's set up Kafka itself, then the project folder and dependencies.

Kafka single node setup

On a single machine, a 3-broker Kafka cluster is a sensible minimum for hassle-free operation. We'll also set the replication factor to 2.

Say X, Y and Z are our Kafka brokers. With a replication factor of 2, every partition is stored on 2 of the 3 brokers: for each partition, one broker acts as the leader and one other broker keeps a follower copy, so losing any single broker loses no data.
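The placement described above can be sketched as a round-robin assignment. This is a simplification (real Kafka also randomizes the starting broker and balances leaders); `assignReplicas` is illustrative only:

```javascript
// Simplified round-robin replica placement: partition p gets
// replicationFactor consecutive brokers starting at index p.
function assignReplicas(partition, brokers, replicationFactor) {
  const replicas = [];
  for (let i = 0; i < replicationFactor; i++) {
    replicas.push(brokers[(partition + i) % brokers.length]);
  }
  return replicas; // replicas[0] plays the role of the leader
}

const brokers = ['X', 'Y', 'Z'];
console.log(assignReplicas(0, brokers, 2)); // [ 'X', 'Y' ]
console.log(assignReplicas(1, brokers, 2)); // [ 'Y', 'Z' ]
console.log(assignReplicas(2, brokers, 2)); // [ 'Z', 'X' ]
```

With replication factor 2 and 3 brokers, every broker ends up holding copies of some partitions, which is the fault-tolerance the paragraph above describes.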


  • Have Java >= 1.8 installed.
  • Get a binary distribution of Kafka from the Apache Kafka downloads page.


Extract the contents of the Kafka archive to a convenient place and cd into it. Use a terminal multiplexer to run the components that make up the Kafka ecosystem.


  • Edit the config file config/zookeeper.properties and change the dataDir entry to some place that does not get wiped after a reboot.
    Ex: dataDir=/home/user/zookeeper (instead of the default /tmp/zookeeper)
  • Start the ZooKeeper instance with
    $ bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka brokers

  • In the config folder there is a server.properties file. This is the Kafka broker's config file. We need 3 instances of Kafka brokers.
  • Make a copy: $ cp config/server.properties config/server-1.properties
  • In the copy make the following changes:
broker.id=1                #unique id for our broker instance
port=9092                  #port where it listens
delete.topic.enable=true   #if we want to delete a kafka topic stored in the broker
log.dirs=/home/thatcoder/kafka-logs/01   #to a place that's not volatile
advertised.host.name=<host ip>   #prevents leader not found error when connecting from a remote machine
  • Make 2 more copies of this file (server-2.properties, server-3.properties) and change the fields broker.id, port and log.dirs in each file.
  • Run the individual brokers like
$ bin/kafka-server-start.sh config/server-1.properties
$ bin/kafka-server-start.sh config/server-2.properties
$ bin/kafka-server-start.sh config/server-3.properties

**Tip:** Executing $ jps on the shell lists all JVM instances. To kill a process, kill -9 <pid> will do the trick.

Testing out the install
  • Create a topic with
    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic <topicname>
  • Push data onto it
    $ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --sync --topic <topicname>
  • Fetch data from it
    $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topicname> --from-beginning
Setting up the Node.js project

  1. Make a folder named kafka-node and initialize a package inside it with npm init.
  2. Install kafka-node in the project directory:
npm install kafka-node --save

Now your package.json will look like this:

{
  "name": "kaas",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start": "node producer.js",
    "dev": "nodemon producer.js"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "kafka-node": "^2.4.1"
  }
}
Now let's dive into the code.

Since it's a producer-consumer programming model, let's create two consumer files and one producer file.

If the producer publishes a message under the topic 'example', Kafka delivers it to every consumer subscribed to that 'example' topic.
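That routing behaviour can be sketched with a tiny in-memory broker. This is only an illustration of topic-based delivery, what the real broker does for us, not the kafka-node API:

```javascript
// Minimal in-memory sketch of topic-based routing: a message published
// to a topic reaches only that topic's subscribers.
function createBroker() {
  const subscribers = {}; // topic -> array of handler callbacks
  return {
    subscribe(topic, handler) {
      (subscribers[topic] = subscribers[topic] || []).push(handler);
    },
    publish(topic, message) {
      (subscribers[topic] || []).forEach(handler => handler(message));
    }
  };
}

const broker = createBroker();
const received = [];
broker.subscribe('example', msg => received.push('consumer1: ' + msg));
broker.subscribe('example', msg => received.push('consumer2: ' + msg));
broker.subscribe('other', msg => received.push('other: ' + msg));
broker.publish('example', 'hello');
console.log(received); // [ 'consumer1: hello', 'consumer2: hello' ]
```

Both subscribers of 'example' receive the message; the 'other' subscriber never sees it.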

Set up a config file, config.js, with the following:

module.exports = {
  kafka_topic: 'example',
  kafka_server: 'localhost:2181',
};

producer program

const kafka = require('kafka-node');
const config = require('./config');

try {
  const Producer = kafka.Producer;
  const client = new kafka.Client(config.kafka_server);
  const producer = new Producer(client);
  const kafka_topic = config.kafka_topic;

  // payloads is an array of { topic, messages } objects;
  // here we simply send the topic name as the message
  let payloads = [
    {
      topic: kafka_topic,
      messages: config.kafka_topic
    }
  ];

  producer.on('ready', function() {
    producer.send(payloads, (err, data) => {
      if (err) {
        console.log('[kafka-producer -> ' + kafka_topic + ']: broker update failed');
      } else {
        console.log('[kafka-producer -> ' + kafka_topic + ']: broker update success');
      }
    });
  });

  producer.on('error', function(err) {
    console.log('[kafka-producer -> ' + kafka_topic + ']: connection errored');
    throw err;
  });
} catch (e) {
  console.log(e);
}
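Note the shape producer.send() expects in kafka-node: an array of { topic, messages } objects. A small helper to build such payloads (buildPayloads is a hypothetical convenience function, not part of kafka-node):

```javascript
// Builds the payload array kafka-node's producer.send() accepts:
// each entry names a topic and carries one or more messages.
function buildPayloads(topic, messages) {
  return [
    {
      topic: topic,
      messages: Array.isArray(messages) ? messages : [messages]
    }
  ];
}

console.log(JSON.stringify(buildPayloads('example', 'hi')));
// [{"topic":"example","messages":["hi"]}]
```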

consumer program (paste the same in both consumer files)

const kafka = require('kafka-node');
const config = require('./config');

try {
  const Consumer = kafka.HighLevelConsumer;
  const client = new kafka.Client(config.kafka_server);
  let consumer = new Consumer(
    client,
    [{ topic: config.kafka_topic, partition: 0 }],
    {
      autoCommit: true,
      fetchMaxWaitMs: 1000,
      fetchMaxBytes: 1024 * 1024,
      encoding: 'utf8',
      fromOffset: false
    }
  );

  consumer.on('message', function(message) {
    console.log('kafka-> ', message.value);
  });

  consumer.on('error', function(err) {
    console.log('error', err);
  });
} catch (e) {
  console.log(e);
}

Run the producer and the consumer programs; the consumers fetch the message from the topic and you can see it logged on the terminal.

Github link:

Happy coding 🙌🏻