
Instant MapReduce Patterns - Hadoop Essentials How-to

By: Liyanapathirannahelage H Perera

Overview of this book

MapReduce is a technology that enables users to process large datasets, and Hadoop is an implementation of MapReduce. More and more data is becoming available, and it hides many insights that might hold the key to success or failure. MapReduce gives you the means to write code that analyzes and processes this data.

Instant MapReduce Patterns – Hadoop Essentials How-to is a concise introduction to Hadoop and programming with MapReduce. It aims to get you started and give you an overall feel for programming with Hadoop, so that you will have a well-grounded foundation to understand and solve your MapReduce problems as needed.

Instant MapReduce Patterns – Hadoop Essentials How-to starts with the configuration of Hadoop before moving on to writing simple examples and discussing MapReduce programming patterns. We start simply by installing Hadoop and writing a word count program. After that, we deal with seven styles of MapReduce programs: analytics, set operations, cross correlation, search, graph, joins, and clustering. For each case, you will learn the pattern and create a representative example program. The book also provides you with additional pointers to further enhance your Hadoop skills.

Relational operations – join two datasets with MapReduce (Advanced)


Before MapReduce, relational operations such as filter, join, sort, and group were the primary operations used for processing large datasets. MapReduce can very easily support operations such as filter and sort. For more information, refer to section 2.3.3, Relational-Algebra Operations, of the freely available book Mining of Massive Datasets, Anand Rajaraman and Jeffrey D. Ullman, Cambridge University Press, 2011.

This recipe explains how to use MapReduce to join two datasets. It joins the 100 customers who have bought the most items against the dataset that lists the items bought by each customer, and produces a list of the items bought by those 100 most frequent customers as output.

Getting ready

  1. This assumes that you have installed Hadoop and started it. Refer to the Writing a word count application using Java (Simple) and Installing Hadoop in a distributed setup and running a word count application (Simple) recipes for more information. We will use HADOOP_HOME to refer to the Hadoop installation directory.

  2. This recipe assumes you are aware of how Hadoop processing works. If you have not already done so, you should follow the Writing a word count application with MapReduce and running it (Simple) recipe.

  3. Download the sample code for the chapter and download the data files as described in the Writing a word count application with MapReduce and running it (Simple) recipe. Select a subset of data from the Amazon dataset if you are running this recipe with only a few computers. You can find a smaller dataset in the sample directory.

  4. This sample uses the data created by the earlier recipes. If you have not already run them, please do so first.

How to do it...

  1. Upload the Amazon dataset to the HDFS filesystem using the following commands, if you have not already done so:

    > bin/hadoop dfs -mkdir /data/
    > bin/hadoop dfs -mkdir /data/amazon-dataset
    > bin/hadoop dfs -put <SAMPLE_DIR>/amazon-meta.txt /data/amazon-dataset/
    
  2. Copy the hadoop-microbook.jar file from SAMPLE_DIR to HADOOP_HOME.

  3. Run the following MapReduce job to create the dataset that lists the items bought by each customer. To do that, run the following command from HADOOP_HOME:

    $ bin/hadoop jar hadoop-microbook.jar  microbook.join.Customer2ItemCalculater /data/amazon-dataset /data/join-output1
    
  4. Copy the output of the MapReduce job and the output of the earlier recipe to the input directory. Note that the files must be named mostFrequentBuyers.data and itemsByCustomer.data.

    > bin/hadoop dfs -mkdir /data/join-input
    > bin/hadoop dfs -cp /data/join-output1/part-r-00000 /data/join-input/itemsByCustomer.data
    > bin/hadoop dfs -cp /data/frequency-output1/part-r-00000 /data/join-input/mostFrequentBuyers.data
    
  5. Run the second MapReduce job. To do that, run the following command from HADOOP_HOME:

    $ bin/hadoop jar hadoop-microbook.jar  microbook.join.BuyerRecordJoinJob /data/join-input /data/join-output2
    
  6. You can find the results in the output directory, /data/join-output2.

How it works...

You can find the mapper and reducer code at src/microbook/join/BuyerRecordJoinJob.java.

The first MapReduce job emits the items bought against each customer ID. The mapper emits the customer ID as the key and the item IDs as values. The reducer receives each customer ID as the key and the item IDs emitted against that customer ID as the values, and it emits the keys and values without any changes.
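
A minimal sketch of what that first job could look like is shown below. This is not the sample code's microbook.join.Customer2ItemCalculater class itself; for illustration it assumes each input record has already been reduced to a customer ID and item ID pair, whereas the real class parses the raw Amazon records.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class Customer2ItemSketch {
    public static class Customer2ItemMapper
            extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Placeholder parsing: assumes a "customerID<TAB>itemID" line
            String[] parts = value.toString().split("\t");
            if (parts.length == 2) {
                context.write(new Text(parts[0]), new Text(parts[1]));
            }
        }
    }

    public static class Customer2ItemReducer
            extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Emit each item ID against the customer ID without any changes
            for (Text itemId : values) {
                context.write(key, itemId);
            }
        }
    }
}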

We join the two datasets using the customer IDs. Here, we put the files of both datasets into the same input directory. Hadoop reads the input files from that folder, parses them into records, and invokes the mapper once for each record, passing the record as input.
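
A driver for the join job could be configured along the following lines. This is only a sketch: BuyerRecordJoinMapper and BuyerRecordJoinReducer are hypothetical names standing in for the mapper and reducer shown in this section, and the actual microbook.join.BuyerRecordJoinJob driver in the sample code may be organized differently.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "buyer-record-join");
        job.setJarByClass(JoinJobDriver.class);
        // Hypothetical class names for the mapper and reducer shown below
        job.setMapperClass(BuyerRecordJoinMapper.class);
        job.setReducerClass(BuyerRecordJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Both itemsByCustomer.data and mostFrequentBuyers.data sit in the
        // same input directory, for example /data/join-input
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}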

When the mapper receives an input, it finds out which dataset the line belongs to by getting the filename from the InputSplit available through the Hadoop context. For the list of frequent customers, we emit the customer ID as both the key and the value; for the other dataset, we emit the customer ID as the key and the list of items as the value.

public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    // Determine which dataset this record belongs to from the input file name
    String currentFile = ((FileSplit) context
            .getInputSplit()).getPath().getName();
    Matcher matcher = parsingPattern.matcher(value.toString());
    if (matcher.find()) {
        String propName = matcher.group(1);
        String propValue = matcher.group(2);
        if (currentFile.contains("itemsByCustomer.data")) {
            // Items dataset: emit the item list against the customer ID
            context.write(new Text(propName), new Text(propValue));
        } else if (currentFile.contains("mostFrequentBuyers.data")) {
            // Frequent-buyers dataset: also emit the record keyed by customer ID
            context.write(new Text(propName), new Text(propValue));
        } else {
            throw new IOException("Unexpected file " + currentFile);
        }
    }
}

Hadoop sorts the key-value pairs by key and invokes the reducer once for each unique key, passing the list of values as the second argument. The reducer inspects the list of values, and if the values also contain the customer ID itself, it emits the customer ID as the key and the list of items as the value.

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    boolean isPresent = false;
    String itemList = null;
    for (Text val : values) {
        if (val.toString().equals(key.toString())) {
            // The customer ID itself means this customer is a frequent buyer
            isPresent = true;
        } else {
            // Any other value is the item list from the items dataset
            itemList = val.toString();
        }
    }
    // Emit only the customers that appear in both datasets
    if (isPresent && itemList != null) {
        context.write(key, new Text(itemList));
    }
}

There's more...

The main idea here is to route the records to be joined to the same reducer by emitting them under the same key at the mapper, and then to perform the join logic at the reducer. The same idea can be used to join any kind of datasets.
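
As a variation, instead of inspecting file names inside the mapper, each mapper can tag its values with the dataset they come from, and the reducer can then pair up the records that share the same join key. The following is a minimal sketch of that idea; the "A:" and "B:" prefixes are assumed tags, not something used by the sample code.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reduce-side join with tagged values: mappers prefix each value with a
// dataset tag, and the reducer joins the records that arrive under one key.
public class TaggedJoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> leftRecords = new ArrayList<String>();
        List<String> rightRecords = new ArrayList<String>();
        for (Text val : values) {
            String record = val.toString();
            if (record.startsWith("A:")) {
                leftRecords.add(record.substring(2));
            } else if (record.startsWith("B:")) {
                rightRecords.add(record.substring(2));
            }
        }
        // Emit every matching pair of records from the two datasets
        for (String left : leftRecords) {
            for (String right : rightRecords) {
                context.write(key, new Text(left + "\t" + right));
            }
        }
    }
}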