The first recipe explained how to implement the word count application without MapReduce, and the limitations of that implementation. This recipe explains how to implement word counting with MapReduce and describes how it works.
This recipe assumes that you have access to a computer that has the Java Development Kit (JDK) installed and the JAVA_HOME environment variable configured. Download a Hadoop 1.1.x distribution from the http://hadoop.apache.org/releases.html page. Unzip the distribution; we will call this directory HADOOP_HOME. Now you can run Hadoop jobs in local mode. Download the sample code for the book and the data files as described in the first recipe. We will call that directory DATA_DIR.
Tip
Downloading the example code
You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
Copy the hadoop-microbook.jar file from SAMPLE_DIR to HADOOP_HOME. Run the MapReduce job with the following command from HADOOP_HOME:
$ bin/hadoop jar hadoop-microbook.jar microbook.wordcount.WordCount amazon-meta.txt wordcount-output1
You can find the results in the output directory. They take the following form:
B00007ELF7=1
Vincent[412370]=2
35681=1
You can find the source code for the recipe at src/microbook/wordcount/WordCount.java.
The word count job accepts an input directory, a mapper function, and a reducer function as inputs. The mapper function processes the data in parallel, and the reducer function collects the results of the mappers and produces the final output. The mapper sends its results to the reducer as key-value pairs. Let us walk through a MapReduce execution in detail.
The following diagram depicts the MapReduce job execution, and the following code listing shows the mapper and reducer functions:
When you run the MapReduce job, Hadoop first reads the input files from the input directory line by line. Then Hadoop invokes the mapper once for each line, passing the line as the argument. Each mapper parses the line and extracts the words it contains. After processing, the mapper sends its results to the reducer by emitting each word and its count as a key-value pair.
public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
  // Split the line into whitespace-separated tokens
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    // Emit each word with a count of one
    word.set(itr.nextToken());
    context.write(word, one);
  }
}
Hadoop collects all the key-value pairs emitted from the mapper functions and sorts them by key. Here the key is the word and the value is the number of occurrences of the word. Hadoop then invokes the reducer once for each key, passing all the values emitted against that key as arguments. The reducer calculates the sum of those values and emits it against the key. Hadoop collects the results from all reducers and writes them to the output file.
public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  // Sum all the counts emitted against this word
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}
The following code shows the main method that invokes the job. It configures the mapper, reducer, input and output formats, and input and output directories. Here, the input and output types of the mapper and reducer must match the values configured with setOutputKeyClass(..), setOutputValueClass(..), job.setMapOutputKeyClass(..), and job.setMapOutputValueClass(..):
JobConf conf = new JobConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
  System.err.println("Usage: <in> <out>");
  System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
In this model, the map function processes the data in parallel and distributes intermediate results to the reducers, and the reduce function collects those results together to produce the final output.
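To see this model end to end without Hadoop, the following standalone sketch simulates the three phases (map, shuffle/sort, and reduce) in plain Java. The class name and the sample input lines are illustrative and are not part of the book's sample code; this is a minimal sketch of the execution model, not Hadoop's actual implementation.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class LocalWordCount {
  public static void main(String[] args) {
    List<String> lines = List.of("to be or not to be", "to be");

    // Map phase: emit a (word, 1) pair for every token in every line
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    for (String line : lines) {
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        pairs.add(new AbstractMap.SimpleEntry<>(itr.nextToken(), 1));
      }
    }

    // Shuffle/sort phase: group the emitted values by key, sorted by key
    TreeMap<String, List<Integer>> grouped = new TreeMap<>();
    for (Map.Entry<String, Integer> p : pairs) {
      grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
    }

    // Reduce phase: sum the values collected for each key and emit the result
    for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
      int sum = 0;
      for (int v : e.getValue()) {
        sum += v;
      }
      System.out.println(e.getKey() + "=" + sum);
    }
  }
}
```

Running this prints each distinct word with its total count, one per line in sorted key order, mirroring the key=value results shown earlier in the recipe.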