Learning Cascading


MapReduce execution framework


MapReduce jobs are orchestrated by two primary server types—the JobTracker and the TaskTracker. There is one JobTracker, and it uses one or more TaskTrackers running on slave nodes where it distributes the work (the mappers and reducers).

The JobTracker

The JobTracker is the manager of all the jobs that are submitted to Hadoop and it performs many tasks. It queues jobs into an input queue, and then determines the order they are allowed to run in. Hadoop starts with a very basic approach here, which is to run jobs in the order in which they are received, or first come first served (FCFS). Clearly, this can be very inefficient, so Hadoop allows this to be customized by providing several other types of schedulers that take into account system capacity, assigned job priority, assigned job class, and so on.
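The difference between FCFS ordering and a priority-aware scheduler can be illustrated with a small, self-contained sketch. This is not Hadoop's scheduler code: the QueuedJob class, its priority field, and the method names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of a submitted job; this is not a Hadoop class.
class QueuedJob {
  final String name;
  final int priority;      // assumption: a larger value means more urgent
  final long submitOrder;  // sequence number assigned at submission time

  QueuedJob(String name, int priority, long submitOrder) {
    this.name = name;
    this.priority = priority;
    this.submitOrder = submitOrder;
  }
}

class SchedulingSketch {
  // FCFS: jobs run strictly in the order in which they were submitted.
  static List<String> fcfsOrder(List<QueuedJob> jobs) {
    List<QueuedJob> sorted = new ArrayList<>(jobs);
    sorted.sort(Comparator.comparingLong((QueuedJob j) -> j.submitOrder));
    return names(sorted);
  }

  // Priority-aware: highest priority first, ties broken by submission order.
  static List<String> priorityOrder(List<QueuedJob> jobs) {
    List<QueuedJob> sorted = new ArrayList<>(jobs);
    sorted.sort(Comparator.comparingInt((QueuedJob j) -> j.priority).reversed()
        .thenComparingLong(j -> j.submitOrder));
    return names(sorted);
  }

  private static List<String> names(List<QueuedJob> jobs) {
    List<String> result = new ArrayList<>();
    for (QueuedJob j : jobs) {
      result.add(j.name);
    }
    return result;
  }
}
```

With FCFS, an urgent job submitted behind a long-running report simply waits its turn; with the priority ordering, it moves to the front of the queue.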

The TaskTracker

The TaskTracker is the manager of all the tasks that are started by the JobTracker. A TaskTracker is responsible for a lot of things:

  • It physically manages the JVM container that will run the task.

    • Note that a JVM typically requires a few seconds to initialize. Hadoop therefore provides the mapred.job.reuse.jvm.num.tasks parameter, which allows an existing JVM to be reused for several tasks. Its default value is 1, meaning that each JVM runs a single task.

    Note

    YARN does not support the reuse of JVM.

  • It starts the mapper or reducer that it has been assigned.

  • It handles I/O mapping for its assigned task.

  • It reports progress back to the JobTracker.

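The TaskTracker also provides a "heartbeat" back to the JobTracker. This is a message sent by default every 3 seconds. Note that the dfs.heartbeat.interval parameter, which also defaults to 3 seconds, is an HDFS setting: it controls how often DataNodes send heartbeats to the NameNode.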

Hadoop jobs

Every job that executes on the Hadoop cluster is represented by a Job object. Through this object, the job also has access to a Configuration object that can be used to pass information to its tasks as they execute. This object is essentially a HashMap of keys and values, where the key represents the parameter name, and the value represents the value of the parameter. In a job, you create the Configuration object, set values on it, and then use it to create the Job object, which is then submitted to the cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// The statements below belong inside a method, such as run(String[] args)
Configuration conf = new Configuration();
conf.set("parm", "value");
// Set other configuration items
Job job = Job.getInstance(conf);
// Set up your job
job.setJarByClass(MRJob.class);
job.setJobName("MRJob");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MRMapper.class);
job.setReducerClass(MRReducer.class);
job.setPartitionerClass(MRPartitioner.class);
job.setSortComparatorClass(MRComparator.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);

// Return 0 on success and a nonzero value on failure
int return_code = (job.waitForCompletion(true)) ? 0 : 1;
return return_code;

Distributed cache

One important thing to remember is that the program that submits your job runs on a boundary machine, while the components of the job (its mappers and reducers) run on nodes within the cluster. Therefore, local files that are accessible when the job is submitted are not automatically available to the running tasks. For the job to run on the Hadoop cluster, there must be some way to copy these external files to the cluster. This is the case when:

  • Additional JAR files are required to be in CLASSPATH

  • Various other files are needed, containing parameters, lookup data, and so on

Clearly, it would be possible to just copy these files to every node, but this quickly becomes unwieldy. The files could also be copied to HDFS, but this makes versioning difficult, since every job that uses a file must know what file name to use. To solve this problem, Hadoop provides a facility called distributed cache, which allows these files to be automatically copied to wherever the task will run.

The distributed cache component allows one to specify various files that should be copied to nodes that will run our tasks. This includes JAR files, regular files, and compressed files (such as ZIP files that contain multiple files organized into directories). In order to use the DistributedCache, the following lines of code can be used:

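URI parms = new URI("parms.txt#parms");
// The DistributedCache methods take the job's Configuration object
DistributedCache.addCacheFile(parms, job.getConfiguration());
DistributedCache.addCacheArchive(new URI("map.zip"), job.getConfiguration());
DistributedCache.addFileToClassPath(new Path("lib.jar"), job.getConfiguration());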

Note how parms.txt is added to the distributed cache, and the strange #parms syntax at the end of its URI. This causes Hadoop to copy the file to each node where tasks will run, and then to symbolically link it to the simple file name parms, which can then be opened and read, typically in a setup() method. This is powerful because it frees our tasks from needing to know the real file name. We could easily send a file called newparms.txt instead by changing only our job code, and the underlying mappers and reducers would never need to know.
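Once the symbolic link exists, a task can read the file by its simple name with ordinary Java I/O. The following is a minimal sketch of the parsing logic that a setup() method might delegate to. It assumes that the file holds one key=value pair per line, which the text does not specify, and the ParmsLoader class is hypothetical. It uses only the JDK, so it can be tried outside a cluster.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; assumes one key=value pair per line.
class ParmsLoader {
  static Map<String, String> load(Path file) throws IOException {
    Map<String, String> parms = new LinkedHashMap<>();
    for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
      String trimmed = line.trim();
      // Skip blank lines and comment lines that start with '#'
      if (trimmed.isEmpty() || trimmed.startsWith("#")) {
        continue;
      }
      int eq = trimmed.indexOf('=');
      if (eq <= 0) {
        continue; // Skip lines that have no key before an '=' sign
      }
      parms.put(trimmed.substring(0, eq).trim(),
                trimmed.substring(eq + 1).trim());
    }
    return parms;
  }
}
```

Inside a task, the call would then be along the lines of ParmsLoader.load(Paths.get("parms")), where parms is the link name given after the # sign.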

When dealing with multiple JAR files, there is another technique that is often used, called a FAT JAR. This is a single JAR file into which all the required JAR files are unpacked and repacked. This can even include the required Cascading JAR files. While unwieldy, this is still a very useful technique, and it can save you debugging time when you get the dreaded java.lang.ClassNotFoundException.

Counters

Hadoop also provides instrumentation: the ability to define counters that can be used for debugging, performance assessment, and general visibility into the workload being processed. Here, we must note that jobs that run in a cluster are largely out of your control. They cannot use typical development techniques, such as debugging breakpoints. Additionally, since there are a lot of them running simultaneously, even using the ubiquitous System.out.println function is problematic, since so many output consoles are being captured. Counters are easy to use, as shown in the following example:

public enum COUNTERS {
  RECORDS_WITH_ERRORS
}

Then in a mapper, reducer, and so on:

context.getCounter(COUNTERS.RECORDS_WITH_ERRORS).increment(1L);

And later in our job:

// Retrieve the counters once the job has completed
Counters counters = job.getCounters();
System.out.printf("Errors: %d\n",
  counters.findCounter(COUNTERS.RECORDS_WITH_ERRORS).getValue());

YARN – MapReduce version 2

As can be seen in the architecture, the JobTracker is a single point of failure. Additionally, it performs a tremendous amount of work. It handles job submission, job scheduling (including finding and allocating slots where the parallel parts of each job run), and also tracking the progress of the job itself. It became a major bottleneck over time. As a result, version 2 of Hadoop now splits job scheduling and job execution into separate components.

In YARN, jobs are submitted to the Resource Manager. This server tracks all active nodes and their resources (such as CPU, memory, disk, and so on). When a job is to be dispatched, the job itself gives an indication of the resources it would like to use. This is a negotiation though, and the job is given what can be made available to it, in the form of a set of nodes where its tasks can run and some other guidance for what it can use. Then, the Resource Manager starts an Application Master. This process runs on a slave node and manages the entire job. It dispatches the mappers and the reducers. It also receives progress notifications. Each mapper and reducer runs inside a JVM container.

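So, we can see that the work of the old JobTracker is now split between the Resource Manager and the Application Master, while Node Managers on the slave nodes take over the role of the TaskTrackers. The Application Master offloads the management of the running job from the Resource Manager. This reduces the impact of failures, since the failure of an Application Master will only kill its own tasks.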

One other significant aspect of YARN is that it does much more than just run MapReduce. In fact, YARN was designed to run arbitrary frameworks. For instance, both Spark and Tez can run on YARN.

What we are seeing here is the emergence of an application stack. YARN now forms the basis for resource management and scheduling/dispatching functions. Frameworks, such as MapReduce, Tez, Spark, and others, provide an application execution framework. After this, application development frameworks, such as Cascading, run within the application execution environment. It is this separation of concerns that is driving innovation, reducing the complexity of development, and providing upward compatibility by freeing the tight coupling that the original MapReduce imposed.

Figure 1.4 – YARN architecture

Make no mistake, YARN is more complicated. However, frameworks are also emerging to aid developers. Additionally, existing MapReduce jobs can run without change on YARN. So, the addition of YARN has provided a major enhancement to Hadoop. It allows scalability far beyond what version 1 could achieve. It has all but eliminated the single points of failure, and it now takes into account the resources that are being requested by the job. Also, YARN is being adopted rapidly. Early adopters chose it to build infrastructure that required persistent servers and better scalability. For instance, Spark and Tez (see the Beyond MapReduce section that follows) can now run on top of YARN. Given that YARN can seamlessly support legacy MapReduce applications, its adoption is now occurring at the application level.

A simple MapReduce job

Let's take a look at a simple MapReduce job. We are going to compute a weighted average of prices for some products. We will have data that represents price changes over time for some set of products. This is how the data looks:

product-id,date,price
1,1/1/2014,10.00
1,6/1/2014,11.00
1,11/26/2014,9.99
1,12/31/2014,11.99

Product number 1 has gone through several markups and markdowns. We seek to determine the average price between the earliest and latest dates in the data for each product. In order to compute our average, the records that arrive at our reducer must be sorted by both product ID and date. For this, we will use the following classes:

  • MRJob: This is a program that submits a job to the Hadoop cluster for execution

  • MRMapper: This is a mapper that handles each data split and formats the data, in such a way that reducers can perform counting

  • MRReducer: This is a reducer that performs a summary

  • MRPartitioner: This is a class that ensures that all identical keys get routed to the same reducer

  • MRComparator: This is a class that compares the correct portions of the composite keys so that they can be sorted correctly

Let's start by looking at a job that will submit work to Hadoop. This job is provided with command-line arguments that specify input and output directories, as well as any other parameters that may be required:

// Package and import statements omitted
public class MRJob extends Configured implements Tool {
  /**
  * @param args  input_directory_name  (required, HDFS)
  *              output_directory_name (required, HDFS)
  */
  public static void main(String[] args) throws Exception {
    int rc = ToolRunner.run(new MRJob(), args);
    System.exit(rc);
  }
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf);
    job.setJarByClass(MRJob.class);
    job.setJobName("MRJob");
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(MRMapper.class);
    job.setReducerClass(MRReducer.class);
    job.setPartitionerClass(MRPartitioner.class);
    // Sort the composite keys by product ID and then by date
    job.setSortComparatorClass(MRComparator.class);
    job.setMapOutputKeyClass(org.apache.hadoop.io.Text.class);
    job.setMapOutputValueClass(org.apache.hadoop.io.Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(8);
    // Return 0 on success and a nonzero value on failure
    int rc = (job.waitForCompletion(true)) ? 0 : 1;
    return rc;
  }
}

We can make a few notes here. The job looks much like a standard Java program in that it implements a static main() method, which is passed String args[] containing the parsed command line. Only two parameters are specified: the input directory and the output directory. Both are assumed to be in HDFS by default. However, this class also implements the Tool interface and uses a static ToolRunner call to start execution. Use this form to save yourself a lot of trouble. ToolRunner provides a lot of useful functionality; specifically, it is responsible for handling generic command-line arguments. These are specialized parameters that can specify additional JAR files that must be placed into the Java CLASSPATH (-libjars), other files and compressed archives that should be copied to the cluster (-files and -archives), and various parameters that are placed directly into the Hadoop Configuration container (-D). The actual job submission then occurs in the run() method of this class. We can see that this is where each class that will be used as a mapper, reducer, partitioner, and so on is specified.

First, let's look at a mapper. Note the usage of LongWritable as the key. This effectively passes the record offset as the key of the data record being processed.

// Package and import statements omitted
public class MRMapper extends Mapper<LongWritable,Text,Text,Text> {
  private static final String INSEP = ",";
  private static final String OUTSEP = "#";
  @Override
  protected void map(LongWritable key, Text value, Context context)
  throws IOException, InterruptedException {
    // Field 0 is product ID, field 1 is date, field 2 is price
    String[] fields = value.toString().split(INSEP);
    // Our KEY is product-ID#date
    // Our VALUE is date,price
    context.write(new Text(fields[0] + OUTSEP + fields[1]),
                  new Text(fields[1] + INSEP + fields[2]));
  }
}

Note

This input definition, which supplies the record offset as the key, is the default behavior of TextInputFormat, the input format that a job uses unless another one is set. Different input formats can be used to change this behavior.

Now, let's look at a reducer. It will receive a single product key with an iterator of all the date,price values, sorted by increasing date:

// Package and import statements omitted
public class MRReducer extends Reducer<Text,Text,Text,Text> {
  private static final String SEP = "#";
  // Allocate our objects once and reuse them!
  Text outKey = new Text();
  Text outText = new Text();
  int sum;
  @Override
  protected void reduce(Text key, Iterable<Text> values,
                        Context context)
    throws IOException, InterruptedException {
    // Split our composite key to get just the product ID.
    String[] prodDef = key.toString().split(SEP);
    double average_price = 0.0;
    // Compute the weighted average of prices based on number of
    // days difference between sorted records
    for (Text v : values) {
      // Computation of weighted average
    }
    outKey.set(prodDef[0]);
    outText.set(Double.toString(average_price));
    context.write(outKey, outText);
  }
}
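The weighted-average computation itself is elided in the listing above. One plausible reading, assumed here rather than taken from the book's code, is a time-weighted average in which each price is weighted by the number of days it remained in effect, that is, until the date of the next record. The sketch below uses only the JDK (java.time, Java 8 or later) and takes the date,price strings that the mapper emits, already sorted by date. The class and method names are hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;

class WeightedAverageSketch {
  // Matches dates such as 1/1/2014 and 11/26/2014
  private static final DateTimeFormatter FMT =
      DateTimeFormatter.ofPattern("M/d/yyyy");

  // sortedValues holds "date,price" strings in increasing date order
  static double weightedAverage(List<String> sortedValues) {
    double weightedSum = 0.0;
    long totalDays = 0;
    LocalDate prevDate = null;
    double prevPrice = 0.0;
    for (String v : sortedValues) {
      String[] f = v.split(",");
      LocalDate date = LocalDate.parse(f[0], FMT);
      double price = Double.parseDouble(f[1]);
      if (prevDate != null) {
        // The previous price was in effect from prevDate until this date
        long days = ChronoUnit.DAYS.between(prevDate, date);
        weightedSum += prevPrice * days;
        totalDays += days;
      }
      prevDate = date;
      prevPrice = price;
    }
    // With no elapsed days (for example, a single record), fall back
    // to the last price seen (0.0 for an empty list)
    return totalDays == 0 ? prevPrice : weightedSum / totalDays;
  }

  public static void main(String[] args) {
    List<String> values = Arrays.asList(
        "1/1/2014,10.00", "6/1/2014,11.00",
        "11/26/2014,9.99", "12/31/2014,11.99");
    System.out.println(weightedAverage(values));
  }
}
```

For the sample data, the prices are in effect for 151, 178, and 35 days respectively, which gives an average of roughly 10.49. Under this interpretation, the final price of 11.99 carries no weight because no later record bounds its period; a different definition of the average would change that.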

Next, let's look at the partitioner. It will route records based just on the product number so that all identical products arrive at the same reducer together to be grouped:

// Package and import statements omitted
public class MRPartitioner extends Partitioner<Text,Text>
{
  private static final String SEP = "#";
  @Override
  public int getPartition(Text key, Text value, int numReduceTasks) {
    // Route on the product ID only, ignoring the date part of the key
    String[] partStr = key.toString().split(SEP);
    int prodNum = Integer.parseInt(partStr[0]);
    // Mask the sign bit so that the partition number is never negative
    return (prodNum & Integer.MAX_VALUE) % numReduceTasks;
  }
}
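The routing rule can be exercised outside Hadoop with plain strings. The sketch below applies the same rule to composite keys of the form product-ID#date, with a sign-bit mask (an assumption of this sketch) so that the result is never negative. The PartitionSketch class is hypothetical and is not part of the job.

```java
class PartitionSketch {
  private static final String SEP = "#";

  // Returns the reducer index for a composite key such as "1#6/1/2014"
  static int partitionFor(String compositeKey, int numReduceTasks) {
    // Only the product ID takes part in routing; the date is ignored
    int prodNum = Integer.parseInt(compositeKey.split(SEP)[0]);
    return (prodNum & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    // Both keys belong to product 1, so they map to the same reducer
    System.out.println(partitionFor("1#1/1/2014", 8));
    System.out.println(partitionFor("1#12/31/2014", 8));
  }
}
```

Because the date never influences the result, every record for a product reaches the same reducer, regardless of how many price changes the product has.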

Lastly, we have the comparator. This compares two composite keys. If the product numbers are different, it performs just like a normal compare. If they are the same, it compares their date fields so that the results will be sorted by the date:

// Package and import statements omitted
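public class MRComparator implements RawComparator<Text> {
  private static final String SEP = "#";
  @Override
  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2)
  {
    // A serialized Text starts with a variable-length integer that
    // holds its length. Skip it, rebuild both keys, and compare them
    // by product ID and date instead of by raw bytes.
    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
    Text t1 = new Text();
    Text t2 = new Text();
    t1.set(b1, s1 + n1, l1 - n1);
    t2.set(b2, s2 + n2, l2 - n2);
    return compare(t1, t2);
  }

  /**
  * Compares the two objects
  */
  @Override
  public int compare(Text o1, Text o2) {
    String s1[], s2[];
    s1 = o1.toString().split(SEP);
    s2 = o2.toString().split(SEP);
    int prod1 = Integer.parseInt(s1[0]);
    int prod2 = Integer.parseInt(s2[0]);
    if(prod1 == prod2) {
      return MRUtil.compareDates(s1[1], s2[1]);  // See code
    }
    if (prod1 < prod2) 
      return -1;
    else
      return 1;    
  }
}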
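MRUtil.compareDates is referenced above but not shown. The following sketch illustrates the intended ordering on plain strings, using a hypothetical compareDates that parses dates in the M/d/yyyy form found in the sample data. It also shows why the dates cannot simply be compared as strings: lexically, 11/26/2014 sorts before 6/1/2014.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class CompositeKeySketch {
  private static final String SEP = "#";
  private static final DateTimeFormatter FMT =
      DateTimeFormatter.ofPattern("M/d/yyyy");

  // Hypothetical stand-in for MRUtil.compareDates
  static int compareDates(String d1, String d2) {
    return LocalDate.parse(d1, FMT).compareTo(LocalDate.parse(d2, FMT));
  }

  // Orders keys of the form product-ID#date by product ID, then by date
  static final Comparator<String> KEY_ORDER = (k1, k2) -> {
    String[] s1 = k1.split(SEP);
    String[] s2 = k2.split(SEP);
    int byProduct = Integer.compare(Integer.parseInt(s1[0]),
                                    Integer.parseInt(s2[0]));
    return byProduct != 0 ? byProduct : compareDates(s1[1], s2[1]);
  };

  static List<String> sortKeys(List<String> keys) {
    List<String> sorted = new ArrayList<>(keys);
    sorted.sort(KEY_ORDER);
    return sorted;
  }
}
```

Sorting the keys 2#1/1/2014, 1#11/26/2014, 1#6/1/2014, and 1#1/1/2014 with KEY_ORDER places the three records for product 1 first, in chronological order, followed by the record for product 2.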

Whew! This is a lot of code to do something that seems pretty easy. There must be a better way! Cascading provides us with a mechanism that will greatly simplify these tasks.

Beyond MapReduce

With Hadoop, there are really only two primary job types that are supported by the framework: mapper-only jobs and mapper/reducer jobs. This has become limiting as we attempt to maximize performance within a cluster. During many of the Hadoop phases of processing, files are written to disk and then immediately reread. This is inefficient. Also, all job structures usually exist as Map -> Reduce -> Map -> Reduce …. Every job must start with a mapper, even if it's IdentityMapper that simply reads a record and writes the same record. In this case, the only purpose that the mapper actually serves is to perform the file split.

Tip

Other job sequences do exist that allow Map -> Map -> … -> Map -> Reduce, but discussion of these is beyond the scope of this book. See the ChainMapper class for more information on this.

As a result, some new systems are seeking to become replacements for Hadoop, or at the very least, to become part of the "plumbing" that Hadoop provides. There are many of these, but two are noteworthy; they're gaining adoption and are proving to provide major performance improvements. We provide a very brief description of these here:

  • Apache Tez is a new architecture that follows the basic paradigm of MapReduce, but allows for more generic processing than simple mapper and reducer functionalities, and it also provides finer grained control over how data is interchanged and persisted.

  • Apache Spark is also a new paradigm that is a total replacement for MapReduce. It has its own controlling framework of servers that are persistent, that is, they can remain operational and wait for new data when it becomes available. It provides very generic processors, and includes in-memory computing through its resilient distributed datasets (RDD). Interestingly enough, Spark now can run using YARN as its resource manager. Spark is written in Scala, a very powerful programming language that runs in a JVM.