Set operations are a useful tool when we want to understand a dataset. This recipe explains how to use MapReduce to perform a set operation on a large dataset. The following MapReduce job calculates the set difference between the customers who have bought items with an Amazon sales rank of less than 100 and the most frequent customers, which we calculated in an earlier recipe.
This recipe assumes that you have installed Hadoop and started it. Refer to the Writing a word count application using Java (Simple) and Installing Hadoop in a distributed setup and running a word count application (Simple) recipes for more information. We will use HADOOP_HOME to refer to the Hadoop installation directory. This recipe also assumes you are aware of how Hadoop processing works. If you have not already done so, you should follow the Writing a word count application with MapReduce and running it (Simple) recipe.
Download the sample code for the chapter and the data files as described in the Writing a word count application with MapReduce and running it (Simple) recipe. Select a subset of the Amazon dataset if you are running this with only a few computers. You can find the smaller dataset in the sample directory.
This sample uses the data created by the earlier recipes. If you have not already run them, please do so first.
If you have not already done so, upload the Amazon dataset to the HDFS filesystem using the following commands:
> bin/hadoop dfs -mkdir /data/
> bin/hadoop dfs -mkdir /data/amazon-dataset
> bin/hadoop dfs -put <SAMPLE_DIR>/amazon-meta.txt /data/amazon-dataset
> bin/hadoop dfs -mkdir /data/set-input
Copy the output of the earlier recipe to the input directory:
> bin/hadoop dfs -cp /data/frequency-output1/part-r-00000 /data/set-input/mostFrequentBuyers.data
Copy the hadoop-microbook.jar file from SAMPLE_DIR to HADOOP_HOME. Run the first MapReduce job by running the following command from HADOOP_HOME:
$ bin/hadoop jar hadoop-microbook.jar microbook.set.FindCustomersBroughtFirst100Items /data/amazon-dataset /data/set-output1
Copy the output of the first MapReduce job to the input directory:
> bin/hadoop dfs -cp /data/set-output1/part-r-00000 /data/set-input/first100ItemBuyers.data
Run the second MapReduce job by running the following command from HADOOP_HOME:
$ bin/hadoop jar hadoop-microbook.jar microbook.set.BuyersSetDifference /data/set-input /data/set-output2
You can find the results in the output directory, /data/set-output2.
You can find the mapper and reducer code at src/microbook/BuyersSetDifference.java.
We define the set difference between two sets S1 and S2, written as S1-S2, as the items that are in set S1 but not in set S2.
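As a quick illustration of this definition, the set difference of two small in-memory sets can be computed directly in plain Java (this standalone sketch is for illustration only and is not part of the sample code):

```java
import java.util.HashSet;
import java.util.Set;

public class SetDifferenceExample {

    // Returns S1 - S2: the elements that are in s1 but not in s2.
    static Set<String> difference(Set<String> s1, Set<String> s2) {
        Set<String> result = new HashSet<>(s1);
        result.removeAll(s2);
        return result;
    }

    public static void main(String[] args) {
        Set<String> s1 = new HashSet<>(Set.of("A", "B", "C"));
        Set<String> s2 = new HashSet<>(Set.of("B", "D"));
        // "B" is in both sets, so only "A" and "C" remain.
        System.out.println(difference(s1, s2));
    }
}
```

The MapReduce job in this recipe computes the same result, but for sets too large to hold in a single machine's memory.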
To perform set difference, we label each element at the mapper with the set it came from. Each element is then sent to a reducer, which emits the element only if it is in the first set but not in the second set. The preceding figure shows the execution of the MapReduce job, and the following code listing shows the map and reduce functions.
Let us look at the execution in detail.
Here we place the files for both sets in the same input directory. Hadoop reads the input files from that directory and reads the records from each file. It invokes the mapper once for each record, passing the record as input.
When the mapper receives an input, it finds out which set the line belongs to by getting the filename from the InputSplit, which is available through the Hadoop context. Then it emits the element as the key and the set label (1 or 2) as the value.
public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
  // Determine which set this record belongs to from the name of its input file
  String currentFile = ((FileSplit) context.getInputSplit()).getPath().getName();
  Matcher matcher = parsingPattern.matcher(value.toString());
  if (matcher.find()) {
    String propName = matcher.group(1);
    String propValue = matcher.group(2);
    if (currentFile.equals("first100ItemBuyers.data")) {
      // Label elements of the first set with 1
      context.write(new Text(propName), new IntWritable(1));
    } else if (currentFile.equals("mostFrequentBuyers.data")) {
      int count = Integer.parseInt(propValue);
      if (count > 100) {
        // Label elements of the second set with 2
        context.write(new Text(propName), new IntWritable(2));
      }
    } else {
      throw new IOException("Unexpected file " + currentFile);
    }
  } else {
    System.out.println(currentFile + ": Unprocessed Line " + value);
  }
}
Hadoop sorts the key-value pairs by key and invokes the reducer once for each unique key, passing the list of values as the second argument. The reducer inspects the list of values, which contains the labels of the sets each value came from, and emits the key only if it appears in the first set but not in the second.
public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  boolean has1 = false;
  boolean has2 = false;
  System.out.print(key + "=");
  for (IntWritable val : values) {
    switch (val.get()) {
      case 1:
        has1 = true;
        break;
      case 2:
        has2 = true;
        break;
    }
    System.out.println(val);
  }
  // Emit the key only if it is in the first set but not in the second
  if (has1 && !has2) {
    context.write(key, new IntWritable(1));
  }
}
We can use MapReduce to implement most set operations by labeling elements with the sets they came from, using a similar method, and changing the reducer logic to emit only the relevant elements. For example, we can implement set intersection by changing the reducer to emit only the elements that have both set labels among their values.
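To make that variation concrete, here is a minimal, Hadoop-free sketch of the emit condition each reducer would use; the class and method names are illustrative, not from the sample code:

```java
import java.util.List;

public class SetOperationReducers {

    // Set difference (S1 - S2): emit the key only if it was labeled
    // with set 1 and never with set 2.
    static boolean emitForDifference(List<Integer> setLabels) {
        return setLabels.contains(1) && !setLabels.contains(2);
    }

    // Set intersection: emit the key only if it was labeled with both sets.
    static boolean emitForIntersection(List<Integer> setLabels) {
        return setLabels.contains(1) && setLabels.contains(2);
    }
}
```

In a real reducer, the list of labels corresponds to the Iterable<IntWritable> values parameter; only the final if condition needs to change between the two operations.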