Kafka with Twitter

In the previous post, we set up a single-node and then a multi-node Kafka cluster. We also built and ran custom producers and consumers in Scala. In this post, we take Kafka one step further: we read live Twitter feeds and feed them into Kafka.

Prerequisites

  • All machines set up as described in the previous Kafka post
  • Kafka cluster operational as described in the previous Kafka post

Set up a Twitter Account

  • Refer to http://saurzcode.in/2015/02/kafka-producer-using-twitter-stream/ for details
    • Go to https://apps.twitter.com/app/new
    • Log in, or create a new account if needed
    • Enter a name for the new application
    • Enter a description
    • Enter a website address
    • You can leave the callback URL empty
    • Accept the Terms of Service
    • Click the “Create your Twitter application” button
    • You will need the following four values:
      • Consumer Key [on this page]
    • Switch to the “Keys and Access Tokens” tab
      • Consumer Secret [on the Keys and Access Tokens tab]
      • Access Token [on the Keys and Access Tokens tab]
      • Access Token Secret [on the Keys and Access Tokens tab]
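Later in this post, the producer takes these four values on the command line in a fixed order (consumerKey, consumerSecret, token, secret). A minimal stdlib-only sketch of a holder that checks all four are present, and prints them masked (similar to how they appear elided in this post), might look like the following. The class name TwitterCredentials and the masking rule (keep the first 3 and last 4 characters) are illustrative assumptions, not part of the original code.

```java
import java.util.Arrays;

/** Hypothetical holder for the four Twitter OAuth values (illustrative only). */
final class TwitterCredentials {
    final String consumerKey;
    final String consumerSecret;
    final String token;
    final String secret;

    private TwitterCredentials(String consumerKey, String consumerSecret, String token, String secret) {
        this.consumerKey = consumerKey;
        this.consumerSecret = consumerSecret;
        this.token = token;
        this.secret = secret;
    }

    /** Builds credentials from args in the order: consumerKey consumerSecret token secret. */
    static TwitterCredentials fromArgs(String[] args) {
        if (args.length != 4 || Arrays.stream(args).anyMatch(a -> a == null || a.trim().isEmpty())) {
            throw new IllegalArgumentException(
                    "Expected 4 non-empty arguments: consumerKey consumerSecret token secret");
        }
        return new TwitterCredentials(args[0], args[1], args[2], args[3]);
    }

    /** Masks a value for display: keeps the first 3 and last 4 characters (assumed rule). */
    static String mask(String value) {
        if (value.length() <= 7) {
            return "****"; // too short to reveal anything safely
        }
        return value.substring(0, 3) + "..." + value.substring(value.length() - 4);
    }

    @Override
    public String toString() {
        return mask(consumerKey) + " " + mask(consumerSecret) + " " + mask(token) + " " + mask(secret);
    }

    public static void main(String[] args) {
        // Dummy values for illustration; real ones come from the Twitter application page
        TwitterCredentials creds = fromArgs(new String[] {
                "consumerKeyABCD", "consumerSecretWXYZ", "accessToken1234", "tokenSecret9876"});
        System.out.println(creds); // masked form for log output
    }
}
```

Failing fast on a missing argument gives a clear message instead of an ArrayIndexOutOfBoundsException from args[3].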

Set up Maven to Compile

  • cd /home/kafka
  • wget http://mirror.jax.hugeserver.com/apache/maven/maven-3/3.5.0/binaries/apache-maven-3.5.0-bin.tar.gz
  • tar -xvf apache-maven-3.5.0-bin.tar.gz

Set up the Source Code

  • pom.xml
    • mkdir -p /home/kafka/twitter-stream
    • Download pom.xml (see References)
    • vi /home/kafka/twitter-stream/pom.xml
    • Added the dependencies section, plus a build section with the maven-assembly-plugin (jar-with-dependencies) so that the JAR bundles all the dependencies it needs
  • Producer
    • mkdir -p /home/kafka/twitter-stream/src/main/java/com/saurzcode/twitter
    • Download TwitterKafkaProducer.java (see References)
    • vi /home/kafka/twitter-stream/src/main/java/com/saurzcode/twitter/TwitterKafkaProducer.java
  • Main changes in the producer
    • Changed the topic name to test2
    • Changed the search (track) term to #segintech
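Both changes are edits to hard-coded values in the producer (topic "test2", track term "#segintech"), so every change means a rebuild. One way to avoid that, sketched here as an assumption rather than something the original code does, is to read them from JVM system properties with the current values as defaults. The property names twitter.topic and twitter.track and the class ProducerSettings are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Sketch: resolve topic and track terms from hypothetical system properties. */
final class ProducerSettings {
    static final String DEFAULT_TOPIC = "test2";
    static final String DEFAULT_TRACK = "#segintech";

    /** Topic name, e.g. -Dtwitter.topic=twitter-topic; falls back to "test2". */
    static String topic() {
        return System.getProperty("twitter.topic", DEFAULT_TOPIC);
    }

    /** Comma-separated track terms, e.g. -Dtwitter.track=#segintech,#kafka; blanks are dropped. */
    static List<String> trackTerms() {
        String raw = System.getProperty("twitter.track", DEFAULT_TRACK);
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("topic=" + topic() + " track=" + trackTerms());
    }
}
```

The resulting list could be passed to endpoint.trackTerms(...) in place of Lists.newArrayList("#segintech"), and the topic string used where the topic constant is used now; the java command would then take, for example, -Dtwitter.topic=test2 before the class name.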

Compile the Code

  • cd /home/kafka/twitter-stream
  • /home/kafka/apache-maven-3.5.0/bin/mvn clean compile package
  • The following JAR is produced:
    • /home/kafka/twitter-stream/target/twitter-stream-0.0.1-SNAPSHOT-jar-with-dependencies.jar
  • Confirm that the producer class is in the JAR:
    • jar -tvf /home/kafka/twitter-stream/target/twitter-stream-0.0.1-SNAPSHOT-jar-with-dependencies.jar | grep --color TwitterKafkaProducer
    • com/saurzcode/twitter/TwitterKafkaProducer.class
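The same check can be done from Java with the standard java.util.jar.JarFile class, which may be handy in a test or build script. This is a minimal sketch; the class name JarCheck is illustrative, and the entry name assumes the .class suffix that jar -tvf lists for compiled classes.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/** Sketch: check whether a JAR contains a given entry, like `jar -tvf ... | grep`. */
final class JarCheck {
    static boolean containsEntry(String jarPath, String entryName) throws IOException {
        // try-with-resources closes the JarFile handle
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the JAR path used in this post; throws IOException if the file is missing
        String jarPath = args.length > 0 ? args[0]
                : "/home/kafka/twitter-stream/target/twitter-stream-0.0.1-SNAPSHOT-jar-with-dependencies.jar";
        String entry = "com/saurzcode/twitter/TwitterKafkaProducer.class";
        System.out.println(entry + (containsEntry(jarPath, entry) ? " found" : " NOT found"));
    }
}
```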

Start the Consumer (the same one we used in the previous post)

  • java -cp "/home/kafka/kafka_2.11-0.11.0.0/libs/*":/home/kafka/consumer_sample_2.11-1.0.jar ConsumerExample
    • Received messages are printed in this window

Copy and Start the Producer

  • scp kafka@kfdmc6:/home/kafka/twitter-stream/target/twitter-stream-0.0.1-SNAPSHOT-jar-with-dependencies.jar /home/kafka/.
  • java -cp "/home/kafka/kafka_2.11-0.11.0.0/libs/*":/home/kafka/twitter-stream-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.saurzcode.twitter.TwitterKafkaProducer Ehw……..CeUF ETd………..ZM25 714..K26A ybw….EQ4
  • The producer takes the following arguments, in this order:
    • **consumerKey** **consumerSecret** **token** **secret**
    • Replace them with the actual values for your own Twitter application
  • To test it, I sent the tweet “testing #segintech” from my account
  • The following output appeared in the consumer window:
    • Received message: (key="null", value="{"created_at":"Thu Jul 27 03:03:44 +0000 2017","id":890407329560227841,"id_str":"890407329560227841","text":"testing #segintech","source":"\u003ca href=\"http:\/\/twitter.com\" rel=\"nofollow\"\u003eTwitter Web Client\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":714084585601179648,"id_str":"714084585601179648","name":"Pathik Paul","screen_name":"segintech","location":null,"url":null,"description":null,"protected":false,"verified":false,"followers_count":2,"friends_count":5,"listed_count":0,"favourites_count":0,"statuses_count":3,"created_at":"Sun Mar 27 13:40:07 +0000 2016","utc_offset":null,"time_zone":null,"geo_enabled":false,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"F5F8FA","profile_background_image_url":"","profile_background_image_url_https":"","profile_background_tile":false,"profile_link_color":"1DA1F2","profile_sidebar_border_color":"C0DEED","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http:\/\/abs.twimg.com\/sticky\/default_profile_images\/default_profile_normal.png","profile_image_url_https":"https:\/\/abs.twimg.com\/sticky\/default_profile_images\/default_profile_normal.png","default_profile":true,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"is_quote_status":false,"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[{"text":"segintech","indices":[8,18]}],"urls":[],"user_mentions":[],"symbols":[]},"favorited":false,"retweeted":false,"filter_level":"low","lang":"en","timestamp_ms":"1501124624992"}") at offset ="736"
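Each tweet arrives as one line of raw JSON. Two of its fields describe the same moment in different forms: created_at (text with a +0000 offset) and timestamp_ms (epoch milliseconds). The java.time sketch below uses the values copied from the message above, not a JSON parser, to show that they agree; the formatter pattern is an assumption based on the created_at format seen in this output, and the class name TweetTime is illustrative.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

/** Sketch: compare a tweet's created_at and timestamp_ms fields. */
final class TweetTime {
    // Matches strings like "Thu Jul 27 03:03:44 +0000 2017" (English day and month names)
    static final DateTimeFormatter CREATED_AT =
            DateTimeFormatter.ofPattern("EEE MMM dd HH:mm:ss Z yyyy", Locale.ENGLISH);

    static Instant fromCreatedAt(String createdAt) {
        return OffsetDateTime.parse(createdAt, CREATED_AT).toInstant();
    }

    static Instant fromTimestampMs(String timestampMs) {
        return Instant.ofEpochMilli(Long.parseLong(timestampMs));
    }

    public static void main(String[] args) {
        Instant created = fromCreatedAt("Thu Jul 27 03:03:44 +0000 2017");
        Instant stamped = fromTimestampMs("1501124624992");
        System.out.println(created); // 2017-07-27T03:03:44Z
        System.out.println(stamped); // 2017-07-27T03:03:44.992Z
        // created_at only has second precision, so compare at that resolution
        System.out.println(created.equals(stamped.truncatedTo(ChronoUnit.SECONDS))); // true
    }
}
```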

    The Files: TwitterKafkaProducer.java

    $ cat /home/kafka/twitter-stream/src/main/java/com/saurzcode/twitter/TwitterKafkaProducer.java
    package com.saurzcode.twitter;
     
    import java.util.Properties;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
     
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
     
    import com.google.common.collect.Lists;
    import com.twitter.hbc.ClientBuilder;
    import com.twitter.hbc.core.Client;
    import com.twitter.hbc.core.Constants;
    import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
    import com.twitter.hbc.core.processor.StringDelimitedProcessor;
    import com.twitter.hbc.httpclient.auth.Authentication;
    import com.twitter.hbc.httpclient.auth.OAuth1;
     
    public class TwitterKafkaProducer {
     
           //private static final String topic = "twitter-topic";
           private static final String topic = "test2";
     
           public static void run(String consumerKey, String consumerSecret,
                           String token, String secret) throws InterruptedException {
     
                   Properties properties = new Properties();
                   properties.put("metadata.broker.list", "192.168.77.10:9092,192.168.77.11:9092");
                   properties.put("serializer.class", "kafka.serializer.StringEncoder");
                   properties.put("client.id", "camus");
                   ProducerConfig producerConfig = new ProducerConfig(properties);
                   kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(
                                   producerConfig);
     
                   BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
                   StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
                   // add some track terms
                   endpoint.trackTerms(Lists.newArrayList("#segintech"));
     
                   Authentication auth = new OAuth1(consumerKey, consumerSecret, token,
                                   secret);
                   // Authentication auth = new BasicAuth(username, password);
     
                   // Create a new BasicClient. By default gzip is enabled.
                   Client client = new ClientBuilder().hosts(Constants.STREAM_HOST)
                                   .endpoint(endpoint).authentication(auth)
                                   .processor(new StringDelimitedProcessor(queue)).build();
     
                   // Establish a connection
                   client.connect();
     
                   // Forward up to 1000 tweets from the queue to Kafka
                   try {
                           for (int msgRead = 0; msgRead < 1000; msgRead++) {
                                   // take() blocks until hbc delivers the next tweet (raw JSON)
                                   String msg = queue.take();
                                   producer.send(new KeyedMessage<String, String>(topic, msg));
                           }
                   } finally {
                           // Release the Kafka producer and the Twitter connection even on error
                           producer.close();
                           client.stop();
                   }
     
           }
     
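           public static void main(String[] args) {
                   // Arguments, in order: consumerKey consumerSecret token secret
                   if (args.length != 4) {
                           System.err.println("Usage: TwitterKafkaProducer <consumerKey> <consumerSecret> <token> <secret>");
                           System.exit(1);
                   }
                   try {
                           TwitterKafkaProducer.run(args[0], args[1], args[2], args[3]);
                   } catch (InterruptedException e) {
                           System.out.println(e);
                   }
           }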
    }
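At its core, TwitterKafkaProducer is a bounded producer/consumer handoff: hbc's StringDelimitedProcessor fills a LinkedBlockingQueue of capacity 10,000 with tweets, and the main loop take()s up to 1,000 of them and sends each to Kafka. The stdlib-only sketch below reproduces just that pattern so it can run without Twitter or Kafka; the background thread standing in for the Twitter stream, the List standing in for producer.send, and the class name QueueHandoff are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Sketch of the queue handoff used by the producer, with Twitter and Kafka stubbed out. */
final class QueueHandoff {
    static List<String> drain(int incoming, int maxToSend) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
        List<String> sent = new ArrayList<>(); // stands in for producer.send(...)

        // Stand-in for the hbc client thread that puts tweet JSON into the queue
        Thread stream = new Thread(() -> {
            try {
                for (int i = 0; i < incoming; i++) {
                    queue.put("{\"id\":" + i + "}");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        stream.setDaemon(true);
        stream.start();

        // Same shape as the original loop: block on take(), then "send"
        for (int msgRead = 0; msgRead < maxToSend; msgRead++) {
            sent.add(queue.take());
        }
        stream.interrupt(); // loosely analogous to client.stop()
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drain(5, 3)); // [{"id":0}, {"id":1}, {"id":2}]
    }
}
```

As in the original, take() blocks until a message is available, so the loop waits indefinitely if fewer than maxToSend tweets arrive; BlockingQueue.poll(timeout, unit) is the standard alternative when a bounded wait is preferred.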
    

    The Files: pom.xml

     

    $ cat /home/kafka/twitter-stream/pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
           <modelVersion>4.0.0</modelVersion>
           <groupId>com.saurzcode.twitter</groupId>
           <artifactId>twitter-stream</artifactId>
           <version>0.0.1-SNAPSHOT</version>
    <dependencies>
                   <dependency>
                           <groupId>com.twitter</groupId>
                           <artifactId>hbc-core</artifactId>
                           <version>2.2.0</version>
                   </dependency>
                   <dependency>
                           <groupId>org.apache.kafka</groupId>
                           <artifactId>kafka_2.8.0</artifactId>
                           <version>0.8.1.1</version>
                   </dependency>
                   <dependency>
                           <groupId>log4j</groupId>
                           <artifactId>log4j</artifactId>
                           <version>1.2.16</version>
                           <exclusions>
                                   <exclusion>
                                           <groupId>javax.jms</groupId>
                                           <artifactId>jms</artifactId>
                                   </exclusion>
                           </exclusions>
                   </dependency>
                   <dependency>
                           <groupId>org.slf4j</groupId>
                           <artifactId>slf4j-simple</artifactId>
                           <version>1.6.4</version>
                   </dependency>
                   <dependency>
                           <groupId>com.google.guava</groupId>
                           <artifactId>guava</artifactId>
                           <version>18.0</version>
                   </dependency>
    </dependencies>
    <build>
               <plugins>
               <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-assembly-plugin</artifactId>
                 <executions>
                   <execution>
                     <phase>package</phase>
                     <goals>
                       <goal>single</goal>
                     </goals>
                     <configuration>
                       <descriptorRefs>
                         <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                     </configuration>
                   </execution>
                 </executions>
               </plugin>
               </plugins>
    </build>
    </project>
    

     

    References:

    http://saurzcode.in/2015/02/kafka-producer-using-twitter-stream/
    https://github.com/saurzcode/twitter-stream/blob/master/src/main/java/com/saurzcode/twitter/TwitterKafkaProducer.java
    https://raw.githubusercontent.com/saurzcode/twitter-stream/master/pom.xml

    Author: Pathik Paul

    Webmaster: Ella Paul
