Reviewing Hadoop


Hadoop is very complex and has many low-level details and nuances that require a significant amount of documentation and explanation to cover. This chapter is a high-level overview of the Hadoop system and is not intended to be sufficient for anything other than laying the groundwork that you will need to begin Cascading programming. For a deeper dive into Hadoop, we recommend Hadoop Explained by Aravind Shenoy.

Hadoop is designed to provide massively parallel (often referred to as "embarrassingly parallel"), fault-tolerant, and high-performance computing. Hadoop is intended to solve one specific problem: how to reduce the time required to read a very large amount of data from disk sequentially. With most physical (mechanical spinning) disks, the time to access a block of data is on the order of 2 ms to 10 ms, depending on the characteristics of the disk. If we do a simple calculation using a disk that can read 25 MBps, we see that reading 1 TB of data sequentially will require roughly 40,000 seconds, or about 11 hours. However, if this data were spread across multiple disks, say 20 of them, we could read it in parallel in about 2,000 seconds, or roughly half an hour. This is the key element of Hadoop: using parallel I/O to improve performance.

We must also note that disk performance improvements have lagged far behind compute improvements. While disk performance (measured in megabytes per second) has only moderately increased, compute performance (such as instructions per second or other benchmark metrics) has improved by orders of magnitude, growing at an average rate of 50 percent annually since 1986, and a modern day processor now runs 1000 times faster than one of a 1986 vintage. This disparity between I/O and compute provides the basis for the invention of Hadoop. Hadoop provides compute parallelism through its MapReduce (MR) subsystem. It provides I/O parallelism through its Hadoop Distributed File System (HDFS) subsystem.

Hadoop itself is a very large system, comprising approximately 2.4 million lines of code as of the current version. There is much to learn, and much of this knowledge is used daily. As a result, the learning curve is a steep one. One must become quite proficient at library structures, performing I/O in a cluster, accessing shared objects, Hadoop serialization, and the large number of specific classes that one must write to interact with the execution and I/O subsystems. For instance, as we shall see in detail later in this chapter, just doing a multifield sort in MapReduce requires that five separate Java classes be written.

Some required knowledge is arcane, such as how to effectively process time ranges where files have been partitioned by date range, or how to read only portions of a file. After working with MapReduce for a while, one finds that an enormous amount of code has been written, and that without careful planning, code reuse is difficult to achieve. One typically ends up with many, many packages, hundreds of classes, and a significant amount of code overlap (including classes with minor variations that are rewritten using copied code).

Hadoop is almost universally deployed on Linux operating systems. However, it can be made to run on OS X, many flavors of Unix, BSD, and now Microsoft Windows as well. It is also very amenable to Cloud deployment and can support a variety of Cloud-based file systems. Hadoop requires Java 1.6 or above; current versions support Java 1.7 and 1.8 as well.

Hadoop is inherently a batch processing system. Work is packaged into a job. The job is submitted and is run at the discretion of the Hadoop scheduler. The amount of time between starting the job and having results returned is not under the control of the issuer, which, in general, makes Hadoop difficult to use for any sort of processing where there is a desire to bound the execution time. For instance, it is very difficult to connect a web server user request to a Hadoop job and then wait for it to return results within some service-level objective.

Hadoop itself is made more difficult to understand largely because of its history of growth and change. It now has three full versions, each with significantly different semantics. The first release of Hadoop used the class package prefix of mapred. It was superseded by a newer release that used the class package prefix of mapreduce. Later, we will see that the newest version, YARN, is significantly different again.

Hadoop is controlled by several XML configuration files. On a Linux system, these files generally reside within /etc/hadoop/conf, but the location of these files is actually dependent on the installation (and vendor, if you choose this route). Three primary files are used to control Hadoop:

  • core-site.xml: It contains basic parameters, such as server port numbers, location of servers, and so on

  • hdfs-site.xml: It contains information that controls the Hadoop Distributed File System, such as locations on local disk where data is to be stored

  • mapred-site.xml: It contains information about the MapReduce execution framework, such as the number of threads to use

A typical configuration file looks similar to this:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/tmp/hadoop-${user.name}</value>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://server-name:54310</value>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>server-name:54311</value>
  </property>
  <property> 
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>mapred.child.java.opts</name>
    <value>-Xmx512m</value>
  </property>
</configuration>
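
These settings are visible to Hadoop programs through the org.apache.hadoop.conf.Configuration class, which reads the XML files from the classpath. The following is a minimal sketch (the ShowConfig class name is ours, and the properties simply mirror the sample file above):

import org.apache.hadoop.conf.Configuration;

public class ShowConfig {
  public static void main(String[] args) {
    // new Configuration() picks up core-site.xml from the classpath
    // (typically the directory pointed to by HADOOP_CONF_DIR).
    Configuration conf = new Configuration();
    // The HDFS and MapReduce files can be pulled in explicitly.
    conf.addResource("hdfs-site.xml");
    conf.addResource("mapred-site.xml");

    // Read a few of the properties shown above; the second argument
    // is the default returned when a property is not set.
    System.out.println(conf.get("fs.default.name", "file:///"));
    System.out.println(conf.getInt("dfs.replication", 3));
    System.out.println(conf.get("mapred.child.java.opts", "-Xmx200m"));
  }
}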

Additionally, several shell scripts are needed. A key script is hadoop-env.sh, which is where environment variables are set. Some important environment variables are:

  • JAVA_HOME: It sets the location of the default Java implementation

  • HADOOP_HOME: It sets the location of the Hadoop implementation

  • HADOOP_CONF_DIR: It specifies where the Hadoop configuration files are located

Often, the hadoop-env.sh script is placed into /etc/profile.d, so that it will be executed automatically at login. However, these settings are sometimes placed in the user's local shell startup script instead, so this may be different on your system.

Hadoop architecture

At its most basic level, Hadoop is a system that processes records represented as key-value pairs. Every record has an associated key field. This key need not be unique and is accompanied by a value field. This value field may be composite, so that it can be decomposed into multiple subfields. While this may sound limiting at first, especially when one is used to using a relational database where multiple indexes are available, we will see that this mechanism is sufficient to do most of the processing that one may require. We will also see that this simplified approach is fundamental to the performance gains that are required to process big data.

In a nutshell, Hadoop MapReduce processing can be visualized in the following diagram:

Figure 1.1 – Hadoop processing flow

While the flow does look simplistic, in this diagram we can see some underlying similarities to how a relational database processes standard SQL queries. The Read phase looks a lot like SELECT *, retrieving all the records. The Map process is then capable of applying the WHERE criteria. The Sort and Group step corresponds to both GROUP BY and ORDER BY. The Reduce process can then apply processing logic, such as COUNT, MIN, MAX, and so on. Lastly, the Write phase persists this data back to the disk.

One of the most interesting concepts behind Hadoop is that its data is schema-on-read. This means that while the data does have some sort of format, this format is not imposed until the data is read. This is in sharp contrast to the philosophy of a relational database management system (RDBMS), where the schema is defined long before any data can be placed in it. Schema-on-read facilitates faster development times, since no lead times are required to define the schema, prepare the data definition language (DDL—the CREATE TABLE statement), execute it, load the data, and correct records with errors. In Hadoop, all data, regardless of type or format, can be copied into its storage and processed.

What this means is that the interpretation of the input record itself, its key and value, is entirely up to the programmer. In one case, the key may be something that we would think of as an actual key, such as a name, SSN, order number, and so on. In other cases, the key may be nothing more than the record number, or maybe even the offset within the input file. This is also true of the value field. It could be a single value, such as a line of text, or it could be parsable into smaller parts through some sort of record offset, or simply by performing a split() against a delimiter character. Additionally, Hadoop provides file formats that make handling data easier by supplying the metadata needed to parse a record; examples include SequenceFile and Avro. With these types of file structures, the actual format of the record is encoded into the dataset itself, and when the record is read, it is returned in a decomposed, record-like format, where fields can be retrieved by offset or even by name.
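
For instance, a mapper frequently parses its value on the fly. The following is a minimal sketch rather than a required pattern: the tab delimiter and the (name, city, amount) field layout are assumptions made purely for illustration.

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// A mapper that decomposes a tab-delimited value itself.
public class ParseFieldsMapper
    extends Mapper<LongWritable, Text, Text, DoubleWritable> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // key   = byte offset of this line within the input split
    // value = one line of text, for example: name<TAB>city<TAB>amount
    String[] fields = value.toString().split("\t");
    if (fields.length < 3) {
      return;  // skip malformed records
    }
    // Re-key the record by name and emit the amount as the value.
    context.write(new Text(fields[0]),
        new DoubleWritable(Double.parseDouble(fields[2])));
  }
}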

The Hadoop architecture is relatively complex. It consists of two major subsystems—one that manages data and files, and the other that manages execution. The Hadoop system itself does much of the work for you, but the developer is still required to write a lot of code to perform the work.

Figure 1.2 – Hadoop logical architecture

The basic tasks that Hadoop performs are as follows:

  • Hadoop manages files in a familiar directory structure. However, these files are replicated in a cluster for fault tolerance and performance.

    By default, each file of data is replicated three times. This is a tunable parameter that can be set at the file, directory, or global level.

  • Hadoop treats a directory of files as a single file. The files are concatenated together and then split into large blocks for consumption by executing tasks.

    • The block size is a file-level parameter. The default size is 64 MB, but sometimes this size is even further increased. The rationale here is to provide each task that runs in the cluster a sufficiently large amount of data so that the ratio of task startup time (consisting mainly of transferring executable code to the target system and then starting the Java Virtual Machine) to program execution time is small.

    • Parallel tasks are created in the cluster, and each is given data to process. These processes run simultaneously, and the purpose here is to improve execution speed by processing these blocks in parallel. Hadoop tasks are divided into mapper tasks and reducer tasks.

      • Mapper tasks process records from the input that is supplied to them. Each mapper task is assigned one block of data to process and is handed one record at a time from that block. The mapper implements three basic method calls. Only map() actually needs to be defined.

        void setup(Context c)
        throws IOException, InterruptedException
        void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException
        void cleanup(Context c)
        throws IOException, InterruptedException

        setup() is used to perform one-time processing that occurs before any data is sent to the mapper. Then, map() is called repeatedly for each record. The Context object passed to the map() call contains methods to output data that results from its processing. Finally, after all records are processed, cleanup() is called once to allow the user to perform any final processing that may be required. A minimal sketch of a complete mapper, reducer, and driver program appears at the end of this list.

        Note

        Note that in Hadoop, only mappers are required. The number of mappers used will be approximately (total bytes processed / block size).

        Additionally, note that Hadoop methods should be able to throw IOException if an I/O error occurs, and also an InterruptedException if they are somehow stopped by any one of many types of cluster-wide interruptions that could occur.

      • Reducer tasks receive data that has been output by the mappers. Each mapper task processes records from its assigned block and then outputs a result. The result is sent to one reducer task for final processing. Note that, as shown in Figure 1.1, each record may be sent to a different reducer. We will discuss how this routing is decided shortly.

        Unlike what we saw with mappers, the number of reducers to use is specified by the programmer. This allows total control of parallelism during the final stage of aggregation. The assumption here is that the developer has some idea of the data itself and is, therefore, able to determine the degree of parallelism that is needed. It is important to note that the reducer can become a major bottleneck. For instance, if only one reducer is used, all the data generated by all the mappers will flow through it sequentially. Additionally, if one reducer receives significantly more data than others, a bottleneck will also occur.

        Similar to a mapper, a reducer has the following method calls:

        void setup(Context c)
        throws IOException, InterruptedException
        void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException
        void cleanup(Context c)
        throws IOException, InterruptedException

        Reducer tasks receive data sent by the mappers that has been grouped by the Hadoop framework. This is a record that consists of the grouping key, followed by an Iterable<> object containing every processed record that pertains to this key. Hadoop does this grouping for you through a Shuffle and Sort process that occurs after the mapper has sent records to the reducer(s).

        Now, how does a mapper know which reducer should receive a record? This decision is made by a partitioner. A partitioner analyzes each record and determines the reducer that should process it. A partitioner provides the following method calls:

        void configure(Context c)
        int getPartition(KEY key, VALUE value,
          int numReduceTasks)

        Note that the partitioner is passed a parameter telling it how many reducers it has to use. It must then return a value between 0 and numReduceTasks - 1 to its caller (that is, the Hadoop framework). Hadoop will then handle the routing of this record to that reducer. The default partitioner simply looks at the key of the record and uses a modulo function, based on the number of reducers that have been defined, to route it:

        // Mask the sign bit so the result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

        Hadoop allows for another component to be defined as well, and this is the combiner. When data is sent from the mappers to the reducers, a tremendous amount of data can be generated and sent across the wire. In some cases, bandwidth could be conserved if some preprocessing were to occur on the mapper side. This is precisely what a combiner does. It is a sort of "mini-reducer" that runs on the mapper side. It allows records to be combined (hence its name), and a consolidated record to be sent to the reducer. Care must be taken when using it, since mathematical operations that it performs should be both commutative and associative.

  • Hadoop manages all these tasks, allowing them to run and to pass data to each other. Data flows forward from the mappers to the reducers. In order for the reducers to receive the input that they expect, Hadoop performs sorting and aggregation/consolidation to create the composite records.

  • Job execution is monitored and managed. Progress is reported back from the job so that it can be tracked for completion.

  • During job execution, various errors or transient conditions may occur that hinder the job's progress. In these cases, Hadoop will attempt to keep the job running and may take corrective actions:

    • Failing tasks may be restarted.

    • Slow-running tasks may be restarted as well.

    • The same tasks may be run multiple times using what is called speculative execution. When this occurs, Hadoop starts multiple copies to see which one will finish first. This typically occurs when Hadoop determines that some sort of performance delay is occurring on one of the tasks. In this case, the task finishing first is used and the loser is unceremoniously terminated.
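
To make these pieces concrete, the following is a minimal word count program written against the mapreduce API (Hadoop 2 style), along the lines referenced in the mapper and reducer discussions above. It is a sketch for illustration only: the class names, the choice of two reducers, and the input/output paths passed on the command line are arbitrary, not requirements.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapper: emits (word, 1) for every word in the input line.
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      for (String token : value.toString().split("\\s+")) {
        if (!token.isEmpty()) {
          word.set(token);
          context.write(word, ONE);
        }
      }
    }
  }

  // Reducer: receives (word, [1, 1, ...]) and emits (word, total).
  public static class SumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  // Driver: wires the pieces together and submits the job.
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // The reducer also works as a combiner here, because summation
    // is both commutative and associative.
    job.setCombinerClass(SumReducer.class);
    job.setReducerClass(SumReducer.class);
    job.setNumReduceTasks(2);  // chosen by the programmer
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Note how the driver expresses the points discussed above: the programmer explicitly sets the number of reducers, the reducer doubles as a combiner because summation is commutative and associative, and the default hash partitioner decides which reducer receives each word.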

Most typically today, Hadoop configurations consist of a set of physical or virtual nodes, which are complete standalone systems running some version of the Linux operating system and an installed version of the Java JDK, all networked together with high-speed connectivity (such as 10 Gigabit Ethernet or InfiniBand). Nodes are then divided into the following types:

  • Head nodes: These are the controlling systems that contain the servers required to submit jobs, manage data, monitor a system, provide error recovery, failovers, and software distribution.

  • Slave nodes: These are the nodes that do the actual work. They contain local disk storage (usually a lot of it), and run programs that perform the work required. They report the status of the work performed back to several of the head nodes.

  • Boundary nodes: These are the nodes where users submit units of work to a cluster. Typically, these nodes are not part of an actual cluster, but have networked access to it. These nodes are also sometimes referred to as gateway nodes.

    Figure 1.3 – Hadoop cluster physical architecture

Figure 1.3 shows a typical Hadoop cluster. The user sits at a boundary node system and creates/submits jobs to the cluster. The JobTracker server receives the request and places it into its Job Queue. Later, the JobTracker will schedule the job for execution and distribute it to one or more TaskTracker servers, which start a Java Virtual Machine (JVM) and execute the program. TaskTrackers are configured with slots, which represent the number of job components that they are allowed to start.

In the preceding diagram, the following points about the cluster should be noted:

  • The JobTracker and NameNode reside on a head node that manages the execution of the jobs.

  • Every other node shown is a slave node.

  • Typically, many more head nodes are used than the single one shown in the preceding figure.

  • Note that there are asymmetric nodes that have differing numbers of slots (and maybe even different hardware configurations, such as the number of processor cores, and so on). While this is possible, it is discouraged, since it adds another layer of complexity when analyzing performance delays.

There are some key points to be made from this diagram:

  • Hadoop replicates its data in the cluster. The default is that every block of data is written three times (the replicated block is shown in red). This aids in redundancy, performance, and scheduling. This is a definable parameter that is controlled globally by specifying dfs.replication in the Hadoop configuration files; a short programmatic example follows this list.

  • Blocks of data are very large with the default size being 64 MB. This size is controllable and is typically much larger, usually 128 MB and beyond. Often, this data is compressed, and the method of compression is also controllable. Remember that we have a massive amount of compute at our disposal, and we are trying to address the I/O latency to improve performance.

  • Hadoop is "rack aware." A rack is a hardware unit that contains multiple servers, typically in a single container (a rack). When replicating data, HDFS will attempt to place blocks on different racks, if possible. The rationale is twofold: should an entire rack fail, no data will be lost; and I/O between servers that reside in the same rack is generally faster than I/O across racks, because of rack-level network optimization (that is, high-speed backplanes).

    Note

    Note that the definition of a rack is a manual task. A rack number must be defined for a node. It is not automatically detected.

  • Hadoop jobs are Java or Java-based programs. They are packaged in Java JAR files (Java Archives). The JAR files are delivered to the data. The rationale here is simple: it is less costly to move the relatively smaller JAR file to where the data lives, rather than transfer it through the network to an executing program. It is often said that "moving computation is cheaper than moving data."

  • Being rack aware, the JobTracker can be smarter about the node it uses to dispatch a task. Its first choice is to pick a node with a free slot where the data resides locally. Its second choice is to use a node with a free slot where data exists on the same rack. Its third and final choice is to use any node that has a free slot.
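
As mentioned in the replication point above, both the replication factor and the block size are per-file settings that can also be inspected or changed programmatically. The following is only a sketch; the file path and the replication factor of 5 are placeholders chosen for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    Path file = new Path("/users/mcovert/data/file.tsv");  // placeholder path
    FileStatus status = fs.getFileStatus(file);

    // Report the current per-file settings.
    System.out.println("Replication: " + status.getReplication());
    System.out.println("Block size : " + status.getBlockSize());

    // Raise the replication factor for this one file to 5; the
    // cluster-wide default remains whatever dfs.replication specifies.
    fs.setReplication(file, (short) 5);
  }
}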

HDFS – the Hadoop Distributed File System

Hadoop comes with its own form of file storage called the Hadoop Distributed File System (HDFS). HDFS is designed to do several things:

  1. Provide a namespace that supports create, read, write, update, and delete actions performed on files, using a POSIX style of file system. A typical HDFS file locator (a URI) for a file named file.tsv, owned by a user named mcovert, is of the form hdfs://users/mcovert/data/file.tsv.

  2. Provide redundancy so that losing a small section of data will not break the cluster.

  3. Provide high speed and parallel access to data, thereby feeding the execution framework with data as quickly as possible.

  4. Provide utilities (the balancer) that can rectify any imbalances that may exist in the cluster. For instance, if a node fails, typically all of its blocks will be lost; new nodes are then assigned where the missing blocks can be copied from the surviving replicas.

HDFS is implemented in the form of several server processes that handle I/O requests for data stored in the cluster. These server processes are explained in the next section.

The NameNode

The NameNode is where information about the data in the cluster is stored. It represents a catalog of all the files and directories that are managed by the cluster. The NameNode is a very complex server process. It is memory-intensive, since it caches most file metadata. Note also that a file residing in HDFS is spread across DataNodes in the cluster, and that each block of the file is replicated, so the data catalog tends to be very large for even a moderately sized system.

The NameNode itself is a Java server. Hadoop provides an application programming interface (API) to access data and to perform various file management tasks. When a program runs, it is assigned a block of data to process, and the NameNode is queried to find the location of the server (see DataNode in the DataNodes section that follows) where the data resides, and also to obtain various metadata about the file and the data block. When data is written back into HDFS, the NameNode is notified, so that it can record the block metadata into its catalog, and to subsequently handle replication as well. Later, when we discuss the DataNode, we will complete this understanding.
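
This API is exposed primarily through the org.apache.hadoop.fs.FileSystem class, and block location queries of this kind are ultimately answered by the NameNode's catalog. The following is a minimal sketch; the file path is a placeholder.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    Path file = new Path("/users/mcovert/data/file.tsv");  // placeholder path
    FileStatus status = fs.getFileStatus(file);

    // Ask which DataNodes hold each block of the file; this metadata
    // is served from the NameNode's catalog.
    BlockLocation[] blocks =
        fs.getFileBlockLocations(status, 0, status.getLen());
    for (BlockLocation block : blocks) {
      System.out.println("Offset " + block.getOffset() + " is stored on "
          + Arrays.toString(block.getHosts()));
    }
  }
}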

The secondary NameNode

The secondary NameNode maintains a copy of the NameNode metadata, which it periodically merges into a checkpoint. This provides a degree of recoverability in case of catastrophic failures. While the mechanics of catastrophic recovery are beyond the intentions of this discussion, there are two modes of recovery that can be configured. A basic mode of recovery can occur when the NameNode fails (for instance, due to a hard drive error), and the checkpointed copy of the NameNode metadata is then used to restart the failed NameNode. In this case, the cluster itself fails, and all running jobs are lost and must be rerun. A more sophisticated capability exists, called High Availability (HA), where a standby NameNode assumes control of HDFS storage when the active NameNode fails. In this case, the cluster will continue to run, and active jobs generally can be completed without requiring a restart.

DataNodes

Every node in the cluster that is assigned the task of handling data runs a DataNode. The DataNode performs local I/O, which means that when a block of data belonging to a file in the cluster has been requested, the NameNode finds it, and then assigns the DataNode that owns the data block to deliver it to the requesting program.

The DataNode is largely unaware of the activities of the NameNode and is only responsible for storing data locally. Data is not stored under the actual file name, though. The DataNode stores blocks as files that are spread across multiple directories. This prevents any single directory from becoming overloaded and keeps the time required to open files short. The DataNode is responsible only for passing this block information back to the NameNode so that it can be stored in the catalog.