Machine Learning with Spark - Second Edition

By: Rajdeep Dua, Manpreet Singh Ghotra

Overview of this book

This book will teach you about popular machine learning algorithms and their implementation. You will learn how various machine learning concepts are implemented in the context of Spark ML. You will start by installing Spark in a single and multinode cluster. Next you'll see how to execute Scala and Python based programs for Spark ML. Then we will take a few datasets and go deeper into clustering, classification, and regression. Toward the end, we will also cover text processing using Spark ML. Once you have learned the concepts, they can be applied to implement algorithms in either green-field implementations or to migrate existing systems to this new platform. You can migrate from Mahout or Scikit to use Spark ML. By the end of this book, you will acquire the skills to leverage Spark's features to create your own scalable machine learning applications and power a modern data-driven business.
Table of Contents (13 chapters)

The Spark programming model

Before we delve into a high-level overview of Spark's design, we will introduce the SparkContext object as well as the Spark shell, which we will use to interactively explore the basics of the Spark programming model.

While this section provides a brief overview and examples of using Spark, we recommend that you read the following documentation to get a detailed understanding:

  • For the Spark Quick Start, refer to http://spark.apache.org/docs/latest/quick-start
  • For the Spark Programming Guide, which covers Scala, Java, Python, and R, refer to http://spark.apache.org/docs/latest/programming-guide.html

SparkContext and SparkConf

The starting point of writing any Spark program is SparkContext (or JavaSparkContext in Java). SparkContext is initialized with an instance of a SparkConf object, which contains various Spark cluster-configuration settings (for example, the URL of the master node).

SparkContext is the main entry point for Spark functionality. It represents a connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster.

Only one SparkContext can be active per JVM. You must call stop() on the active SparkContext before creating a new one.

Once initialized, we will use the various methods found in the SparkContext object to create and manipulate distributed datasets and shared variables. The Spark shell (available in Scala and Python, but unfortunately not in Java) takes care of this context initialization for us, but the following lines of code show an example of creating a context running in the local mode in Scala:

import org.apache.spark.{SparkConf, SparkContext}

// Run locally with four threads
val conf = new SparkConf()
  .setAppName("Test Spark App")
  .setMaster("local[4]")
val sc = new SparkContext(conf)

This creates a context running in the local mode with four threads, with the name of the application set to Test Spark App. If we wish to use the default configuration values, we could also call the following simple constructor for our SparkContext object, which works in the exact same way:

val sc = new SparkContext("local[4]", "Test Spark App")

Downloading the example code

You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book from any other source, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
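
The rule that only one SparkContext can be active per JVM can be sketched as follows. This is a minimal illustration, assuming Spark 2.x is on the classpath and that the code runs as a standalone program rather than inside the Spark shell (which already owns a context). The application names are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application names, used only for this illustration.
val firstConf = new SparkConf().setAppName("First App").setMaster("local[2]")
val firstSc = new SparkContext(firstConf)

// Stop the active context before creating a new one in the same JVM.
firstSc.stop()

val secondConf = new SparkConf().setAppName("Second App").setMaster("local[4]")
val secondSc = new SparkContext(secondConf)
val secondMaster = secondSc.master // the master URL the new context was created with

secondSc.stop()
```

Skipping the call to firstSc.stop() would normally cause the second constructor call to fail, because the first context would still be active.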

SparkSession

SparkSession allows programming with the DataFrame and Dataset APIs. It is a single point of entry for these APIs.

First, we need to create an instance of the SparkConf class and use it to create the SparkSession instance. Consider the following example:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val spConfig = (new SparkConf).setMaster("local").setAppName("SparkApp")
val spark = SparkSession
  .builder()
  .appName("SparkUserData").config(spConfig)
  .getOrCreate()

Next, we can use the spark object to create a DataFrame:

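// customSchema is a StructType describing the columns of u.user; it must be defined before this call
val user_df = spark.read.format("com.databricks.spark.csv")
  .option("delimiter", "|").schema(customSchema)
  .load("/home/ubuntu/work/ml-resources/spark-ml/data/ml-100k/u.user")
val first = user_df.first()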
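
The preceding snippet refers to customSchema without showing its definition. One possible definition is sketched below, assuming the u.user file follows the MovieLens 100k layout of five pipe-separated fields (user ID, age, gender, occupation, and zip code). The column names are hypothetical, and the zip code is kept as a string on the assumption that some codes are not purely numeric.

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical column names; the u.user file itself has no header row.
val customSchema = StructType(Array(
  StructField("user_id", IntegerType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("occupation", StringType, nullable = true),
  StructField("zip_code", StringType, nullable = true)
))

// The schema can be inspected without reading any data.
val columnNames = customSchema.fieldNames
```

Supplying the schema explicitly avoids a schema-inference pass over the file and gives each column a predictable type.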

The Spark shell

Spark supports writing programs interactively using the Scala, Python, or R REPL (that is, the Read-Eval-Print-Loop, or interactive shell). The shell provides instant feedback as we enter code, as this code is immediately evaluated. In the Scala shell, the result and its type are also displayed after a piece of code is run.

To use the Spark shell with Scala, simply run ./bin/spark-shell from the Spark base directory. This will launch the Scala shell and initialize SparkContext, which is available to us as the Scala value, sc. With Spark 2.0, a SparkSession instance is also available in the console as the variable spark.

Your console output should look similar to the following:

$ ~/work/spark-2.0.0-bin-hadoop2.7/bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:14:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/06 22:14:25 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.22.180 instead (on interface eth1)
16/08/06 22:14:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/08/06 22:14:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
16/08/06 22:14:27 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://192.168.22.180:4041
Spark context available as 'sc' (master = local[*], app id = local-1470546866779).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_60)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

To use the Python shell with Spark, simply run the ./bin/pyspark command. Like the Scala shell, the Python SparkContext object should be available as the Python variable, sc. Your output should be similar to this:

$ ~/work/spark-2.0.0-bin-hadoop2.7/bin/pyspark
Python 2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:16:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/06 22:16:15 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.22.180 instead (on interface eth1)
16/08/06 22:16:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/08/06 22:16:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
SparkSession available as 'spark'.
>>>

R is a language and runtime environment for statistical computing and graphics. It is a GNU project and an implementation of the S language, which was developed at Bell Labs.

R provides a wide range of statistical techniques (linear and nonlinear modeling, classical statistical tests, time-series analysis, classification, and clustering) as well as graphical techniques, and it is highly extensible.

To use Spark with R, run the following command to open the SparkR shell:

$ ~/work/spark-2.0.0-bin-hadoop2.7/bin/sparkR
R version 3.0.2 (2013-09-25) -- "Frisbee Sailing"
Copyright (C) 2013 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Launching java with spark-submit command /home/ubuntu/work/spark-2.0.0-bin-hadoop2.7/bin/spark-submit "sparkr-shell" /tmp/RtmppzWD8S/backend_porta6366144af4f
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:26:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/06 22:26:22 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.22.186 instead (on interface eth1)
16/08/06 22:26:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/08/06 22:26:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/
SparkSession available as 'spark'.
During startup - Warning message:
package 'SparkR' was built under R version 3.1.1
>

Resilient Distributed Datasets

The core of Spark is a concept called the Resilient Distributed Dataset (RDD). An RDD is a collection of records (strictly speaking, objects of some type) that are distributed or partitioned across many nodes in a cluster (for the purposes of the Spark local mode, the single multithreaded process can be thought of in the same way). An RDD in Spark is fault-tolerant; this means that if a given node or task fails (for some reason other than erroneous user code, such as hardware failure, loss of communication, and so on), the RDD can be reconstructed automatically on the remaining nodes and the job will still be completed.

Creating RDDs

RDDs can be created from existing collections, for example, in the Scala Spark shell that you launched earlier:

val collection = List("a", "b", "c", "d", "e") 
val rddFromCollection = sc.parallelize(collection)
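
How a local collection is split across the cluster can be inspected directly. The sketch below assumes a local master and passes an explicit partition count as the second argument to parallelize. The application name and the choice of two partitions are arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context if there is one (for example, in the Spark shell).
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Parallelize Sketch").setMaster("local[2]"))

val collection = List("a", "b", "c", "d", "e")

// Ask for two partitions explicitly instead of relying on the default parallelism.
val rddFromCollection = sc.parallelize(collection, 2)

val numPartitions = rddFromCollection.getNumPartitions
// glom gathers each partition into an array, which makes the split visible on the driver.
val partitionContents = rddFromCollection.glom().collect()
val roundTrip = rddFromCollection.collect().toList
```

Calling collect on a small RDD such as this one returns the records in their original order, so roundTrip matches the input list.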

RDDs can also be created from Hadoop-based input sources, including the local filesystem, HDFS, and Amazon S3. A Hadoop-based RDD can utilize any input format that implements the Hadoop InputFormat interface, including text files, other standard Hadoop formats, HBase, Cassandra, Tachyon, and many more.

The following code is an example of creating an RDD from a text file located on the local filesystem:

val rddFromTextFile = sc.textFile("LICENSE")

The preceding textFile method returns an RDD where each record is a String object that represents one line of the text file. The output of the preceding command is as follows:

rddFromTextFile: org.apache.spark.rdd.RDD[String] = LICENSE MapPartitionsRDD[1] at textFile at <console>:24

The following code is an example of how to create an RDD from a text file located on HDFS using the hdfs:// protocol:

val rddFromTextFileHDFS = sc.textFile("hdfs://input/LICENSE")

The following code is an example of how to create an RDD from a text file located on Amazon S3 using the s3n:// protocol:

val rddFromTextFileS3 = sc.textFile("s3n://input/LICENSE")

Spark operations

Once we have created an RDD, we have a distributed collection of records that we can manipulate. In Spark's programming model, operations are split into transformations and actions. Generally speaking, a transformation operation applies some function to all the records in the dataset, changing the records in some way. An action typically runs some computation or aggregation operation and returns the result to the driver program where SparkContext is running.

Spark operations are functional in style. For programmers familiar with functional programming in Scala or Python, or with lambda expressions in Java 8, these operations should seem natural. For those without experience in functional programming, don't worry; the Spark API is relatively easy to learn.

One of the most common transformations that you will use in Spark programs is the map operator. This applies a function to each record of an RDD, thus mapping the input to some new output. For example, the following code fragment takes the RDD we created from a local text file and applies the size function to each record in the RDD. Remember that we created an RDD of Strings. Using map, we can transform each string to an integer, thus returning an RDD of Ints:

val intsFromStringsRDD = rddFromTextFile.map(line => line.size)

You should see output similar to the following line in your shell; this indicates the type of the RDD:

intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:26

In the preceding code, we saw the use of the => syntax. This is the Scala syntax for an anonymous function, which is a function that is not a named method (that is, one defined using the def keyword in Scala or Python, for example).

While a detailed treatment of anonymous functions is beyond the scope of this book, they are used extensively in Spark code in Scala and Python, as well as in Java 8 (both in examples and real-world applications), so it is useful to cover a few practicalities.
The line => line.size syntax means that we are applying a function where the input variable is to the left of the => operator, and the output is the result of the code to the right of the => operator. In this case, the input is line, and the output is the result of calling line.size. In Scala, this function that maps a string to an integer is expressed as String => Int.
This syntax saves us from having to separately define functions every time we use methods such as map; this is useful when the function is simple and will only be used once, as in this example.
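
The difference between a named method and an anonymous function can be seen without Spark at all. The following sketch uses a plain Scala List in place of an RDD. The names lineLength, lineLengthFn, and the sample strings are invented for the illustration.

```scala
// A named method defined with def.
def lineLength(line: String): Int = line.size

// The equivalent anonymous function, stored in a value of type String => Int.
val lineLengthFn: String => Int = line => line.size

val lines = List("Apache", "Spark", "")

val viaNamed = lines.map(lineLength)             // pass the named method
val viaAnonymous = lines.map(line => line.size)  // define the function inline
val viaValue = lines.map(lineLengthFn)           // pass the function value
```

All three calls produce the same list of lengths; the inline form simply avoids a separate definition for a function that is used once.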

Now, we can apply a common action operation, count, to return the number of records in our RDD:

intsFromStringsRDD.count

The result should look something like the following console output:

res0: Long = 299

Perhaps we want to find the average length of each line in this text file. We can first use the sum function to add up all the lengths of all the records and then divide the sum by the number of records:

val sumOfRecords = intsFromStringsRDD.sum 
val numRecords = intsFromStringsRDD.count
val aveLengthOfRecord = sumOfRecords / numRecords

The result will be as follows:

scala> intsFromStringsRDD.count
res0: Long = 299

scala> val sumOfRecords = intsFromStringsRDD.sum
sumOfRecords: Double = 17512.0

scala> val numRecords = intsFromStringsRDD.count
numRecords: Long = 299

scala> val aveLengthOfRecord = sumOfRecords / numRecords
aveLengthOfRecord: Double = 58.5685618729097

Spark operations, in most cases, return a new RDD, with the exception of most actions, which return the result of a computation (such as Long for count and Double for sum in the preceding example). This means that we can naturally chain together operations to make our program flow more concise and expressive. For example, the same result as the one in the preceding line of code can be achieved using the following code:

val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count
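
Dividing sum by count runs two separate jobs over the data. As an alternative sketch, numeric RDDs also provide mean and stats, each of which computes its result in a single job. The example below uses a small in-memory collection as a stand-in for the lines of a text file, and the application name is arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context if there is one (for example, in the Spark shell).
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Mean Sketch").setMaster("local[2]"))

// A small in-memory stand-in for the lines of a text file.
val lines = sc.parallelize(List("ab", "abcd", "abcdef"))
val lengths = lines.map(line => line.size)

val viaSumAndCount = lengths.sum / lengths.count // two actions, so two jobs
val viaMean = lengths.mean()                     // one action, one job
val summary = lengths.stats()                    // count, mean, stdev, max, and min together
```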

An important point to note is that Spark transformations are lazy. That is, invoking a transformation on an RDD does not immediately trigger a computation. Instead, transformations are chained together and are effectively only computed when an action is called. This allows Spark to be more efficient by only returning results to the driver when necessary so that the majority of operations are performed in parallel on the cluster.

This means that if your Spark program never uses an action operation, it will never trigger an actual computation, and you will not get any results. For example, the following code will simply return a new RDD that represents the chain of transformations:

val transformedRDD = rddFromTextFile.map(line => line.size).filter(size => size > 10).map(size => size * 2)

This returns the following result in the console:

transformedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:26

Notice that no actual computation happens and no result is returned. If we now call an action, such as sum, on the resulting RDD, the computation will be triggered:

val computation = transformedRDD.sum

You will now see that a Spark job is run, and it results in the following console output:

computation: Double = 35006.0

The complete list of transformations and actions possible on RDDs, as well as a set of more detailed examples, is available in the Spark programming guide (located at http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations). The Scala API documentation for RDDs is located at (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD).
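
One way to observe lazy evaluation is to count how often the function passed to map actually runs. The sketch below uses a long accumulator for this, assuming a local master and a job that completes without task retries. The accumulator name and sample data are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context if there is one (for example, in the Spark shell).
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Laziness Sketch").setMaster("local[2]"))

val calls = sc.longAccumulator("map calls")

// Declaring the transformation does not run the function.
val lengths = sc.parallelize(List("a", "bb", "ccc")).map { line =>
  calls.add(1) // executed only when a job processes this record
  line.size
}

val callsBeforeAction = calls.value.longValue() // no job has run yet
val total = lengths.sum                         // action: triggers the computation
val callsAfterAction = calls.value.longValue()  // one call per record
```

Because lengths is not cached, calling a second action on it would run the map function again and increase the counter further.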

Caching RDDs

One of the most powerful features of Spark is the ability to cache data in memory across a cluster. This is achieved through the use of the cache method on an RDD:

rddFromTextFile.cache
res0: rddFromTextFile.type = MapPartitionsRDD[1] at textFile at <console>:27

Calling cache on an RDD tells Spark that the RDD should be kept in memory. The first time an action is called on the RDD that initiates a computation, the data is read from its source and put into memory. Hence, the first time such an operation is called, the time it takes to run the task is partly dependent on the time it takes to read the data from the input source. However, when the data is accessed the next time (for example, in subsequent queries in analytics or iterations in a machine learning model), the data can be read directly from memory, thus avoiding expensive I/O operations and speeding up the computation, in many cases, by a significant factor.

If we now call the count or sum function on our cached RDD, the RDD is loaded into memory:

val aveLengthOfRecordChained = rddFromTextFile.map(line => line.size).sum / rddFromTextFile.count

Spark also allows more fine-grained control over caching behavior. You can use the persist method to specify what approach Spark uses to cache data. More information on RDD caching can be found here:
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
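
As a brief sketch of the persist method, the following example stores an RDD with an explicit storage level and then releases it. The cache method is shorthand for persist with the MEMORY_ONLY level; MEMORY_AND_DISK is chosen here only for illustration, and the data and application name are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Reuses the running context if there is one (for example, in the Spark shell).
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Persist Sketch").setMaster("local[2]"))

val doubled = sc.parallelize(1 to 1000).map(_ * 2)

// Keep partitions in memory and spill to disk those that do not fit.
doubled.persist(StorageLevel.MEMORY_AND_DISK)
val levelWhilePersisted = doubled.getStorageLevel

val firstCount = doubled.count  // computes the partitions and stores them
val secondCount = doubled.count // can be served from the stored partitions

// Release the stored partitions once they are no longer needed.
doubled.unpersist()
val levelAfterUnpersist = doubled.getStorageLevel
```

Note that the storage level of an RDD cannot be changed once it has been set, which is why the sketch starts from a freshly created RDD.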

Broadcast variables and accumulators

Another core feature of Spark is the ability to create two special types of variables: broadcast variables and accumulators.

A broadcast variable is a read-only variable that is created from an object in the driver program and made available to the nodes that will execute the computation. This is very useful in applications, such as distributed systems, that need to make the same data available to the worker nodes in an efficient manner. Spark makes creating broadcast variables as simple as calling a method on SparkContext, as follows:

val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))

A broadcast variable can be accessed from nodes other than the driver program that created it (that is, the worker nodes) by calling value on the variable:

sc.parallelize(List("1", "2", "3")).map(x => broadcastAList.value ++ x).collect

This code creates a new RDD with three records from a collection (in this case, a Scala List) of ("1", "2", "3"). In the map function, it returns a new collection with the relevant record from our new RDD appended to the broadcastAList that is our broadcast variable:

...
res1: Array[List[Any]] = Array(List(a, b, c, d, e, 1), List(a, b, c, d, e, 2), List(a, b, c, d, e, 3))

Notice the collect method in the preceding code. This is a Spark action that returns the entire RDD to the driver as a Scala (or Python or Java) collection.

We will often use collect when we wish to apply further processing to our results locally within the driver program.

Note that collect should generally only be used in cases where we really want to return the full result set to the driver and perform further processing. If we try to call collect on a very large dataset, we might run out of memory on the driver and crash our program.
It is preferable to perform as much heavy-duty processing on our Spark cluster as possible, preventing the driver from becoming a bottleneck. In many cases, however, such as during iterations in many machine learning models, collecting results to the driver is necessary.

On inspecting the result, we will see that for each of the three records in our new RDD, we now have a record that is our original broadcast List, with the new element appended to it (that is, there is now "1", "2", or "3" at the end).
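
The result type in the preceding output is Array[List[Any]] because the ++ operator treats the string x as a sequence of characters. A variant that keeps the element type as String is sketched below, using the :+ operator to append a single element. It assumes a local master, and the application name is arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context if there is one (for example, in the Spark shell).
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Broadcast Sketch").setMaster("local[2]"))

val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))

// :+ appends one element, so each result stays a List[String].
val appended: Array[List[String]] = sc.parallelize(List("1", "2", "3"))
  .map(x => broadcastAList.value :+ x)
  .collect
```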

An accumulator is also a variable that is broadcast to the worker nodes. The key difference between a broadcast variable and an accumulator is that while the broadcast variable is read-only, the accumulator can be added to. There are limitations to this; in particular, the addition must be an associative operation so that the global accumulated value can be correctly computed in parallel and returned to the driver program. Each worker node can only access and add to its own local accumulator value, and only the driver program can access the global value. As with broadcast variables, the value of an accumulator is read using the value method.
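
A minimal accumulator sketch follows, using the longAccumulator method that SparkContext provides from Spark 2.0 onward. The accumulator name and the task of counting even numbers are chosen only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context if there is one (for example, in the Spark shell).
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Accumulator Sketch").setMaster("local[2]"))

val evenCount = sc.longAccumulator("even numbers")

// foreach is an action, so the additions happen as soon as this line runs.
sc.parallelize(1 to 10).foreach { n =>
  if (n % 2 == 0) evenCount.add(1) // each task adds to its own local copy
}

// Only the driver reads the merged, global value.
val evens = evenCount.value.longValue()
```

Updating the accumulator inside an action such as foreach, rather than inside a transformation, avoids double counting if a task has to be re-executed.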

For more details on broadcast variables and accumulators, refer to the Shared Variables section of the Spark Programming Guide at http://spark.apache.org/docs/latest/programming-guide.html#shared-variables.