Instant MapReduce Patterns - Hadoop Essentials How-to

By: Liyanapathirannahelage H Perera

Overview of this book

MapReduce is a technology that enables users to process large datasets, and Hadoop is an implementation of MapReduce. More and more data is becoming available, and this data hides many insights that might hold the key to success or failure. MapReduce gives you the ability to write code that analyzes and processes this data.

Instant MapReduce Patterns – Hadoop Essentials How-to is a concise introduction to Hadoop and programming with MapReduce. It aims to get you started and give you an overall feel for programming with Hadoop, so that you have a well-grounded foundation to understand and solve your MapReduce problems as needed.

Instant MapReduce Patterns – Hadoop Essentials How-to starts with the configuration of Hadoop before moving on to writing simple examples and discussing MapReduce programming patterns. We start simply by installing Hadoop and writing a word count program. After that, we deal with seven styles of MapReduce programs: analytics, set operations, cross correlation, search, graph, joins, and clustering. For each case, you will learn the pattern and create a representative example program. The book also provides you with additional pointers to further enhance your Hadoop skills.

Cross correlation with MapReduce (Intermediate)


Cross correlation detects the number of times two things occur together. For example, in the Amazon dataset, if two buyers have bought the same item, we say that they are cross correlated. Through cross correlation, we count the number of times two customers have bought the same item.
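The following standalone Java sketch (not part of the book's sample code; the item and customer values are made up for illustration) shows what this count means before we distribute the work with MapReduce:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CrossCorrelationSketch {
    public static void main(String[] args) {
        // item -> customers who bought it (illustrative data, not the Amazon dataset)
        Map<String, List<String>> purchases = new HashMap<String, List<String>>();
        purchases.put("item1", Arrays.asList("c1", "c2", "c3"));
        purchases.put("item2", Arrays.asList("c1", "c2"));

        // count how many times each pair of customers has bought the same item
        Map<String, Integer> pairCounts = new HashMap<String, Integer>();
        for (List<String> buyers : purchases.values()) {
            for (int i = 0; i < buyers.size(); i++) {
                for (int j = i + 1; j < buyers.size(); j++) {
                    // order the pair so (c1,c2) and (c2,c1) count as one pair
                    String a = buyers.get(i);
                    String b = buyers.get(j);
                    String pair = a.compareTo(b) < 0 ? a + "," + b : b + "," + a;
                    Integer count = pairCounts.get(pair);
                    pairCounts.put(pair, count == null ? 1 : count + 1);
                }
            }
        }
        // prints {c1,c2=2, c1,c3=1, c2,c3=1} (order may vary)
        System.out.println(pairCounts);
    }
}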

Getting ready

  1. This recipe assumes that you have installed Hadoop and started it. Refer to the Writing a word count application using Java (Simple) and Installing Hadoop in a distributed setup and running a word count application (Simple) recipes for more information. We will use HADOOP_HOME to refer to the Hadoop installation directory.

  2. This recipe assumes you are aware of how Hadoop processing works. If you have not already done so, you should follow the Writing a word count application with MapReduce and running it (Simple) recipe.

  3. Download the sample code for the chapter and the data files as described in the Writing a word count application with MapReduce and running it (Simple) recipe. Select a subset of the Amazon dataset if you are running this with only a few computers. You can find a smaller dataset in the sample directory.

How to do it...

  1. Upload the Amazon dataset to the HDFS filesystem using the following commands from HADOOP_HOME, if you have not already done so:

    > bin/hadoop dfs -mkdir /data/
    > bin/hadoop dfs -mkdir /data/amazon-dataset
    > bin/hadoop dfs -put <DATA_DIR>/amazon-meta.txt /data/amazon-dataset/
    > bin/hadoop dfs -ls /data/amazon-dataset
    
  2. Copy the hadoop-microbook.jar file from SAMPLE_DIR to HADOOP_HOME.

  3. Run the MapReduce job to calculate the buying frequency using the following command from HADOOP_HOME:

    $ bin/hadoop jar hadoop-microbook.jar  microbook.crosscorrelation.CustomerCorrleationFinder /data/amazon-dataset /data/cor-output1
    
  4. You can find the results in the output directory /data/cor-output1.
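If you want to inspect the results directly from HDFS rather than browsing the directory, the following command prints the output files; this assumes the default part-* output file naming used by Hadoop:

    > bin/hadoop dfs -cat /data/cor-output1/part-*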

How it works...

You can find the mapper and reducer code at src/microbook/Crosscorrelation/CustomerCorrleationFinder.java.

The preceding figure shows the execution of the MapReduce job. Also, the following code listing shows the map function and the reduce function of the first job:

public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    // Parse all buyer records for the current item
    List<BuyerRecord> records =
            BuyerRecord.parseAItemLine(value.toString());
    List<String> customers = new ArrayList<String>();

    for (BuyerRecord record : records) {
        customers.add(record.customerID);
    }

    // For each buyer, emit that buyer as the key and all the other
    // buyers of the same item as a comma-separated value
    for (int i = 0; i < records.size(); i++) {
        StringBuffer buf = new StringBuffer();
        int index = 0;
        for (String customer : customers) {
            if (index != i) {
                buf.append(customer).append(",");
            }
            index++;
        }
        context.write(new Text(customers.get(i)),
                new Text(buf.toString()));
    }
}

As shown in the figure, Hadoop reads the input file from the input folder and reads records using the custom formatter we introduced in the Write a formatter (Intermediate) recipe. It invokes the mapper once for each record, passing the record as input.

The map function reads the record of a data item and extracts the sales data from it. Buyers in the sales data are cross correlated with each other because they have bought the same item.

The most trivial implementation of cross correlation would emit each cross-correlated pair of buyers from the map function, and count the number of occurrences of each pair in the reduce function, after the MapReduce framework has grouped the identical pairs together.
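A rough sketch of this trivial approach might look like the following map function. It is not the book's implementation; it reuses the BuyerRecord helper from the sample code and assumes the map output value is an IntWritable that a standard summing reducer would total up:

public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    // Parse all buyer records for the current item (helper from the sample code)
    List<BuyerRecord> records = BuyerRecord.parseAItemLine(value.toString());
    for (BuyerRecord a : records) {
        for (BuyerRecord b : records) {
            if (!a.customerID.equals(b.customerID)) {
                // one output key per buyer pair; a summing reducer would then
                // count how many items each pair has bought together
                context.write(new Text(a.customerID + "," + b.customerID),
                        new IntWritable(1));
            }
        }
    }
}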

However, this would generate a number of key-value pairs on the order of the square of the number of distinct buyers, and for a large dataset this can be a very large number. Therefore, we will use a more optimized version, which limits the number of keys.

Instead, the mapper emits each buyer as the key and all the other buyers who bought the same item as the value, as shown in the following example.
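For instance, for a hypothetical item bought by customers c1, c2, and c3 (the IDs are made up for illustration), the mapper emits one key-value pair per buyer:

    c1    c2,c3,
    c2    c1,c3,
    c3    c1,c2,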

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    // Collect the distinct co-buyers emitted for this buyer across all items
    Set<String> customerSet = new HashSet<String>();
    for (Text text : values) {
        String[] split = text.toString().split(",");
        for (String token : split) {
            customerSet.add(token);
        }
    }

    // Keep only co-buyers that are lexicographically smaller than the key,
    // so that each buyer pair is written exactly once
    StringBuffer buf = new StringBuffer();
    for (String customer : customerSet) {
        if (customer.compareTo(key.toString()) < 0) {
            buf.append(customer).append(",");
        }
    }

    // Append the initial distance (Integer.MAX_VALUE) and color (White)
    // fields in the record format used by SimilarItemsFinder
    buf.append("|").append(Integer.MAX_VALUE)
        .append("|").append(SimilarItemsFinder.Color.White);
    context.write(new Text(key), new Text(buf.toString()));
}

MapReduce will sort the key-value pairs by the key and invoke the reducer once for each unique key, passing the list of values emitted against that key as the input.

The reducer then builds the pairs and counts how many times each pair has occurred. Given two buyers B1 and B2, we could emit either B1, B2 or B2, B1 as the pair, thus generating duplicate data. We avoid that by emitting a pair only when B1 is lexicographically less than B2.
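The following standalone sketch (with made-up customer IDs, not from the sample code) illustrates that rule; the pair {c1, c2} is written only by the reducer whose key is the lexicographically larger buyer, so it appears exactly once:

import java.util.Arrays;

public class PairDedupSketch {
    public static void main(String[] args) {
        // hypothetical reducer key and co-buyers seen in its value list
        String reducerKey = "c2";
        for (String other : Arrays.asList("c1", "c3")) {
            // emit only when the co-buyer sorts before the key;
            // the pair {c2, c3} is instead written by the reducer for key c3
            if (other.compareTo(reducerKey) < 0) {
                System.out.println(other + "," + reducerKey); // prints "c1,c2" only
            }
        }
    }
}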

There's more...

Cross correlation is one of the harder problems for MapReduce, as it generates a large number of pairs. It generally works only with smaller datasets.