Simple Kafka Demo
In this blog post, we will walk you through a simple Kafka demo.
Create 3 Virtual Machines
- We will create a two-node Kafka cluster
- We will use the third node to compile the Java/Scala producers and consumers
- Please refer to the Vagrant page if you need more details
- We used the Vagrantfile below
======================================================
Vagrantfile
======================================================
Vagrant.configure("2") do |config|
  ## config.vm.box = "bento/centos-7.2"
  config.vm.define "kfdmc1" do |kfdmc1|
    kfdmc1.vm.box = "bento/centos-7.2"
    kfdmc1.vm.network "private_network", ip: "192.168.77.10"
    kfdmc1.vm.hostname = "kfdmc1"
    kfdmc1.vm.provider :virtualbox do |vb|
      vb.name = "kfdmc1"
      vb.memory = 2048
      vb.cpus = 1
    end
  end
  config.vm.define "kfdmc2" do |kfdmc2|
    kfdmc2.vm.box = "bento/centos-7.2"
    kfdmc2.vm.network "private_network", ip: "192.168.77.11"
    kfdmc2.vm.hostname = "kfdmc2"
    kfdmc2.vm.provider :virtualbox do |vb|
      vb.name = "kfdmc2"
      vb.memory = 2048
      vb.cpus = 1
    end
  end
  config.vm.define "kfdmc6" do |kfdmc6|
    kfdmc6.vm.box = "bento/centos-7.2"
    kfdmc6.vm.network "private_network", ip: "192.168.77.15"
    kfdmc6.vm.hostname = "kfdmc6"
    kfdmc6.vm.provider :virtualbox do |vb|
      vb.name = "kfdmc6"
      vb.memory = 2048
      vb.cpus = 1
    end
  end
end
- We used the entries below to update /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.77.10 kfdmc1
192.168.77.11 kfdmc2
192.168.77.15 kfdmc6
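You can sanity-check name resolution from each VM, for example:
- ping -c 1 kfdmc2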
- We used the commands below to set up a user account
su - root
groupadd kafka
useradd kafka -g kafka
passwd kafka
su - kafka
Download and Install Java (on all machines)
sudo yum install -y java-1.8.0-openjdk-devel
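Verify the installation:
- java -version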
Install and Configure a Two-Node Kafka Cluster
Download the latest Kafka release from the Kafka website (https://kafka.apache.org/quickstart)
Download kafka_2.11-0.11.0.0.tgz to /vagrant as /vagrant/kafka_2.11-0.11.0.0.tgz
On both machines "kfdmc1" and "kfdmc2":
- cd /home/kafka
- tar -xvf /vagrant/kafka_2.11-0.11.0.0.tgz
- cd kafka_2.11-0.11.0.0
- vi config/server.properties
- zookeeper.connect=192.168.77.10:2181
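For reference, a minimal sketch of the relevant settings in config/server.properties on "kfdmc1" is below. broker.id=0 is the shipped default, and the listeners line is our assumption so that clients on the other VMs can reach the broker over the private network:
======================================================
config/server.properties (sketch, "kfdmc1")
======================================================
# Unique id for this broker (the shipped default is 0)
broker.id=0
# Assumption: listen on the private-network IP so the other VMs can connect
listeners=PLAINTEXT://192.168.77.10:9092
# ZooKeeper runs on kfdmc1
zookeeper.connect=192.168.77.10:2181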
Start ZooKeeper (on "kfdmc1")
- bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka Broker
- bin/kafka-server-start.sh config/server.properties
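Both start scripts also accept a -daemon flag if you prefer to run them in the background:
- bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- bin/kafka-server-start.sh -daemon config/server.properties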
Test Kafka Single-Node Setup
- bin/kafka-topics.sh --create --zookeeper 192.168.77.10:2181 --replication-factor 1 --partitions 1 --topic test
- bin/kafka-topics.sh --list --zookeeper 192.168.77.10:2181
- bin/kafka-console-producer.sh --broker-list 192.168.77.10:9092 --topic test
- This is a message
- This is another message
- bin/kafka-console-consumer.sh --bootstrap-server 192.168.77.10:9092 --topic test --from-beginning
- All the text you type in the producer will appear in the consumer window
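You can also inspect the topic's partition assignment and leader:
- bin/kafka-topics.sh --describe --zookeeper 192.168.77.10:2181 --topic test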
Kafka Multi-Node Setup
On "kfdmc2":
- cd /home/kafka/kafka_2.11-0.11.0.0
- vi config/server.properties
- broker.id=1
- zookeeper.connect=192.168.77.10:2181
- Start the Second Kafka Broker
- bin/kafka-server-start.sh config/server.properties
Test Kafka Multi-Node Setup
- bin/kafka-topics.sh --create --zookeeper 192.168.77.10:2181 --replication-factor 2 --partitions 1 --topic test2
- bin/kafka-topics.sh --list --zookeeper 192.168.77.10:2181
- Producer on "kfdmc1" and consumer on "kfdmc2"
- On "kfdmc1"
- bin/kafka-console-producer.sh --broker-list 192.168.77.10:9092,192.168.77.11:9092 --topic test2
- On "kfdmc2"
- bin/kafka-console-consumer.sh --zookeeper 192.168.77.10:2181 --topic test2 --from-beginning
- Test the failover scenarios by killing one Kafka broker at a time, as shown below
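For example, stop the broker on "kfdmc2" and describe the topic again; the Leader and Isr columns for test2 should now list only broker 0:
- bin/kafka-server-stop.sh
- bin/kafka-topics.sh --describe --zookeeper 192.168.77.10:2181 --topic test2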
Test Kafka Custom Scala Producers and Consumers
We will be doing these steps on "kfdmc6"
Download and Install Scala
- cd /home/kafka
- wget https://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz
- tar -xvf scala-2.11.8.tgz
Download and Install SBT
We will be using SBT for compiling the Scala code
- curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
- sudo yum install sbt
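sbt resolves its own dependencies on first run; you can verify the installation with:
- sbt sbtVersion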
Scala Source Code
Please create the source files using the commands below
- mkdir -p /home/kafka/producer/src/main/scala
- mkdir -p /home/kafka/consumer/src/main/scala
- vi /home/kafka/producer/src/main/scala/ProducerExample.scala
object ProducerExample extends App {
  import java.util.Properties
  import org.apache.kafka.clients.producer._

  val props = new Properties()
  props.put("bootstrap.servers", "192.168.77.10:9092,192.168.77.11:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val TOPIC = "test2"

  // Send 50 keyed messages, one per second
  for (i <- 1 to 50) {
    val record = new ProducerRecord[String, String](TOPIC, s"key$i", s"hello $i")
    Thread.sleep(1000)
    producer.send(record)
  }

  // Send a final marker message carrying the current timestamp
  val record = new ProducerRecord[String, String](TOPIC, "key", "the end " + new java.util.Date)
  producer.send(record)
  producer.close()
}
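Note that producer.send() is asynchronous and returns before the broker acknowledges the record. If you want to log delivery results or errors, a minimal sketch (not part of the original example; Callback and RecordMetadata come from the org.apache.kafka.clients.producer package already imported above) is to pass a callback:
producer.send(record, new Callback {
  // Invoked once the broker acknowledges (or rejects) the record
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) exception.printStackTrace()
    else println(s"sent to partition ${metadata.partition()} at offset ${metadata.offset()}")
  }
})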
- vi /home/kafka/consumer/src/main/scala/ConsumerExample.scala
import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object ConsumerExample extends App {
  import java.util.Properties

  val TOPIC = "test2"
  val props = new Properties()
  props.put("bootstrap.servers", "192.168.77.10:9092,192.168.77.11:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("group.id", "something")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(util.Collections.singletonList(TOPIC))

  // Poll forever, printing each record as it arrives
  while (true) {
    val records = consumer.poll(100)
    for (record <- records.asScala) {
      println("Received message: (key=\"" + record.key() + "\", value=\"" + record.value() + "\") at offset =\"" + record.offset() + "\"")
    }
  }
}
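One caveat: with a brand-new group.id, the consumer starts from the latest offsets by default (auto.offset.reset defaults to "latest"), so it only sees messages produced after it starts. To read the topic from the beginning on first run, add this property before creating the consumer:
props.put("auto.offset.reset", "earliest")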
- vi /home/kafka/producer/build.sbt
name := "producer_sample"
version := "1.0"
scalaVersion := "2.11.0"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.0"
- vi /home/kafka/consumer/build.sbt
name := "consumer_sample"
version := "1.0"
scalaVersion := "2.11.0"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.0"
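The kafka_2.11 artifact pulls in the full broker jar. Since these examples only use the client classes, a lighter alternative is to depend on just the client library:
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.0"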
Compile the Source Code
- cd /home/kafka/producer
- sbt package
- Output:
- /home/kafka/producer/target/scala-2.11/producer_sample_2.11-1.0.jar
- cd /home/kafka/consumer
- sbt package
- Output:
- /home/kafka/consumer/target/scala-2.11/consumer_sample_2.11-1.0.jar
Run the Producer and Consumer on "kfdmc1" or "kfdmc2"
- Copy the files
- scp kafka@kfdmc6:/home/kafka/producer/target/scala-2.11/producer_sample_2.11-1.0.jar /home/kafka/.
- scp kafka@kfdmc6:/home/kafka/consumer/target/scala-2.11/consumer_sample_2.11-1.0.jar /home/kafka/.
- Start Producer
- java -cp "/home/kafka/kafka_2.11-0.11.0.0/libs/*":/home/kafka/producer_sample_2.11-1.0.jar ProducerExample
- Start Consumer
- java -cp "/home/kafka/kafka_2.11-0.11.0.0/libs/*":/home/kafka/consumer_sample_2.11-1.0.jar ConsumerExample
- Output Below:
[kafka@kfdmc2 ~]$ java -cp "/home/kafka/kafka_2.11-0.11.0.0/libs/*":/home/kafka/consumer_sample_2.11-1.0.jar ConsumerExample
Received message: (key="key1", value="hello 1") at offset ="737"
Received message: (key="key2", value="hello 2") at offset ="738"
::::
Received message: (key="key", value="the end Thu Jul 27 14:35:53 UTC 2017") at offset ="787"
In the next post we will take this a bit further and read content from Twitter.
References:
https://kafka.apache.org/quickstart
http://armourbear.blogspot.com/2015/03/setting-up-multinode-kafka-cluster.html
https://github.com/smallnest/kafka-example-in-scala
https://gist.github.com/fancellu/f78e11b1808db2727d76