Before MapReduce, relational operations such as filter, join, sort, and group-by were the primary operations used for processing large datasets. MapReduce can easily support operations such as filter and sort. For more information, refer to section 2.3.3, Relational-Algebra Operations, of the freely available book Mining of Massive Datasets, Anand Rajaraman and Jeffrey D. Ullman, Cambridge University Press, 2011.
This recipe explains how to use MapReduce to join two datasets. It joins the list of the 100 customers who have bought the most items against a dataset that lists the items bought by each customer, and produces, as output, the list of items bought by those 100 most frequent customers.
This recipe assumes that you have installed Hadoop and started it. Refer to the Writing a word count application using Java (Simple) and Installing Hadoop in a distributed setup and running a word count application (Simple) recipes for more information. We will use HADOOP_HOME to refer to the Hadoop installation directory. This recipe also assumes that you are aware of how Hadoop processing works. If you have not already done so, you should follow the Writing a word count application with MapReduce and running it (Simple) recipe.
Download the sample code for the chapter and the data files as described in the Writing a word count application with MapReduce and running it (Simple) recipe. If you are running this on only a few computers, select a subset of the Amazon dataset; you can find a smaller dataset in the sample directory.
This sample uses the data created by the earlier recipes. If you have not already run them, please do so first.
Upload the Amazon dataset to the HDFS filesystem using the following commands, if you have not already done so:

> bin/hadoop dfs -mkdir /data/
> bin/hadoop dfs -mkdir /data/amazon-dataset
> bin/hadoop dfs -put <SAMPLE_DIR>/amazon-meta.txt /data/amazon-dataset/
Copy the hadoop-microbook.jar file from SAMPLE_DIR to HADOOP_HOME.

Run the following MapReduce job to create the dataset that lists the items bought by each customer. To do that, run the following command from HADOOP_HOME:

$ bin/hadoop jar hadoop-microbook.jar microbook.join.Customer2ItemCalculater /data/amazon-dataset /data/join-output1
Copy the output of the MapReduce job and the output of the earlier recipe to the input directory. Note that the names of the files must be mostFrequentBuyers.data and itemsByCustomer.data:

> bin/hadoop dfs -mkdir /data/join-input
> bin/hadoop dfs -cp /data/join-output1/part-r-00000 /data/join-input/itemsByCustomer.data
> bin/hadoop dfs -cp /data/frequency-output1/part-r-00000 /data/join-input/mostFrequentBuyers.data
Run the second MapReduce job. To do that, run the following command from HADOOP_HOME:

$ bin/hadoop jar hadoop-microbook.jar microbook.join.BuyerRecordJoinJob /data/join-input /data/join-output2

You can find the results in the output directory, /data/join-output2.
You can find the mapper and reducer code in src/microbook/join/BuyerRecordJoinJob.java.
The first MapReduce job emits the items bought against each customer ID. The mapper emits the customer ID as the key and item IDs as values. The reducer receives a customer ID as the key and the item IDs emitted against that customer ID as values, and emits them without any changes.
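The actual code for this job ships with the sample as Customer2ItemCalculater. As a minimal plain-Java sketch of the same grouping logic outside Hadoop (the class name and record format here are illustrative assumptions, not the book's code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CustomerItemGrouper {
    // Groups (customerId, itemId) pairs into a customer -> items map,
    // mirroring what the first job's shuffle-and-reduce step produces.
    public static Map<String, List<String>> group(List<String[]> purchases) {
        Map<String, List<String>> itemsByCustomer = new TreeMap<>();
        for (String[] pair : purchases) {
            itemsByCustomer
                .computeIfAbsent(pair[0], k -> new ArrayList<>())
                .add(pair[1]);
        }
        return itemsByCustomer;
    }

    public static void main(String[] args) {
        List<String[]> purchases = Arrays.asList(
            new String[]{"C1", "I10"},
            new String[]{"C2", "I20"},
            new String[]{"C1", "I30"});
        // One output line per customer, like the reducer's key-value output
        for (Map.Entry<String, List<String>> e : group(purchases).entrySet()) {
            System.out.println(e.getKey() + "\t" + String.join(",", e.getValue()));
        }
    }
}
```

In the real job, the shuffle phase performs this grouping for free: the mapper only emits (customer ID, item ID) pairs, and Hadoop delivers all values for one customer to a single reducer call.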
We join the two datasets using the customer IDs. Here, we put the files for both datasets into the same input directory. Hadoop reads the input files from that directory and invokes the mapper once per record, passing the record as input.
When the mapper receives an input record, we find out which dataset the line belongs to by getting the filename from the FileSplit available through the Hadoop context. For the list of frequent customers, we emit the customer ID as both the key and the value; for the other dataset, we emit the customer ID as the key and the list of items as the value.
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    String currentFile = ((FileSplit) context.getInputSplit())
            .getPath().getName();
    Matcher matcher = parsingPattern.matcher(value.toString());
    if (matcher.find()) {
        String propName = matcher.group(1);
        String propValue = matcher.group(2);
        if (currentFile.contains("itemsByCustomer.data")) {
            // Key = customer ID, value = list of items
            context.write(new Text(propName), new Text(propValue));
        } else if (currentFile.contains("mostFrequentBuyers.data")) {
            // Key = customer ID, value = customer ID
            context.write(new Text(propName), new Text(propValue));
        } else {
            throw new IOException("Unexpected file " + currentFile);
        }
    }
}
Hadoop sorts the key-value pairs by key and invokes the reducer once for each unique key, passing the list of values as the second argument. The reducer inspects the values, and if they also contain the customer ID itself, it emits the customer ID as the key and the list of items as the value.
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    boolean isPresent = false;
    String itemList = null;
    for (Text val : values) {
        if (val.toString().equals(key.toString())) {
            // A value equal to the key came from mostFrequentBuyers.data,
            // so this customer is one of the 100 most frequent buyers
            isPresent = true;
        } else {
            itemList = val.toString();
        }
    }
    if (isPresent && itemList != null) {
        context.write(key, new Text(itemList));
    }
}
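To see this reduce-side join logic in isolation, here is a minimal plain-Java sketch of the same decision the reducer makes for one key (the class name and sample data are illustrative assumptions):

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSideJoinSketch {
    // Mirrors the reducer: given all values grouped under one customer ID,
    // return the item list only if the customer also appears in the
    // frequent-buyers dataset (signalled by a value equal to the key).
    public static String joinForKey(String key, List<String> values) {
        boolean isPresent = false;
        String itemList = null;
        for (String val : values) {
            if (val.equals(key)) {
                isPresent = true;   // came from mostFrequentBuyers.data
            } else {
                itemList = val;     // came from itemsByCustomer.data
            }
        }
        return (isPresent && itemList != null) ? itemList : null;
    }

    public static void main(String[] args) {
        // C1 is a frequent buyer, so its item list is emitted;
        // C2 is not, so it is dropped from the join output
        System.out.println(joinForKey("C1", Arrays.asList("I1,I2,I3", "C1")));
        System.out.println(joinForKey("C2", Arrays.asList("I9")));
    }
}
```

This is the essence of a reduce-side join: the shuffle brings matching records from both datasets to the same reducer call, and the reducer keeps only the keys present in both.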