
Mastering Spark for Data Science

By: Andrew Morgan, Antoine Amend, Matthew Hallett, David George

Overview of this book

Data science seeks to transform the world using data, and this is typically achieved through disrupting and changing real processes in real industries. In order to operate at this level you need to build data science solutions of substance: solutions that solve real problems. Spark has emerged as the big data platform of choice for data scientists due to its speed, scalability, and easy-to-use APIs. This book deep dives into using Spark to deliver production-grade data science solutions. This process is demonstrated by exploring the construction of a sophisticated global news analysis service that uses Spark to generate continuous geopolitical and current affairs insights. You will learn all about the core Spark APIs and take a comprehensive tour of advanced libraries, including Spark SQL, Spark Streaming, MLlib, and more. You will be introduced to advanced techniques and methods that will help you to construct commercial-grade data products. Focusing on a sequence of tutorials that deliver a working news intelligence service, you will learn about advanced Spark architectures, how to work with geographic data in Spark, and how to tune Spark algorithms so they scale linearly.

Companion tools


Now that we have established a technology stack to use, let's describe each of the components and explain why they are useful in a Spark environment. This part of the book is designed as a reference rather than a straight read. If you're familiar with most of the technologies, then you can refresh your knowledge and continue to the next section, Chapter 2, Data Acquisition.

Apache HDFS

The Hadoop Distributed File System (HDFS) is a distributed filesystem with built-in redundancy. By default it is optimized to work on three or more nodes (although a single node works fine, and the replication factor can be changed), storing data in replicated blocks. Not only is each file split into a number of blocks, but by default three copies of each block exist at any one time. This cleverly provides data redundancy (if one copy is lost, two others still exist) and also data locality. When a distributed job is run against HDFS, the system not only gathers all of the blocks required as input to that job, it also attempts to use the replicas that are physically closest to the server running each task; reading blocks from local storage, or from nodes close by, reduces network bandwidth. In practice, this is achieved by assigning HDFS physical disks to nodes and nodes to racks, so that replicas can be placed and read on a node-local, rack-local, or cluster-wide basis.

All filesystem metadata requests go through a central server called the NameNode, which therefore represents a potential single point of failure; there are various methods for providing NameNode redundancy.

Furthermore, in a multi-tenanted HDFS scenario, where many processes are accessing the same file at the same time, load balancing can also be achieved through replication: for example, a file that occupies a single block is still stored three times and can therefore potentially be read from three different physical locations concurrently. Although this may not seem like a big win, on clusters of hundreds or thousands of nodes network IO is often the single most limiting factor for a running job; the authors have certainly experienced multi-thousand-node clusters where jobs had to wait hours to complete purely because network bandwidth was maxed out by the large number of other threads requesting data.
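The storage arithmetic behind block splitting and replication can be sketched in a few lines of Scala. This is a minimal sketch, assuming the Hadoop 2 defaults of a 128 MB block size (dfs.blocksize) and a replication factor of 3 (dfs.replication); the helper functions are hypothetical and only model the calculation, they are not part of any HDFS API.

```scala
// Hypothetical helpers modelling HDFS block arithmetic (not an HDFS API).
val MB: Long = 1024L * 1024L
val defaultBlockSize: Long = 128 * MB   // assumed dfs.blocksize (Hadoop 2 default)
val defaultReplication: Int = 3         // assumed dfs.replication default

// Number of blocks a file occupies; the last block may be only partially filled.
def blockCount(fileSize: Long, blockSize: Long = defaultBlockSize): Long =
  if (fileSize == 0) 0L else (fileSize + blockSize - 1) / blockSize

// Raw disk consumed across the cluster. The final block is not padded on disk,
// so raw usage is simply the file size times the replication factor.
def rawBytes(fileSize: Long, replication: Int = defaultReplication): Long =
  fileSize * replication

// Number of block replicas the cluster must place, i.e. the candidate
// locations that tasks can be scheduled against for data locality.
def replicaCount(fileSize: Long,
                 blockSize: Long = defaultBlockSize,
                 replication: Int = defaultReplication): Long =
  blockCount(fileSize, blockSize) * replication

val oneGigabyte = 1024 * MB
println(s"blocks=${blockCount(oneGigabyte)} replicas=${replicaCount(oneGigabyte)} " +
  s"raw=${rawBytes(oneGigabyte) / MB} MB")
```

Under these assumptions a 1 GB file occupies 8 blocks and the cluster holds 24 block replicas, giving three candidate locations for every block a task needs to read.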

If you are running on a laptop, require data to be stored locally, or wish to use the hardware you already have, then HDFS is a good option.

Advantages

The following are the advantages of using HDFS:

  • Redundancy: Configurable replication of blocks provides tolerance for node and disk failure

  • Load balancing: Block replication means the same data can be accessed from different physical locations

  • Data locality: Analytics try to access the closest relevant physical block, reducing network IO

  • Data balance: An algorithm is available to re-balance the data blocks as they become too clustered or fragmented

  • Flexible storage: If more space is needed, further disks and nodes can be added, although this is not always a hot process and may require an outage to add these resources

  • Cost: No third-party costs are involved

  • Data encryption: Transparent encryption (when turned on)

Disadvantages

The following are the disadvantages:

  • The NameNode provides for a central point of failure; to mitigate this, there are secondary and high availability options available

  • A cluster requires basic administration and potentially some hardware effort

Installation

To use HDFS, we should decide whether to run Hadoop in a local, pseudo-distributed or fully-distributed manner; for a single server, pseudo-distributed is useful as analytics should translate directly from this machine to any Hadoop cluster. In any case, we should install Hadoop with at least the following components:

  • NameNode

  • Secondary NameNode (or High Availability NameNode)

  • DataNode

Hadoop can be installed via http://hadoop.apache.org/releases.html.

Spark needs to know the location of the Hadoop configuration, specifically the hdfs-site.xml and core-site.xml files. This location is set with the HADOOP_CONF_DIR environment variable, typically in conf/spark-env.sh of your Spark installation.

HDFS will then be available natively, so the file hdfs:///user/local/dir/text.txt can be addressed in Spark simply as /user/local/dir/text.txt.
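Why a bare path reaches HDFS can be illustrated with plain java.net.URI: a path without a scheme and authority is qualified against the default filesystem (fs.defaultFS in core-site.xml). The qualify function below is a hypothetical helper that mimics this resolution under a simplifying assumption, and the NameNode address hdfs://namenode:8020 is an assumed example value; neither is taken from Hadoop itself.

```scala
import java.net.URI

// Hypothetical helper (not a Hadoop API) mimicking how a path is qualified
// against the default filesystem configured as fs.defaultFS.
// Simplification: any path lacking a scheme or an authority is treated as
// belonging to the default filesystem.
def qualify(path: String, defaultFs: String): URI = {
  val p = new URI(path)
  if (p.getScheme != null && p.getAuthority != null) p   // already fully qualified
  else {
    val fs = new URI(defaultFs)
    new URI(fs.getScheme, fs.getAuthority, p.getPath, null, null)
  }
}

val defaultFs = "hdfs://namenode:8020"   // assumed fs.defaultFS value

println(qualify("/user/local/dir/text.txt", defaultFs))
println(qualify("hdfs:///user/local/dir/text.txt", defaultFs))
// Pitfall: with only two slashes, the first path segment is parsed as the host.
println(new URI("hdfs://user/local/dir/text.txt").getHost)
```

This is also why a fully qualified HDFS path needs either three slashes or an explicit NameNode host: with two slashes, user is taken to be the name of the NameNode.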

Amazon S3

S3 abstracts away all of the issues related to parallelism, storage restrictions, and security, allowing very large parallel read/write operations along with a great Service Level Agreement (SLA) for a very small cost. This is perfect if you need to get up and running quickly, can't store data locally, or don't know what your future storage requirements might be. Note that s3n and s3a use an object storage model, not file storage, and therefore there are some compromises:

  • Eventual consistency: changes made by one application (creation, updates, and deletions) may not be visible to other applications until some undefined time later, although at the time of writing most AWS regions supported read-after-write consistency for new objects.

  • s3n and s3a use nonatomic rename and delete operations; renaming or deleting a large directory therefore takes time proportional to the number of entries. Moreover, the affected files can remain visible to other processes during this time, and indeed until eventual consistency has been resolved.
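The cost and visibility problem of a directory rename on an object store can be illustrated with a toy model. In this sketch, ToyObjectStore is a hypothetical in-memory class, not the S3 or Hadoop API; it assumes that a connector such as s3a must emulate a directory rename with one copy and one delete per object under the prefix, which is what makes the operation nonatomic and proportional to the number of entries.

```scala
import scala.collection.mutable

// Toy object store (hypothetical, for illustration only): a flat map from key
// to content. "Directories" do not exist; they are just shared key prefixes.
class ToyObjectStore {
  val objects = mutable.LinkedHashMap.empty[String, String]
  var requests = 0                                   // copy/delete calls issued

  def put(key: String, value: String): Unit = objects(key) = value
  def list(prefix: String): List[String] = objects.keys.filter(_.startsWith(prefix)).toList

  // Emulated directory rename: one copy plus one delete per object, so cost grows
  // with the number of entries, and other readers can observe a half-done state.
  def renameDir(src: String, dst: String, observe: () => Unit = () => ()): Unit =
    for (key <- list(src)) {
      objects(dst + key.stripPrefix(src)) = objects(key)   // copy
      requests += 1
      observe()                                            // a concurrent reader looks now
      objects.remove(key)                                  // delete
      requests += 1
    }
}

val store = new ToyObjectStore
(1 to 3).foreach(i => store.put(s"tmp/part-$i", s"data $i"))

var midRename: Seq[String] = Nil
store.renameDir("tmp/", "final/", () => if (midRename.isEmpty) midRename = store.objects.keys.toList)

println(s"requests=${store.requests}")          // two requests per object
println(s"visible mid-rename: $midRename")      // source and copied object both listed
```

Real connectors issue these requests against a remote service, so each step also pays network latency, which is why renaming large output directories on S3 can be slow.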

S3 can be accessed through command-line tools (such as s3cmd), via a web page, and via APIs for most popular languages; it has native integration with Hadoop and Spark through basic configuration.

Advantages

The following are the advantages:

  • Effectively unlimited storage capacity

  • No hardware considerations

  • Encryption available (user stored keys)

  • 99.9% availability

  • Redundancy

Disadvantages

The following are the disadvantages:

  • Cost to store and transfer data

  • No data locality

  • Eventual consistency

  • Relatively high latency

Installation

You can create an AWS account at https://aws.amazon.com/free/. Through this account, you will have access to S3 and will simply need to create some credentials.

The current S3 connector is s3a; using it from Spark requires some changes to the Spark configuration:

spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem 
spark.hadoop.fs.s3a.access.key=MyAccessKeyID 
spark.hadoop.fs.s3a.secret.key=MySecretKey

If using HDP, you may also need:

spark.driver.extraClassPath=${HADOOP_HOME}/extlib/hadoop-aws-currentversion.jar:${HADOOP_HOME}/ext/aws-java-sdk-1.7.4.jar

All S3 files will then be accessible within Spark by prefixing the S3 object reference (the bucket name followed by the object key) with s3a://:

val rdd = spark.sparkContext.textFile("s3a://user/dir/text.txt") 

We can also use the AWS credentials inline assuming that we have set spark.hadoop.fs.s3a.impl:

spark.sparkContext.textFile("s3a://AccessID:SecretKey@user/dir/file") 

However, this method will not accept the forward-slash character / in either of the keys. This is usually solved by obtaining another key from AWS (keep generating a new one until there are no forward-slashes present).
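The forward-slash problem follows from standard URI parsing, which Hadoop paths build on: the credentials sit in the user-info part of the authority, and the authority ends at the first /. The sketch below shows this with java.net.URI; the key values are made-up placeholders, and user is the bucket name from the example above.

```scala
import java.net.URI

// Inline credentials are carried in the URI's user-info component:
// s3a://<access-key>:<secret-key>@<bucket>/<key>
// (the keys here are fake placeholders)
val good = new URI("s3a://AKIAEXAMPLE:abcSECRET@user/dir/file")
println((good.getUserInfo, good.getHost, good.getPath))

// A '/' inside the secret ends the authority early: the bucket can no longer
// be found as the host, and the remainder of the secret leaks into the path.
val bad = new URI("s3a://AKIAEXAMPLE:abc/SECRET@user/dir/file")
println((bad.getAuthority, bad.getHost, bad.getPath))
```

In the second case there is no host at all, so the filesystem cannot determine which bucket to read, and part of the secret ends up in the object path (and potentially in logs).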

We can also browse the objects through the web interface located under the S3 tab in your AWS account.

Apache Kafka

Apache Kafka is a distributed message broker written in Scala and Java and available under the Apache License 2.0. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. The result is essentially a massively scalable publish-subscribe message queue, which makes it highly valuable for processing streaming data in enterprise infrastructures.
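Kafka's publish-subscribe queue is built on partitioned, append-only logs: each message receives an offset, reading does not remove it, and each subscriber tracks its own position. The toy model below sketches that idea for a single partition to show how replay and at-least-once delivery arise. PartitionLog, Subscriber, and their methods are hypothetical names, not the Kafka client API, and replication, partitioning, and retention are left out.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of one Kafka topic partition (hypothetical classes, not the Kafka API).
final case class Record(offset: Long, value: String)

class PartitionLog {
  private val records = ArrayBuffer.empty[Record]

  // Publish: append to the end of the log and return the new record's offset.
  def append(value: String): Long = {
    val offset = records.size.toLong
    records += Record(offset, value)
    offset
  }

  // Reading never deletes data, so any subscriber can (re)read from any offset.
  def read(from: Long, max: Int): Seq[Record] =
    records.slice(from.toInt, from.toInt + max).toList
}

// Each subscriber (consumer group) keeps its own committed position.
class Subscriber(log: PartitionLog) {
  var committed: Long = 0L
  def poll(max: Int): Seq[Record] = log.read(committed, max)
  def commit(lastProcessed: Long): Unit = { committed = lastProcessed + 1 }
}

val topic = new PartitionLog
Seq("a", "b", "c", "d").foreach(topic.append)

val analytics = new Subscriber(topic)
val archive = new Subscriber(topic)

val batch = analytics.poll(2)              // offsets 0 and 1
analytics.commit(batch.last.offset)

// Suppose processing of the next batch fails before it is committed:
// polling again returns the same records, i.e. at-least-once delivery.
val firstTry = analytics.poll(2)
val retry = analytics.poll(2)
println(firstTry == retry)
println(archive.poll(10).map(_.value))     // an independent subscriber sees everything
```

Because consumption state lives with the subscriber rather than in the broker's queue, adding another subscriber does not affect existing ones, which is the property that makes Kafka a scalable publish-subscribe system.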

Advantages

The following are the advantages:

  • Publish-subscribe messaging

  • Fault-tolerant

  • Guaranteed delivery

  • Replay messages on failure

  • Highly-scalable, shared-nothing architecture

  • Supports back pressure

  • Low latency

  • Good Spark-streaming integration

  • Simple for clients to implement

Disadvantages

The following are the disadvantages:

  • At-least-once semantics: at the time of writing, Kafka could not provide exactly-once messaging because it lacked a transaction manager (transactions were later added in Kafka 0.11)

  • Requires Zookeeper for operation

Installation

As Kafka is a pub-sub tool, its purpose is to receive messages from producers (publishers) and direct them to the relevant consumers (subscribers). This is done using a broker, which is installed when implementing Kafka. Kafka is available through the Hortonworks HDP platform, or can be installed independently from http://kafka.apache.org/downloads.html.

Kafka uses Zookeeper to manage leadership election (Kafka can be distributed, which allows for redundancy). The quick start guide found at the preceding link can be used to set up a single-node Zookeeper instance, and also provides a producer and a consumer client to publish and subscribe to topics, which provide the mechanism for message handling.

Apache Parquet

Since the inception of Hadoop, the idea of columnar-based formats (as opposed to row-based formats) has been gaining increasing support. Parquet has been developed to take advantage of compressed, efficient columnar data representation and is designed with complex nested data structures in mind, taking the lead from the algorithms discussed in Google's Dremel paper, http://research.google.com/pubs/pub36632.html. Parquet allows compression schemes to be specified on a per-column level, and is future-proofed to allow more encodings to be added as they are implemented. It has also been designed to provide compatibility throughout the Hadoop ecosystem and, like Avro, stores the data schema with the data itself.
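The benefit of a columnar layout can be illustrated without Parquet itself. The sketch below transposes hypothetical row records into one sequence per column, answers a query by touching only the column it needs (column pruning), and applies a simple run-length encoding to a low-cardinality column as a stand-in for Parquet's per-column encodings. The Article class and its data are invented for illustration, and this is not Parquet's on-disk format.

```scala
// Row-oriented records, as they might arrive from a CSV file or a JSON feed
// (hypothetical example data).
final case class Article(id: Int, source: String, tone: Double)

val rows = Seq(
  Article(1, "bbc", -1.5), Article(2, "bbc", 0.5),
  Article(3, "cnn", 2.0), Article(4, "cnn", -0.5), Article(5, "cnn", 1.0))

// Columnar layout: one contiguous sequence per column.
val ids     = rows.map(_.id)
val sources = rows.map(_.source)
val tones   = rows.map(_.tone)

// Column pruning: a query over `tone` reads only the `tone` column.
val meanTone = tones.sum / tones.size

// Per-column encoding: values of one column sit together, so repeated values
// in low-cardinality columns compress well with run-length encoding.
def runLengthEncode[A](values: Seq[A]): List[(A, Int)] =
  values.foldRight(List.empty[(A, Int)]) {
    case (v, (w, n) :: rest) if v == w => (w, n + 1) :: rest
    case (v, acc)                      => (v, 1) :: acc
  }

println(meanTone)
println(runLengthEncode(sources))
```

In a row-oriented file, computing the mean tone would require reading every field of every record; with the columnar layout, the id and source columns are never touched.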

Advantages

The following are the advantages:

  • Columnar storage

  • Highly storage efficient

  • Per column compression

  • Supports predicate pushdown

  • Supports column pruning

  • Compatible with other formats, for example, Avro

  • Read efficient, designed for partial data retrieval

Disadvantages

The following are the disadvantages:

  • Not good for random access

  • Potentially computationally intensive for writes

Installation

Parquet is natively available in Spark and can be accessed directly as follows:

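import spark.implicits._  // provides .toDS; imported automatically in spark-shell

val ds = Seq(1, 2, 3, 4, 5).toDS
ds.write.parquet("/data/numbers.parquet")                        // write as Parquet
val fromParquet = spark.read.parquet("/data/numbers.parquet")   // read back as a DataFrame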

Apache Avro

Apache Avro is a data serialization framework originally developed for Hadoop. It uses JSON for defining data types and protocols (although there is an alternative IDL), and serializes data in a compact binary format. Avro provides both a serialization format for persistent data and a wire format for communication between Hadoop nodes, and from client programs to Hadoop services. Another useful feature is its ability to store the data schema along with the data itself, so any Avro file can always be read without referencing external sources. Furthermore, Avro supports schema evolution, which provides backward compatibility: Avro files written with an older schema version can be read with a newer schema version.
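The schema evolution rules can be sketched with a toy model of Avro's schema resolution, in which a record written with an old (writer) schema is read with a newer (reader) schema. In this sketch, Field, Schema, and resolve are hypothetical names rather than the Avro library API, and only the basic rules are modelled: fields present in both schemas keep their written values, fields added in the reader schema take their default, and fields removed from the reader schema are ignored. Real Avro also handles aliases and type promotion.

```scala
// Toy model of Avro schema resolution (hypothetical, not the Avro API).
final case class Field(name: String, default: Option[Any] = None)
final case class Schema(fields: List[Field])

// Resolve a record written with `writer` against the newer `reader` schema.
def resolve(record: Map[String, Any], writer: Schema, reader: Schema): Map[String, Any] = {
  val written = writer.fields.map(_.name).toSet
  reader.fields.map { f =>
    val value =
      if (written(f.name)) record(f.name)            // present in both: keep it
      else f.default.getOrElse(                      // new field: use its default
        throw new IllegalArgumentException(s"no default for new field '${f.name}'"))
    f.name -> value
  }.toMap                                            // writer-only fields are dropped
}

val v1 = Schema(List(Field("url"), Field("title")))
val v2 = Schema(List(Field("url"), Field("title"), Field("language", Some("en"))))

val oldRecord = Map[String, Any]("url" -> "http://example.com/a", "title" -> "Spark")
println(resolve(oldRecord, v1, v2))
```

The model also shows why a newly added field needs a default value: without one, older files cannot be read with the new schema.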

Advantages

The following are the advantages:

  • Schema evolution

  • Disk space savings

  • Supports schemas in JSON and IDL

  • Supports many languages

  • Supports compression

Disadvantages

The following are the disadvantages:

  • Requires schema to read and write data

  • Serialization can be computationally heavy

Installation

As we are using Scala, Spark, and Maven environments in this book, Avro can be imported as follows:

<dependency>   
   <groupId>org.apache.avro</groupId>   
   <artifactId>avro</artifactId>   
   <version>1.7.7</version> 
</dependency> 

It is then a matter of creating a schema and producing the Scala code to write data to Avro using the schema. This is explained in detail in Chapter 3, Input Formats and Schema.

Apache NiFi

Apache NiFi originated at the United States National Security Agency (NSA), which released it to open source in 2014 as part of its Technology Transfer Program. NiFi enables the production of scalable directed graphs of data routing and transformation within a simple user interface. It also supports data provenance, offers a wide range of prebuilt processors, and makes it possible to build new processors quickly and efficiently. It includes prioritization, tunable delivery tolerances, and back-pressure features, which allow the user to tune processors and pipelines for specific requirements, and even to modify flows at runtime. All of this adds up to an incredibly flexible tool for building everything from one-off file download data flows through to enterprise-grade ETL pipelines. Building a pipeline to download files with NiFi is generally quicker than writing even a quick bash script and, combined with the feature-rich processors available for this, it makes for a compelling proposition.

Advantages

The following are the advantages:

  • Wide range of processors

  • Hub and spoke architecture

  • Graphical User Interface (GUI)

  • Scalable

  • Simplifies parallel processing

  • Simplifies thread handling

  • Allows runtime modifications

  • Redundancy through clusters

Disadvantages

The following are the disadvantages:

  • No cross-cutting error handler

  • Expression language is only partially implemented

  • Flowfile version management lacking

Installation

Apache NiFi can be installed with Hortonworks and is known as Hortonworks Dataflow. It is also available as a standalone install from Apache, https://nifi.apache.org/. There is an introduction to NiFi in Chapter 2, Data Acquisition.

Apache YARN

YARN is the principal component of Hadoop 2.0; it essentially allows Hadoop to plug in different processing paradigms rather than being limited to the original MapReduce. YARN consists of three main components: the ResourceManager, the NodeManager, and the ApplicationMaster. It is out of the scope of this book to dive into YARN; the main thing to understand is that, if we are running a Hadoop cluster, our Spark jobs can be executed on YARN in client mode, as follows:

spark-submit --class package.Class \
             --master yarn \
             --deploy-mode client [options] <app jar> [app options]

Advantages

The following are the advantages:

  • Supports Spark

  • Supports prioritized scheduling

  • Supports data locality

  • Job history archive

  • Works out of the box with HDP

Disadvantages

The following are the disadvantages:

  • Limited CPU resource control (by default, scheduling considers memory only)

  • No support for data lineage

Installation

YARN is installed as part of Hadoop; this could either be Hortonworks HDP, Apache Hadoop, or one of the other vendors. In any case, we should install Hadoop with at least the following components:

  • ResourceManager

  • NodeManager (1 or more)

To ensure that Spark can use YARN, it simply needs to know the location of yarn-site.xml, which is set using the YARN_CONF_DIR (or HADOOP_CONF_DIR) environment variable, typically in conf/spark-env.sh of your Spark installation.

Apache Lucene

Lucene is an indexing and search library originally written in Java, but now ported to several other languages, including Python. Lucene has spawned a number of subprojects in its time, including Mahout, Nutch, and Tika. These have now become top-level Apache projects in their own right, while Solr has more recently joined as a subproject. Lucene has comprehensive capabilities, but is particularly known for its use in Q&A search engines and information-retrieval systems.
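The data structure at the heart of a full-text library such as Lucene is the inverted index, which maps each term to the documents containing it (its postings list). The sketch below is a minimal in-memory version written to illustrate the idea; it is not Lucene's API, its tokenizer is a naive lowercase split on non-alphanumeric characters, and the documents are invented examples.

```scala
// Minimal inverted index: term -> set of document ids (the postings list).
// Illustrative only; naive tokenisation, no scoring, no persistence.
def tokenize(text: String): Seq[String] =
  text.toLowerCase.split("[^a-z0-9]+").filter(_.nonEmpty).toSeq

def buildIndex(docs: Map[Int, String]): Map[String, Set[Int]] =
  docs.toSeq
    .flatMap { case (id, text) => tokenize(text).map(term => term -> id) }
    .groupBy(_._1)
    .map { case (term, pairs) => term -> pairs.map(_._2).toSet }

// AND query: intersect the postings lists of every query term.
def search(index: Map[String, Set[Int]], query: String): Set[Int] =
  tokenize(query)
    .map(term => index.getOrElse(term, Set.empty[Int]))
    .reduceOption(_ intersect _)
    .getOrElse(Set.empty[Int])

val docs = Map(
  1 -> "Spark streaming with Kafka",
  2 -> "Indexing news articles with Lucene",
  3 -> "Kafka and Spark for news analytics")

val index = buildIndex(docs)
println(search(index, "spark kafka"))
```

A query only looks up its terms' postings lists instead of scanning every document, which is what makes full-text search efficient at scale.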

Advantages

The following are the advantages:

  • Highly efficient full-text searches

  • Scalable

  • Multilanguage support

  • Excellent out-of-the-box functionality

Disadvantages

The main disadvantage is that databases are generally better suited to relational operations.

Installation

Lucene can be downloaded from https://lucene.apache.org/ if you wish to learn more and interact with the library directly.

When utilizing Lucene, we only really need to include lucene-core-<version>.jar in our project. For example, when using Maven:

<dependency> 
    <groupId>org.apache.lucene</groupId> 
    <artifactId>lucene-core</artifactId> 
    <version>6.1.0</version> 
</dependency> 

Kibana

Kibana is an analytics and visualization platform that also provides charting and streaming data summarization. It uses Elasticsearch for its data source (which in turn uses Lucene) and can therefore leverage very powerful search and indexing capabilities at scale. Kibana can be used to visualize data in many different ways, including bar charts, histograms, and maps. We have mentioned Kibana briefly towards the end of this chapter and it will be used extensively throughout this book.

Advantages

The following are the advantages:

  • Visualize data at scale

  • Intuitive interface to quickly develop dashboards

Disadvantages

The following are the disadvantages:

  • Only integrates with Elasticsearch

  • Kibana releases are tied to specific Elasticsearch versions

Installation

Kibana can easily be installed as a standalone piece since it has its own web server. It can be downloaded from https://www.elastic.co/downloads/kibana. As Kibana requires Elasticsearch, this will also need to be installed; see the preceding link for more information. The Kibana configuration is handled in config/kibana.yml; if you have installed a standalone version of Elasticsearch, then no changes are required and it will work out of the box.

Elasticsearch

Elasticsearch is a search engine based on Lucene (see the previous section). It provides a distributed, multitenant-capable full-text search engine with schema-free JSON documents. It is built in Java but can be used from any language thanks to its HTTP interface. This makes it particularly useful for data-intensive requests whose results are to be displayed via web pages.
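One way Elasticsearch distributes documents is visible in its routing rule, documented in the Elasticsearch guide as shard = hash(_routing) % number_of_primary_shards, where _routing defaults to the document ID. The sketch below illustrates that rule only: it uses Scala's MurmurHash3 as a stand-in hash, so the shard numbers will not match those of a real cluster, and shardFor is a hypothetical helper rather than an Elasticsearch API.

```scala
import scala.util.hashing.MurmurHash3

// Illustration of the routing rule shard = hash(routing) % primaryShards.
// MurmurHash3.stringHash is a stand-in; Elasticsearch uses its own hash,
// so these shard numbers will differ from a real cluster's.
def shardFor(routing: String, primaryShards: Int): Int =
  java.lang.Math.floorMod(MurmurHash3.stringHash(routing), primaryShards)

// Hypothetical document ids spread over 5 primary shards.
val ids = (1 to 1000).map(i => s"doc-$i")
val perShard = ids.groupBy(shardFor(_, 5)).map { case (shard, docs) => shard -> docs.size }
println(perShard.toSeq.sorted)
```

Because the shard is a function of the number of primary shards, that number cannot simply be changed once documents have been indexed; the documents would then route to different shards.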

Advantages

The advantages are as follows:

  • Distributed

  • Schema free

  • HTTP interface

Disadvantages

The disadvantages are as follows:

  • Unable to perform distributed transactions

  • Lack of frontend tooling

Installation

Elasticsearch can be installed from https://www.elastic.co/downloads/elasticsearch. To read from and write to Elasticsearch from Spark, we can import the elasticsearch-spark connector as a Maven dependency:

<dependency> 
    <groupId>org.elasticsearch</groupId> 
    <artifactId>elasticsearch-spark_2.10</artifactId> 
    <version>2.2.0-m1</version> 
</dependency> 

There is also a great tool to help with administering Elasticsearch content: search for the Chrome extension Sense at https://chrome.google.com/webstore/category/extensions. A further explanation can be found at https://www.elastic.co/blog/found-sense-a-cool-json-aware-interface-to-elasticsearch. Alternatively, Sense is available for Kibana at https://www.elastic.co/guide/en/sense/current/installing.html.

Accumulo

Accumulo is a NoSQL database based on Google's Bigtable design. It was originally developed by the American National Security Agency and subsequently released to the Apache community in 2011. Accumulo offers us the usual big data advantages, such as bulk loading and parallel reading, but also has some additional capabilities: iterators, for efficient server-side and client-side pre-computation; data aggregation; and, most importantly, cell-level security. The security aspect of Accumulo makes it very useful for enterprise usage, as it enables flexible security in a multitenant environment. Accumulo is powered by Apache Zookeeper, in the same way as Kafka, and also leverages Apache Thrift, https://thrift.apache.org/, which enables a cross-language Remote Procedure Call (RPC) capability.
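Cell-level security works by attaching a visibility expression to each cell, such as admin&(news|geo), and returning the cell only to users whose authorizations satisfy that expression. The evaluator below is a simplified sketch of that idea, not Accumulo's ColumnVisibility implementation: it supports labels, & and |, and parentheses, treats an empty expression as visible to everyone, and, like Accumulo, rejects expressions that mix & and | at the same level without parentheses. Quoted labels are not supported, and the labels used are invented examples.

```scala
// Simplified evaluator for Accumulo-style visibility expressions
// (illustrative sketch, not Accumulo's implementation).
def isVisible(expression: String, auths: Set[String]): Boolean = {
  val s = expression.filterNot(_.isWhitespace)
  var pos = 0

  // label := one or more letters, digits or any of _ - . : /
  def label(): Boolean = {
    val start = pos
    while (pos < s.length && (s(pos).isLetterOrDigit || "_-.:/".indexOf(s(pos)) >= 0)) pos += 1
    require(pos > start, s"expected a label at position $start in '$expression'")
    auths.contains(s.substring(start, pos))
  }

  // term := label | '(' expr ')'
  def term(): Boolean =
    if (pos < s.length && s(pos) == '(') {
      pos += 1
      val v = expr()
      require(pos < s.length && s(pos) == ')', s"missing ')' in '$expression'")
      pos += 1
      v
    } else label()

  // expr := term ('&' term)* | term ('|' term)*  (no mixing without parentheses)
  def expr(): Boolean = {
    var value = term()
    var op: Option[Char] = None
    while (pos < s.length && (s(pos) == '&' || s(pos) == '|')) {
      val c = s(pos)
      require(op.forall(_ == c), s"mixed '&' and '|' need parentheses in '$expression'")
      op = Some(c)
      pos += 1
      val rhs = term()                       // always parse, then combine
      value = if (c == '&') value && rhs else value || rhs
    }
    value
  }

  if (s.isEmpty) true                        // empty visibility: readable by everyone
  else {
    val result = expr()
    require(pos == s.length, s"unexpected '${s(pos)}' in '$expression'")
    result
  }
}

val analystAuths = Set("admin", "news")
println(isVisible("admin&(news|geo)", analystAuths))
println(isVisible("admin&geo", analystAuths))
```

Because the check happens per cell, two users scanning the same table can see different subsets of the data, which is what makes Accumulo suitable for multitenant environments.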

Advantages

The advantages are as follows:

  • Pure implementation of Google Bigtable

  • Cell level security

  • Scalable

  • Redundancy

  • Provides iterators for server-side computation

Disadvantages

The disadvantages are as follows:

  • Zookeeper not universally popular with DevOps

  • Not always the most efficient choice for bulk relational operations

Installation

Accumulo can be installed as part of the Hortonworks HDP release, or as a standalone instance from https://accumulo.apache.org/. The instance should then be configured following the installation documentation, which at the time of writing is at https://accumulo.apache.org/1.7/accumulo_user_manual#_installation.

In Chapter 7, Building Communities, we demonstrate the use of Accumulo with Spark, along with some of the more advanced features such as Iterators and InputFormats. We also show how to work with data between Elasticsearch and Accumulo.