Simple Kafka Demo

In this blog post, we will walk you through a simple Kafka demo.

Create 3 Virtual Machines

  • We will create a two-node Kafka cluster
  • We will use the third node to compile the Java / Scala producers and consumers
  • Please refer to the Vagrant Page if you need more details
  • We used the Vagrantfile below

======================================================

Vagrantfile

======================================================

Vagrant.configure("2") do |config|
  ## config.vm.box = "bento/centos-7.2"

  config.vm.define "kfdmc1" do |kfdmc1|
    kfdmc1.vm.box = "bento/centos-7.2"
    kfdmc1.vm.network "private_network", ip: "192.168.77.10"
    kfdmc1.vm.hostname = "kfdmc1"
    kfdmc1.vm.provider :virtualbox do |vb|
      vb.name = "kfdmc1"
      vb.memory = 2048
      vb.cpus = 1
    end
  end

  config.vm.define "kfdmc2" do |kfdmc2|
    kfdmc2.vm.box = "bento/centos-7.2"
    kfdmc2.vm.network "private_network", ip: "192.168.77.11"
    kfdmc2.vm.hostname = "kfdmc2"
    kfdmc2.vm.provider :virtualbox do |vb|
      vb.name = "kfdmc2"
      vb.memory = 2048
      vb.cpus = 1
    end
  end

  config.vm.define "kfdmc6" do |kfdmc6|
    kfdmc6.vm.box = "bento/centos-7.2"
    kfdmc6.vm.network "private_network", ip: "192.168.77.15"
    kfdmc6.vm.hostname = "kfdmc6"
    kfdmc6.vm.provider :virtualbox do |vb|
      vb.name = "kfdmc6"
      vb.memory = 2048
      vb.cpus = 1
    end
  end
end

  • We updated /etc/hosts with the entries below

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.77.10 kfdmc1
192.168.77.11 kfdmc2
192.168.77.15 kfdmc6
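
If you would rather script the /etc/hosts update than edit the file by hand, a small helper along these lines may do. This is only a sketch: the function name add_host_entry is our own (hypothetical), and it assumes a grep that supports -w and -F, as on CentOS.

```shell
#!/usr/bin/env bash
# Append "IP NAME" to a hosts file unless a line mentioning NAME already exists.
# Usage: add_host_entry FILE IP NAME   (hypothetical helper, not part of Kafka or Vagrant)
add_host_entry() {
  local file="$1" ip="$2" name="$3"
  # -F: treat NAME as a fixed string, -w: match it as a whole word, -q: no output
  if grep -qwF -- "$name" "$file" 2>/dev/null; then
    return 0
  fi
  printf '%s %s\n' "$ip" "$name" >> "$file"
}
```

Run as root on each VM, it could be called as add_host_entry /etc/hosts 192.168.77.10 kfdmc1, and likewise for the other two hosts. Calling it twice for the same name leaves the file unchanged.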

 

  • We used the commands below to set up a user account

su - root
groupadd kafka
useradd kafka -g kafka
passwd kafka
su - kafka

 

Download and Install Java (on all machines)

sudo yum install -y java-1.8.0-openjdk-devel

Install and Configure a Two-Node Kafka Cluster

Download the latest Kafka release from the Kafka website (https://kafka.apache.org/quickstart).

This demo uses kafka_2.11-0.11.0.0.tgz; save it as /vagrant/kafka_2.11-0.11.0.0.tgz

Run the steps below on both machines, "kfdmc1" and "kfdmc2":

  • cd /home/kafka
  • tar -xvf /vagrant/kafka_2.11-0.11.0.0.tgz
  • cd kafka_2.11-0.11.0.0
  • vi config/server.properties
    • zookeeper.connect=192.168.77.10:2181
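
The server.properties edit can also be scripted instead of done in vi. The sketch below assumes GNU sed (for the -i flag) and values that contain no "|" or "&" characters; the function name set_property is our own and is not part of the Kafka distribution.

```shell
#!/usr/bin/env bash
# Set KEY=VALUE in a properties file: rewrite the line if KEY is already
# present, otherwise append a new line.
# Usage: set_property FILE KEY VALUE   (hypothetical helper)
set_property() {
  local file="$1" key="$2" value="$3" pattern
  # Escape dots so that "zookeeper.connect" is matched literally
  pattern="$(printf '%s' "$key" | sed 's/\./\\./g')"
  if grep -q "^${pattern}=" "$file"; then
    # Assumes GNU sed; VALUE must not contain "|" or "&"
    sed -i "s|^${pattern}=.*|${key}=${value}|" "$file"
  else
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}
```

From inside the kafka_2.11-0.11.0.0 directory, set_property config/server.properties zookeeper.connect 192.168.77.10:2181 would make the same change as the manual edit. Because the pattern requires "=" directly after the key, other keys that merely start with the same prefix are left alone.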

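Start Zookeeper (on "kfdmc1", the host that zookeeper.connect points to)

  • bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka Broker (on "kfdmc1", in a separate terminal, since Zookeeper keeps running in the foreground)

  • bin/kafka-server-start.sh config/server.properties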
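
The broker needs Zookeeper to be reachable when it starts, and the tests that follow need the broker to be listening. A rough way to check this from the shell is sketched below. It relies on bash's /dev/tcp redirection, so it assumes bash rather than a plain POSIX sh, and the function name wait_for_port is our own.

```shell
#!/usr/bin/env bash
# Return 0 as soon as HOST:PORT accepts a TCP connection,
# or 1 after TRIES failed attempts (default 30), pausing 1 second between attempts.
# Usage: wait_for_port HOST PORT [TRIES]   (hypothetical helper)
wait_for_port() {
  local host="$1" port="$2" tries="${3:-30}" i=0
  while [ "$i" -lt "$tries" ]; do
    # /dev/tcp/HOST/PORT is a bash feature; the subshell closes the socket on exit
    if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}
```

For example, wait_for_port 192.168.77.10 2181 && bin/kafka-server-start.sh config/server.properties would start the broker only once Zookeeper answers on port 2181, and wait_for_port 192.168.77.10 9092 can be used the same way before creating topics. Note that a connection attempt to an unreachable host may block for longer than one second.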

Test Kafka Single Node Setup

  • bin/kafka-topics.sh --create --zookeeper 192.168.77.10:2181 --replication-factor 1 --partitions 1 --topic test
  • bin/kafka-topics.sh --list --zookeeper 192.168.77.10:2181
  • bin/kafka-console-producer.sh --broker-list 192.168.77.10:9092 --topic test
    • This is a message
    • This is another message
  • bin/kafka-console-consumer.sh --bootstrap-server 192.168.77.10:9092 --topic test --from-beginning
    • All the text you type in the producer will appear in the consumer window

Kafka Multi Node Setup

On "kfdmc2"

  • cd /home/kafka/kafka_2.11-0.11.0.0
  • vi config/server.properties
    • broker.id=1
    • zookeeper.connect=192.168.77.10:2181
  • Start the second Kafka broker
    • bin/kafka-server-start.sh config/server.properties
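
Every broker in a cluster needs its own broker.id, which is why the second broker is set to 1 while the first keeps the default of 0 from the stock server.properties. If you script the setup, one possible convention (our assumption, not something Kafka requires) is to derive the id from the number at the end of the hostname. The function name broker_id_for_host is hypothetical.

```shell
#!/usr/bin/env bash
# Print a broker.id derived from a hostname such as kfdmc1 or kfdmc2:
# the trailing number minus one, so kfdmc1 maps to 0 and kfdmc2 maps to 1.
# Usage: broker_id_for_host HOSTNAME   (hypothetical helper)
broker_id_for_host() {
  local host="$1" num
  # Strip everything up to and including the last non-digit character
  num="${host##*[!0-9]}"
  if [ -z "$num" ]; then
    echo "hostname '$host' has no trailing number" >&2
    return 1
  fi
  # Assumes the trailing number has no leading zeros
  echo $((num - 1))
}
```

On each broker, broker_id_for_host "$(hostname)" then yields the value to put in config/server.properties.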

Test Kafka Multi Node Setup

  • bin/kafka-topics.sh --create --zookeeper 192.168.77.10:2181 --replication-factor 2 --partitions 1 --topic test2
  • bin/kafka-topics.sh --list --zookeeper 192.168.77.10:2181
  • Producer on "kfdmc1" and consumer on "kfdmc2"
  • On "kfdmc1"
    • bin/kafka-console-producer.sh --broker-list 192.168.77.10:9092,192.168.77.11:9092 --topic test2
  • On "kfdmc2"
    • bin/kafka-console-consumer.sh --zookeeper 192.168.77.10:2181 --topic test2 --from-beginning
  • Test the failover scenarios by killing one Kafka broker at a time

 

Test Kafka Custom Scala Producers and Consumers

We will be doing these steps on "kfdmc6"

Download and install Scala

  • cd /home/kafka
  • wget https://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz
  • tar -xvf scala-2.11.8.tgz

 

Download and Install SBT

We will use SBT to compile the Scala code

  • curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
  • sudo yum install sbt

Scala Source Code

Create the source files using the commands below

  • mkdir -p /home/kafka/producer/src/main/scala
  • mkdir -p /home/kafka/consumer/src/main/scala
  • vi /home/kafka/producer/src/main/scala/ProducerExample.scala

import java.util.Properties

import org.apache.kafka.clients.producer._

object ProducerExample extends App {
  // Connect to both brokers; keys and values are sent as plain strings
  val props = new Properties()
  props.put("bootstrap.servers", "192.168.77.10:9092,192.168.77.11:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val TOPIC = "test2"

  // Send 50 messages, one per second
  for (i <- 1 to 50) {
    val record = new ProducerRecord(TOPIC, s"key$i", s"hello $i")
    Thread.sleep(1000)
    producer.send(record)
  }

  // Send a final marker message, then close the producer (close waits for pending sends)
  val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)
  producer.send(record)
  producer.close()
}

 

  • vi /home/kafka/consumer/src/main/scala/ConsumerExample.scala

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConverters._

object ConsumerExample extends App {
  val TOPIC = "test2"

  // Connect to both brokers; keys and values are read as plain strings
  val props = new Properties()
  props.put("bootstrap.servers", "192.168.77.10:9092,192.168.77.11:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("group.id", "something")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(util.Collections.singletonList(TOPIC))

  // Poll forever (100 ms timeout per poll) and print every record received
  while (true) {
    val records = consumer.poll(100)
    for (record <- records.asScala) {
      println("Received message: (key=\"" + record.key() + "\", value=\"" + record.value() + "\") at offset =\"" + record.offset() + "\"")
    }
  }
}

  • vi /home/kafka/producer/build.sbt

name := "producer_sample"

version := "1.0"

scalaVersion := "2.11.0"

libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.0"

  • vi /home/kafka/consumer/build.sbt

name := "consumer_sample"

version := "1.0"

scalaVersion := "2.11.0"

libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.0"

 

Compile the Source Code

  • cd /home/kafka/producer
  • sbt package
    • Output:
    • /home/kafka/producer/target/scala-2.11/producer_sample_2.11-1.0.jar
  • cd /home/kafka/consumer
  • sbt package
    • Output:
    • /home/kafka/consumer/target/scala-2.11/consumer_sample_2.11-1.0.jar

Run the Producer and Consumer on "kfdmc1" or "kfdmc2"

  • Copy the files
    • scp kafka@kfdmc6:/home/kafka/producer/target/scala-2.11/producer_sample_2.11-1.0.jar /home/kafka/.
    • scp kafka@kfdmc6:/home/kafka/consumer/target/scala-2.11/consumer_sample_2.11-1.0.jar /home/kafka/.
  • Start Producer
    • java -cp "/home/kafka/kafka_2.11-0.11.0.0/libs/*":/home/kafka/producer_sample_2.11-1.0.jar ProducerExample
  • Start Consumer
    • java -cp "/home/kafka/kafka_2.11-0.11.0.0/libs/*":/home/kafka/consumer_sample_2.11-1.0.jar ConsumerExample
    • Output below:

[kafka@kfdmc2 ~]$ java -cp "/home/kafka/kafka_2.11-0.11.0.0/libs/*":/home/kafka/consumer_sample_2.11-1.0.jar ConsumerExample
Received message: (key="key1", value="hello 1") at offset ="737"
Received message: (key="key2", value="hello 2") at offset ="738"
::::
Received message: (key="key", value="the end Thu Jul 27 14:35:53 UTC 2017") at offset ="787"

 

In the next post, we will take this a bit further and read content from Twitter.

 

References:

https://kafka.apache.org/quickstart

http://armourbear.blogspot.com/2015/03/setting-up-multinode-kafka-cluster.html

https://github.com/smallnest/kafka-example-in-scala

https://gist.github.com/fancellu/f78e11b1808db2727d76

 

Author: Pathik Paul

Webmaster: Ella Paul
