Word Count Example Core Java to Hadoop 1 and Hadoop 2
Simple Java Program that Counts words
In this post we build a simple Java program and run it first as a traditional standalone program and then on Hadoop. We use word count as the example: the program counts how often each word occurs across all the files in a directory and produces a list of words with their counts.
- vi MyWordCount.java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.TreeMap;
import java.util.Scanner;
import java.util.Iterator;
import java.util.Map;

/**
 * Counts how often each whitespace-separated word occurs across the regular
 * files directly inside one directory, then prints one line per word
 * (the word, a tab, its count) in sorted word order.
 */
public class MyWordCount {

    /**
     * Entry point.
     *
     * @param args exactly one element: the path of the input directory.
     *             Exits with status 1 when the argument is missing or the
     *             path cannot be listed as a directory.
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("Please provide the input directory on the command line");
            System.exit(1);
            return;
        }

        File dir = new File(args[0]);
        File[] listOfFiles = dir.listFiles();
        if (listOfFiles == null) {
            // File.listFiles() returns null (not an empty array) when the path
            // is not a directory or an I/O error occurs.
            System.err.println("Not a readable directory: " + dir);
            System.exit(1);
            return;
        }

        // TreeMap keeps the words sorted, so the output order does not depend
        // on the (unspecified) order in which listFiles() returns the entries.
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (File file : listOfFiles) {
            // Sub-directories cannot be opened with FileReader; skip them.
            if (file.isFile()) {
                countWords(file, counts);
            }
        }

        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }

    /** Adds the word occurrences found in one file to the running totals. */
    private static void countWords(File file, Map<String, Integer> counts) {
        Scanner scanner;
        try {
            scanner = new Scanner(new FileReader(file));
        } catch (FileNotFoundException e) {
            // The file vanished or is unreadable: report it and carry on with
            // the remaining files instead of failing on a null scanner.
            System.err.println("Skipping unreadable file " + file + ": " + e.getMessage());
            return;
        }
        try {
            while (scanner.hasNext()) {
                String word = scanner.next().trim();
                Integer seen = counts.get(word);
                counts.put(word, seen == null ? 1 : seen + 1);
            }
        } finally {
            // Closing the scanner also closes the underlying FileReader.
            scanner.close();
        }
    }
}
// Thanks to Ankur Raj for providing help with this program
- We compile/build it using the step below:
- javac MyWordCount.java
- We run it using the step below (output goes to the screen in this case):
- java -cp . MyWordCount ~/input
Hadoop 1 / Map Reduce Version 1
http://www.cs.cmu.edu/~abeutel/WordCount.java
https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html
- $ hadoop version
- Hadoop 1.2.1
- $ vi WordCountH1.java
import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class WordCountH1 { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCountH1.class); conf.setJobName("wordcount_Hadoop1"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
- We compile/build it using the steps below:
- $ mkdir wch1
- $ javac -classpath /home/hadoop/hadoop1/hadoop-core-1.2.1.jar WordCountH1.java -d wch1
- $ jar -cvf wch1.jar -C wch1/ .
- We run it using the step below (output goes to HDFS):
- $ hadoop jar ~/work/wch1.jar WordCountH1 /input /outputh1
- 16/02/14 01:19:32 INFO mapred.JobClient: map 0% reduce 0%
- 16/02/14 01:20:19 INFO mapred.JobClient: map 100% reduce 100%
- $ hadoop dfs -cat /outputh1/part-00000 | tail -5
- would 7
- writing, 6
- written 1
- xmlns:xsl="http://www.w3.org/1999/XSL/Transform" 1
- you 8
- $
Hadoop 2 / Map Reduce v2 / Yarn
- As a first step, we verify that the same jar works on Hadoop 2
- $ hadoop version
- Hadoop 2.6.3
- $ hadoop jar ~/work/wch1.jar WordCountH1 /input /outputh1
- 16/02/14 01:30:39 INFO mapreduce.Job: map 0% reduce 0%
- 16/02/14 01:40:12 INFO mapreduce.Job: map 100% reduce 100%
- Yes it works and produces the same output
- Now we will use a new program:
- https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html
- $ vi WordCountH2.java
import java.io.*; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class WordCountH2 extends Configured implements Tool { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private boolean caseSensitive = true; private long numRecords = 0; private String inputFile; public void configure(JobConf job) { caseSensitive = job.getBoolean("wordcount.case.sensitive", true); inputFile = job.get("map.input.file"); } public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCountH2.class); conf.setJobName("wordcount_hadoop2"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0] )); 
FileOutputFormat.setOutputPath(conf, new Path(args[1] )); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCountH2(), args); System.exit(res); } }
- We compile/build it using the steps below:
- $ mkdir wch2
- $ javac -classpath `hadoop classpath` WordCountH2.java -d wch2
- $ jar -cvf wch2.jar -C wch2/ .
- We run it using the step below (output goes to HDFS):
- $ hadoop jar ~/work/wch2.jar WordCountH2 /input /outputh2
- 16/02/14 02:22:38 INFO mapreduce.Job: map 16% reduce 0%
- 16/02/14 02:32:21 INFO mapreduce.Job: map 100% reduce 100%
- $ hdfs dfs -ls -R /outputh2
- -rw-r--r-- 1 hadoop supergroup 0 2016-02-14 02:32 /outputh2/_SUCCESS
- -rw-r--r-- 1 hadoop supergroup 15992 2016-02-14 02:32 /outputh2/part-00000
- $ hdfs dfs -cat /outputh2/part-00000 | tail -5
- would 7
- writing, 6
- written 1
- xmlns:xsl="http://www.w3.org/1999/XSL/Transform" 1
- you 8
- $
Author: Pathik Paul