Apache Spark 2.x for Java Developers

By: Sourav Gulati, Sumit Kumar
Overview of this book

Apache Spark is the buzzword in the big data industry right now, especially with the increasing need for real-time streaming and data processing. While Spark is built on Scala, the Spark Java API exposes all the Spark features available in the Scala version for Java developers. This book will show you how you can implement various functionalities of the Apache Spark framework in Java, without stepping out of your comfort zone. The book starts with an introduction to the Apache Spark 2.x ecosystem, followed by explaining how to install and configure Spark, and refreshes the Java concepts that will be useful to you when consuming Apache Spark's APIs. You will explore RDD and its associated common Action and Transformation Java APIs, set up a production-like clustered environment, and work with Spark SQL. Moving on, you will perform near-real-time processing with Spark streaming, Machine Learning analytics with Spark MLlib, and graph processing with GraphX, all using various Java packages. By the end of the book, you will have a solid foundation in implementing components in the Spark framework in Java to build fast, real-time applications.

What makes Hadoop so revolutionary?


In a classical sense, Hadoop comprises two components: a storage layer called HDFS and a processing layer called MapReduce. Prior to Hadoop 2.x, resource management was handled by Hadoop's MapReduce framework itself. That changed in Hadoop 2.0, which introduced YARN as the third component of Hadoop to manage the resources of the Hadoop cluster and make it more MapReduce agnostic.

Defining HDFS

The Hadoop Distributed File System (HDFS), as the name suggests, is a distributed filesystem, written in Java and modeled along the lines of the Google File System. In practice, HDFS closely resembles any other UNIX filesystem, with support for common file operations such as ls, cp, rm, du, cat, and so on. However, what makes HDFS stand out, despite its simplicity, is its mechanism for handling node failure in the Hadoop cluster without effectively changing the search time for accessing stored files. The HDFS cluster consists of two major components: DataNodes and the NameNode.

HDFS has a unique way of storing data on HDFS clusters (networks of cheap commodity computers). It splits a regular file into smaller chunks called blocks and then makes a fixed number of copies of each block, depending on the replication factor for that file. After that, it copies the blocks to different DataNodes of the cluster.
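The splitting and replication described above can be illustrated with a minimal sketch. This is a toy model, not HDFS code: the class name BlockPlacementSketch, the round-robin placement, the 128 MB block size, and the node names are all assumptions made for illustration, and the real placement policy is more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of block splitting and replica placement (hypothetical, not the HDFS policy).
class BlockPlacementSketch {

    // Number of blocks needed for a file: ceiling of fileSize / blockSize.
    static long blockCount(long fileSizeBytes, long blockSizeBytes) {
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    // Assigns every block to `replication` distinct DataNodes, round-robin.
    static List<List<String>> placeReplicas(long blocks, int replication, List<String> dataNodes) {
        if (replication > dataNodes.size()) {
            throw new IllegalArgumentException("not enough DataNodes for the replication factor");
        }
        List<List<String>> placement = new ArrayList<>();
        for (int b = 0; b < blocks; b++) {
            List<String> replicas = new ArrayList<>();
            for (int r = 0; r < replication; r++) {
                replicas.add(dataNodes.get((b + r) % dataNodes.size()));
            }
            placement.add(replicas);
        }
        return placement;
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // assumed block size of 128 MB
        long fileSize = 300L * 1024 * 1024;  // hypothetical 300 MB file
        long blocks = blockCount(fileSize, blockSize);
        List<String> nodes = Arrays.asList("dn1", "dn2", "dn3", "dn4");
        System.out.println(blocks + " blocks placed as " + placeReplicas(blocks, 3, nodes));
    }
}
```

Note that the last block of a file is usually only partly filled, which is why the block count is rounded up.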

NameNode

The NameNode is responsible for managing the metadata of the HDFS cluster, such as the list of files and folders that exist in the cluster, the number of blocks each file is divided into, and their replication and storage on different DataNodes. It also maintains and manages the namespace and file permissions of all the files available in the HDFS cluster. Apart from bookkeeping, the NameNode also has a supervisory role: it keeps a watch on the replication factor of all the files and, if some block goes missing, it issues commands to replicate the missing block of data. It also generates reports to ascertain cluster health. It is important to note that all the communication for a supervisory task is initiated by the DataNode; that is, the DataNode sends reports (block reports) to the NameNode, and only then does the NameNode respond by issuing commands or instructions as needed.
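The supervisory role can be sketched as a comparison between the block reports received and the expected replication factor. The following is only an illustration under simplifying assumptions: ReplicationMonitorSketch and its method are hypothetical names, block reports are modeled as plain sets of block IDs, and a single replication factor is applied to every block.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model of detecting under-replicated blocks from block reports (hypothetical).
class ReplicationMonitorSketch {

    // blockReports: DataNode name -> IDs of the blocks that node reports holding.
    // Returns block ID -> number of replicas still missing.
    static Map<String, Integer> missingReplicas(Map<String, Set<String>> blockReports,
                                                Set<String> expectedBlocks,
                                                int replicationFactor) {
        Map<String, Integer> liveReplicas = new HashMap<>();
        for (Set<String> reported : blockReports.values()) {
            for (String blockId : reported) {
                liveReplicas.merge(blockId, 1, Integer::sum);
            }
        }
        Map<String, Integer> deficit = new TreeMap<>();
        for (String blockId : expectedBlocks) {
            int have = liveReplicas.getOrDefault(blockId, 0);
            if (have < replicationFactor) {
                deficit.put(blockId, replicationFactor - have);
            }
        }
        return deficit;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> reports = new HashMap<>();
        reports.put("dn1", new HashSet<>(Arrays.asList("blk_1", "blk_2")));
        reports.put("dn2", new HashSet<>(Arrays.asList("blk_1")));
        reports.put("dn3", new HashSet<>(Arrays.asList("blk_1", "blk_2")));
        Set<String> expected = new HashSet<>(Arrays.asList("blk_1", "blk_2"));
        System.out.println(missingReplicas(reports, expected, 3));
    }
}
```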

HDFS I/O

An HDFS read operation from a client involves the following:

  1. The client requests NameNode to determine where the actual data blocks are stored for a given file.
  2. NameNode obliges by providing the block IDs and locations of the hosts (DataNode) where the data can be found.
  3. The client contacts DataNode with the respective block IDs to fetch the data from DataNode while preserving the order of the block files.
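The three read steps above can be modeled with in-memory maps. This is a simplified sketch, not the HDFS client API: HdfsReadSketch, addBlock, and read are hypothetical names, each block has a single replica, and block contents are plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an HDFS read (hypothetical names, one replica per block).
class HdfsReadSketch {
    // NameNode-side metadata: file -> ordered block IDs, block ID -> host.
    private final Map<String, List<String>> fileToBlockIds = new HashMap<>();
    private final Map<String, String> blockIdToHost = new HashMap<>();
    // DataNode-side storage: host -> (block ID -> block contents).
    private final Map<String, Map<String, String>> dataNodes = new HashMap<>();

    void addBlock(String path, String blockId, String host, String contents) {
        fileToBlockIds.computeIfAbsent(path, p -> new ArrayList<>()).add(blockId);
        blockIdToHost.put(blockId, host);
        dataNodes.computeIfAbsent(host, h -> new HashMap<>()).put(blockId, contents);
    }

    String read(String path) {
        // Steps 1 and 2: ask the NameNode metadata for the block IDs and their hosts.
        List<String> blockIds = fileToBlockIds.get(path);
        if (blockIds == null) {
            throw new IllegalArgumentException("no such file: " + path);
        }
        // Step 3: fetch each block from its DataNode, preserving block order.
        StringBuilder data = new StringBuilder();
        for (String blockId : blockIds) {
            String host = blockIdToHost.get(blockId);
            data.append(dataNodes.get(host).get(blockId));
        }
        return data.toString();
    }

    public static void main(String[] args) {
        HdfsReadSketch fs = new HdfsReadSketch();
        fs.addBlock("/songs/imagine.txt", "blk_1", "dn2", "Imagine ");
        fs.addBlock("/songs/imagine.txt", "blk_2", "dn1", "all the people");
        System.out.println(fs.read("/songs/imagine.txt"));
    }
}
```

The point of the model is that the metadata lookup and the data transfer are separate: only block IDs and locations come from the NameNode, while the bytes come from the DataNodes.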

An HDFS write operation from a client involves the following:

  1. The client contacts NameNode to update the namespace with the filename and verify the necessary permissions.
  2. If the file already exists, the NameNode throws an error; otherwise, it returns an FSDataOutputStream to the client, which points to the data queue.
  3. The data queue negotiates with the NameNode to allocate new blocks on suitable DataNodes.
  4. The data is then copied to that DataNode, and, as per the replication strategy, the data is further copied from that DataNode to the rest of the DataNodes.
  5. It's important to note that the data is never moved through the NameNode, as that would cause a performance bottleneck.
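The write path can be sketched in the same spirit. The model below is an assumption-laden illustration rather than HDFS code: HdfsWriteSketch, create, and writeBlock are hypothetical names, and the replication pipeline is reduced to picking the first few DataNodes in a fixed order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of an HDFS write (hypothetical names, simplified pipeline).
class HdfsWriteSketch {
    private final Set<String> namespace = new HashSet<>();                        // NameNode side
    private final Map<String, List<String>> dataNodeBlocks = new LinkedHashMap<>(); // DataNode side

    HdfsWriteSketch(List<String> hosts) {
        for (String host : hosts) {
            dataNodeBlocks.put(host, new ArrayList<String>());
        }
    }

    // Steps 1 and 2: register the file in the namespace, failing if it already exists.
    void create(String path) {
        if (!namespace.add(path)) {
            throw new IllegalStateException("file already exists: " + path);
        }
    }

    // Steps 3 and 4: the block goes to the first DataNode and is then forwarded
    // along the pipeline; no block data passes through the namespace structure.
    List<String> writeBlock(String blockId, int replication) {
        List<String> hosts = new ArrayList<>(dataNodeBlocks.keySet());
        if (replication > hosts.size()) {
            throw new IllegalArgumentException("not enough DataNodes");
        }
        List<String> pipeline = new ArrayList<>(hosts.subList(0, replication));
        for (String host : pipeline) {
            dataNodeBlocks.get(host).add(blockId);
        }
        return pipeline;
    }

    public static void main(String[] args) {
        HdfsWriteSketch fs = new HdfsWriteSketch(Arrays.asList("dn1", "dn2", "dn3", "dn4"));
        fs.create("/songs/imagine.txt");
        System.out.println("pipeline: " + fs.writeBlock("blk_1", 3));
    }
}
```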

YARN

The simplest way to understand YARN (Yet Another Resource Negotiator) is to think of it as an operating system on a cluster: it provisions resources, schedules jobs, and maintains nodes. With Hadoop 2.x, the MapReduce model of processing the data and managing the cluster (Job Tracker/Task Tracker) was divided. While data processing was still left to MapReduce, the cluster's resource allocation (or rather, scheduling) task was assigned to a new component called YARN. Another objective that YARN met was that it made MapReduce one of the techniques to process the data rather than the only technology to process data on HDFS, as was the case in Hadoop 1.x systems. This paradigm shift opened the floodgates for the development of interesting applications around Hadoop, and a new ecosystem evolved beyond the classical MapReduce processing system. It didn't take much time after that for Apache Spark to break the hegemony of classical MapReduce and become arguably the most popular processing framework for parallel computing as far as active development and adoption are concerned.

To provide multi-tenancy, fault tolerance, and resource isolation, YARN relies on the following components to manage the cluster seamlessly:

  • The ResourceManager: This negotiates resources for different compute programs on a Hadoop cluster while guaranteeing the following: resource isolation, data locality, fault tolerance, task prioritization, and effective cluster capacity utilization. A configurable scheduler gives the ResourceManager (RM) the flexibility to schedule and prioritize different applications as per the requirements.
    • Tasks served by the RM while serving clients: A client or API user can submit or terminate an application. The user can also gather statistics on submitted applications, the cluster, and queue information. The RM also prioritizes ADMIN tasks over any other task in order to perform clean-up or maintenance activities on a cluster, such as refreshing the node list, the queues' configuration, and so on.
    • Tasks served by the RM while serving cluster nodes: Provisioning and de-provisioning of nodes forms an important task of the RM. Each node sends a heartbeat at a configured interval. If the RM receives no heartbeat from a node within the expiry interval, the default being 10 minutes, the node is treated as dead. As a clean-up activity, all the supposedly running processes on that node, including containers, are marked as dead too.
    • Tasks served by the RM while serving the ApplicationMaster: The RM registers each new AM and deregisters the ones that have finished executing. Just like cluster nodes, if the heartbeat of an AM is not received within a preconfigured duration, the default value being 10 minutes, then the AM is marked dead and all the associated containers are also marked dead. But since YARN is reliable as far as application execution is concerned, a new AM is scheduled to try another execution in a new container until the configurable attempt limit is reached (yarn.resourcemanager.am.max-attempts, which defaults to two).
    • Scheduling and other miscellaneous tasks served by the RM: The RM maintains a list of running, submitted, and executed applications along with their statistics, such as execution time, status, and so on. The privileges of the user as well as of applications are maintained and checked while serving the user's requests throughout the application life cycle. The RM scheduler oversees resource allocation for the application, such as memory allocation. Two common scheduling algorithms used in YARN are fair scheduling and capacity scheduling.
  • NodeManager: An NM exists on each node of the cluster, in a fashion similar to slave nodes in a master-slave architecture. When an NM starts, it informs the RM that its resources are available for upcoming jobs. The NM then sends a periodic signal, also called a heartbeat, to the RM to report that it is alive in the cluster. Primarily, an NM is responsible for launching containers that have been requested by an AM with certain resource requirements such as memory, disk, and so on. Once the containers are up and running, the NM keeps a watch not on the status of the container's task but on the resource utilization of the container, and kills the container if it starts utilizing more resources than it has been provisioned for. Apart from managing the life cycle of the container, the NM also keeps the RM informed about the node's health.
  • ApplicationMaster: An AM gets launched per submitted application and manages the life cycle of that application. However, the first and foremost task of an AM is to negotiate resources from the RM to launch task-specific containers on different nodes. Once containers are launched, the AM keeps track of the task status of every container. If a node goes down, or a container gets killed for using excess resources or for any other reason, the AM renegotiates resources from the RM and launches the pending tasks again. The AM also reports the status of the submitted application directly to the user, and other such statistics to the RM. The ApplicationMaster implementation is framework specific, which is why application- or framework-specific code is transferred to the AM, and the AM distributes it further. This important feature also makes YARN technology agnostic, as any framework can implement its own ApplicationMaster and then utilize the resources of the YARN cluster seamlessly.
  • Containers: A container, in an abstract sense, is a set of minimal resources such as CPU, RAM, disk I/O, disk space, and so on, that are required to run a task independently on a node. The first container after submitting the job is launched by the RM to host the ApplicationMaster. It is the AM that then negotiates resources from the RM in the form of containers, which then get hosted on different nodes across the Hadoop cluster.
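The heartbeat-based liveness check that the RM applies to both cluster nodes and ApplicationMasters can be sketched as follows. This is a toy model and not YARN code: LivenessMonitorSketch and its methods are hypothetical names, time is passed in explicitly as milliseconds, and the 10-minute expiry window is the figure mentioned in the text.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy liveness monitor: anything silent for longer than the expiry window is dead.
class LivenessMonitorSketch {
    private final long expiryMillis;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    LivenessMonitorSketch(long expiryMillis) {
        this.expiryMillis = expiryMillis;
    }

    // Records the time of the latest heartbeat from a node or an AM.
    void heartbeat(String id, long nowMillis) {
        lastHeartbeat.put(id, nowMillis);
    }

    // Returns, in sorted order, every ID whose last heartbeat is older than the window.
    List<String> expired(long nowMillis) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeat.entrySet()) {
            if (nowMillis - entry.getValue() > expiryMillis) {
                dead.add(entry.getKey());
            }
        }
        Collections.sort(dead);
        return dead;
    }

    public static void main(String[] args) {
        LivenessMonitorSketch monitor = new LivenessMonitorSketch(10 * 60 * 1000L);
        monitor.heartbeat("node1", 0L);
        monitor.heartbeat("node2", 500_000L);
        System.out.println("dead after 700 s: " + monitor.expired(700_000L));
    }
}
```

In a real cluster, marking a node or an AM as dead also triggers the clean-up of its containers, which this sketch leaves out.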

Processing the flow of application submission in YARN

The following steps describe the flow of application submission in YARN:

  1. Using a client or APIs, the user submits the application, let's say a Spark job JAR. The ResourceManager, whose primary task is to gather and report all the applications running on the Hadoop cluster and the resources available on the respective Hadoop nodes, accepts the newly submitted task, depending on the privileges of the user submitting the job.
  2. After this, the RM delegates the task to a scheduler, which searches for a container that can host the application-specific ApplicationMaster. While the scheduler does take into consideration parameters such as availability of resources, task priority, data locality, and so on, before scheduling or launching an ApplicationMaster, it has no role in monitoring or restarting a failed job. It is the responsibility of the RM to keep track of an AM and restart it in a new container if it fails.
  3. Once the ApplicationMaster is launched, it becomes the prerogative of the AM to oversee the resource negotiation with the RM for launching task-specific containers. Negotiations with the RM are typically over:
    • The priority of the tasks at hand.
    • The number of containers to be launched to complete the tasks.
    • The resources needed to execute the tasks, such as RAM and CPU (since Hadoop 2.x).
    • The available nodes where job containers can be launched with the required resources.

Depending on the priority and availability of resources, the RM grants containers, each represented by a container ID and the hostname of the node on which it can be launched.

  4. The AM then requests the NM of the respective hosts to launch the containers with specific IDs and resource configuration. The NM then launches the containers but keeps a watch on the resource usage of the task. If, for example, the container starts utilizing more resources than it has been provisioned, then that container is killed by the NM. This greatly improves the job isolation and fair sharing of resources that YARN guarantees, as otherwise the execution of other containers would be impacted. However, it is important to note that the job status and the application status as a whole are managed by the AM. It falls in the domain of the AM to continuously monitor delayed or dead containers, while simultaneously negotiating with the RM to launch new containers and reassign the tasks of dead containers.
  5. The containers executing on different nodes send application-specific statistics to the AM at specific intervals.
  6. The AM also reports the status of the application directly to the client that submitted the specific application, in our case a Spark job.
  7. The NM monitors the resources being utilized by all the containers on the respective nodes and keeps sending a periodic update to the RM.
  8. The AM sends periodic statistics such as application status, task failure, and log information to the RM.
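Two pieces of the flow above lend themselves to a small sketch: the RM granting containers on nodes that have enough free resources, and the NM killing a container that exceeds its allocation. The code below is a simplified illustration, not the YARN scheduler: ContainerGrantSketch and its methods are hypothetical names, memory is the only resource considered, and nodes are filled greedily in the order given.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of container grants and NM resource enforcement (hypothetical).
class ContainerGrantSketch {

    // Returns the hostname for every granted container; fewer than `requested`
    // containers are granted when the cluster runs out of free memory.
    static List<String> grant(Map<String, Integer> freeMemoryMb, int requested, int memoryMbEach) {
        Map<String, Integer> free = new LinkedHashMap<>(freeMemoryMb); // work on a copy
        List<String> grantedHosts = new ArrayList<>();
        for (Map.Entry<String, Integer> node : free.entrySet()) {
            while (grantedHosts.size() < requested && node.getValue() >= memoryMbEach) {
                node.setValue(node.getValue() - memoryMbEach);
                grantedHosts.add(node.getKey());
            }
        }
        return grantedHosts;
    }

    // NM-side check: a container using more than it was provisioned is killed.
    static boolean shouldKill(int usedMemoryMb, int provisionedMemoryMb) {
        return usedMemoryMb > provisionedMemoryMb;
    }

    public static void main(String[] args) {
        Map<String, Integer> free = new LinkedHashMap<>();
        free.put("node1", 4096);
        free.put("node2", 2048);
        System.out.println("granted on: " + grant(free, 4, 2048));
        System.out.println("kill container using 3000 MB of 2048 MB? " + shouldKill(3000, 2048));
    }
}
```

When fewer containers are granted than requested, it is the AM that keeps negotiating for the remainder, as described in the steps above.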

Overview of MapReduce

Before delving deep into MapReduce implementation in Hadoop, let's first understand MapReduce as a concept in parallel computing and why it is a preferred way of computing. MapReduce comprises two mutually exclusive but dependent phases, each capable of running on two different machines or nodes:

  • Map: In the Map phase, the transformation of the data takes place. It converts the data into key-value pairs by splitting it on a keyword.
    • Suppose we have a text file and we want to do an analysis such as counting the total number of words, or even the frequency with which each word occurs in the text file. This is the classical word count problem of MapReduce. To address this problem, we first have to identify the splitting keyword so that the data can be split and converted into key-value pairs.

Let's begin with John Lennon's song, Imagine.

Sample text:

Imagine there's no heaven 
It's easy if you try 
No hell below us 
Above us only sky 
Imagine all the people living for today 

After running the Map phase on the sample text and splitting it over <space>, it is converted to key-value pairs as shown here:

[<imagine, 1> <there's, 1> <no, 1> <heaven, 1> <it's, 1> <easy, 1> <if, 1> <you, 1> <try, 1> <no, 1> <hell, 1> <below, 1> <us, 1> <above, 1> <us, 1> <only, 1> <sky, 1> <imagine, 1> <all, 1> <the, 1> <people, 1> <living, 1> <for, 1> <today, 1>]

The key here represents the word and the value represents the count. Also it should be noted that we have converted all the keys to lowercase to reduce any further complexity arising out of matching case sensitive keys.
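The Map step for the sample text can be written in a few lines of plain Java. This is a conceptual sketch that runs in a single JVM and does not use the Hadoop API; the class and method names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Conceptual Map phase for word count (plain Java, not the Hadoop Mapper API).
class WordCountMapSketch {

    // Splits the text on whitespace and emits a <word, 1> pair per token,
    // lowercasing each key so that "Imagine" and "imagine" match later.
    static List<Map.Entry<String, Integer>> map(String text) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String token : text.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<String, Integer>(
                        token.toLowerCase(Locale.ROOT), 1));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        String sample = "Imagine there's no heaven\n"
                + "It's easy if you try\n"
                + "No hell below us\n"
                + "Above us only sky\n"
                + "Imagine all the people living for today";
        System.out.println(map(sample));
    }
}
```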

  • Reduce: The Reduce phase aggregates the results of the Map phase, so all the key-value pairs are aggregated over the key.
    • The Map output of the sample text is aggregated as follows:

[<imagine, 2> <there's, 1> <no, 2> <heaven, 1> <it's, 1> <easy, 1> <if, 1> <you, 1> <try, 1> <hell, 1> <below, 1> <us, 2> <above, 1> <only, 1> <sky, 1> <all, 1> <the, 1> <people, 1> <living, 1> <for, 1> <today, 1>]
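The aggregation over keys can be sketched in plain Java as well. As before, this is a conceptual single-JVM illustration with hypothetical names rather than the Hadoop Reducer API; it keeps keys in order of first appearance, which matches the listing above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual Reduce phase for word count (plain Java, not the Hadoop Reducer API).
class WordCountReduceSketch {

    // Sums the values of all pairs that share a key.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            totals.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : new String[] {"imagine", "no", "us", "us", "imagine"}) {
            pairs.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
        }
        System.out.println(reduce(pairs));
    }
}
```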

As we can see, both the Map and Reduce phases can be run exclusively and hence can use independent nodes in the cluster to process the data. This approach of separation of tasks into smaller units called Map and Reduce has revolutionized general purpose distributed/parallel computing, which we now know as MapReduce.

Apache Hadoop's MapReduce has been implemented pretty much the same way as discussed, except for adding extra features into how the data from the Map phase of each node gets transferred to their designated Reduce phase node.

Hadoop's implementation of MapReduce enriches the Map and Reduce phases by adding a few more concrete steps in between to make it fault tolerant and truly distributed. We can describe MR jobs on YARN in five stages:

  1. Job Submission Stage: When a client submits an MR job, the following things happen:
    • The RM is requested for an application ID
    • The input data location is checked and if present then the file split size is computed
    • The job's output location is checked to make sure that it does not already exist

If all three conditions are met, then the MR job JAR, along with its configuration and input split details, is copied to HDFS in a directory named after the application ID provided by the RM. The job is then submitted to the RM to launch a job-specific ApplicationMaster, MRAppMaster.

  2. Map Stage: Once the RM receives the client's request for launching MRAppMaster, a call is made to the YARN scheduler for assigning a container. As per the resource availability, the container is granted and the MRAppMaster is launched on the designated node with the provisioned resources. After this, MRAppMaster fetches the input split information from the HDFS path that was submitted by the client and computes the number of mapper tasks to launch based on the splits. Depending on the number of mappers, it also calculates the required number of reducers as per the configuration. If MRAppMaster finds the number of mappers and reducers and the size of the input files to be small enough to run in the same JVM, then it goes ahead and does so. Such tasks are called uber tasks. In other scenarios, MRAppMaster negotiates container resources from the RM for running these tasks, with mapper tasks given a higher order and priority because mapper tasks must finish before the sorting phase can start.

Data locality is another concern for containers hosting mappers: node-local data is preferred over rack-local data, with the least preference given to data hosted on remote nodes. When it comes to the Reduce phase, no such data locality preference exists for containers. Containers hosting mappers first copy the MapReduce JAR and configuration files locally and then launch a class called YarnChild in the JVM. The mapper then starts reading the input files, processes them by making key-value pairs, and writes the pairs to a circular buffer.

  3. Shuffle and Sort Stage: Since the circular buffer has a size constraint, once it fills beyond a certain percentage, the default being 80 percent, a thread is spawned that spills the data from the buffer. Before the spilled data is copied to disk, it is first partitioned with respect to its reducer; the background thread then sorts the partitioned data by key and, if a combiner is specified, combines the data too. This optimizes the data before it is copied to its respective partition folder. The process continues until all the data from the circular buffer has been written to disk. A background thread then checks whether the number of spilled files in each partition is within the limit set by a configurable parameter; if it is not, the files are merged and the combiner is run over them until the number falls within that limit.
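The partition, sort, and combine steps applied to a spill can be sketched as follows. This is an in-memory illustration with hypothetical names (SpillSketch, partition, spill); it assigns keys to reducers by a non-negative hash modulo the reducer count, which is an assumption about the partitioning scheme, and it uses summing as the combiner, as in word count.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy model of one spill: partition by reducer, sort by key, combine counts.
class SpillSketch {

    // Reducer index for a key: non-negative hash modulo the number of reducers.
    static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Returns one key-sorted map per reducer; equal keys are combined by summing.
    static List<SortedMap<String, Integer>> spill(List<Map.Entry<String, Integer>> buffer,
                                                  int numReducers) {
        List<SortedMap<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new TreeMap<String, Integer>());
        }
        for (Map.Entry<String, Integer> pair : buffer) {
            int target = partition(pair.getKey(), numReducers);
            partitions.get(target).merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> buffer = new ArrayList<>();
        for (String word : new String[] {"us", "above", "us", "only", "sky"}) {
            buffer.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
        }
        System.out.println(spill(buffer, 2));
    }
}
```

Because every mapper uses the same partition function, all occurrences of a key end up with the same reducer, whichever mapper emitted them.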

A Map task keeps reporting its status to the ApplicationMaster for its entire life cycle. By default, Reduce tasks start only once 5 percent of the Map tasks have completed. An auxiliary service in the NodeManager serving the Reduce task starts a Netty web server that makes a request to MRAppMaster for the Mapper hosts holding the relevant Mapper partitioned files. All the partitioned files that pertain to the Reducer are copied to their respective nodes in a similar fashion. Since multiple files get copied as the data for a given Reduce node is collected from various nodes, a background thread merges the sorted map files, sorts them again, and, if a combiner is configured, combines the result too.
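Merging several already-sorted map outputs into one sorted stream is a classic k-way merge. The sketch below shows the idea with a priority queue; it is an in-memory illustration with hypothetical names, operating on lists of keys rather than on files, and is not the merge implementation used by Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// K-way merge of key-sorted runs using a min-heap (in-memory illustration).
class SortedMergeSketch {

    static List<String> mergeSorted(List<List<String>> sortedRuns) {
        // Each heap entry is {run index, position in that run}, ordered by the key it points at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> sortedRuns.get(a[0]).get(a[1]).compareTo(sortedRuns.get(b[0]).get(b[1])));
        for (int i = 0; i < sortedRuns.size(); i++) {
            if (!sortedRuns.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            List<String> run = sortedRuns.get(head[0]);
            merged.add(run.get(head[1]));
            if (head[1] + 1 < run.size()) {
                heap.add(new int[] {head[0], head[1] + 1}); // advance within the same run
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> runs = new ArrayList<>();
        runs.add(Arrays.asList("above", "sky", "us"));
        runs.add(Arrays.asList("imagine", "us"));
        System.out.println(mergeSorted(runs));
    }
}
```

Since equal keys come out next to each other, a combiner or the reducer can aggregate them in a single pass over the merged stream.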

  4. Reduce Stage: It is important to note that at this stage every input file of each reducer should already be sorted by key. The reducer starts processing the records on this presumption and converts the key-value pairs into an aggregated list. Once the reducer has processed the data, it writes the results to the output folder that was specified during job submission.
  5. Clean-up Stage: Each reducer sends a periodic update to MRAppMaster about task completion. Once the Reduce task is over, the ApplicationMaster starts the clean-up activity. The submitted job status is changed from running to successful, and all the temporary and intermediate files and folders are deleted. The application statistics are archived to a job history server.