Machine Learning with Spark - Second Edition

By: Rajdeep Dua, Manpreet Singh Ghotra

Overview of this book

This book will teach you about popular machine learning algorithms and their implementation. You will learn how various machine learning concepts are implemented in the context of Spark ML. You will start by installing Spark on a single-node and a multinode cluster. Next, you will see how to execute Scala- and Python-based programs for Spark ML. Then you will take a few datasets and go deeper into clustering, classification, and regression. Toward the end, the book also covers text processing using Spark ML. Once you have learned the concepts, you can apply them to implement algorithms in green-field projects or to migrate existing systems, for example from Mahout or Scikit, to Spark ML. By the end of this book, you will have the skills to leverage Spark's features to create your own scalable machine learning applications and power a modern data-driven business.
Table of Contents (13 chapters)

The first step to a Spark program in Java

The Java API is very similar in principle to the Scala API. However, while Scala can call Java code quite easily, it is not always possible to call Scala code from Java. This is particularly the case when the Scala code makes use of Scala features such as implicit conversions, default parameters, and the Scala reflection API.

Spark makes heavy use of these features in general, so it is necessary to have a separate API specifically for Java that includes Java versions of the common classes. Hence, SparkContext becomes JavaSparkContext and RDD becomes JavaRDD.

Java versions prior to version 8 do not support anonymous functions and have no succinct syntax for functional-style programming, so functions passed to the Spark Java API must implement one of the function interfaces in the org.apache.spark.api.java.function package (such as Function, Function2, or PairFunction), each of which declares a call method. Although it is significantly more verbose, a common pre-Java 8 pattern is to create one-off anonymous classes that implement such an interface and its call method, which achieves much the same effect as anonymous functions in Scala.

Spark provides support for Java 8's anonymous function (or lambda) syntax. Using this syntax makes a Spark program written in Java 8 look very close to the equivalent Scala program.
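
To make the difference between the two styles concrete, the following is a minimal sketch in plain Java that runs without Spark on the classpath. The Fn interface and the mapAll helper are hypothetical stand-ins introduced only for illustration: Fn mimics the shape of a Spark function interface with a single call method (Spark's own interfaces also declare call as throwing Exception, which is omitted here), and mapAll plays the role of a map operation over an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LambdaVsAnonymous {

    // Hypothetical stand-in for a Spark-style function interface:
    // one abstract method named call. This is not a Spark class.
    interface Fn<T, R> {
        R call(T value);
    }

    // Hypothetical helper that applies fn to every element of a list,
    // playing the role that map plays on an RDD.
    static <T, R> List<R> mapAll(List<T> input, Fn<T, R> fn) {
        List<R> out = new ArrayList<>();
        for (T item : input) {
            out.add(fn.call(item));
        }
        return out;
    }

    // Pre-Java 8 style: a one-off anonymous class that implements call.
    static List<Integer> lengthsWithAnonymousClass(List<String> words) {
        return mapAll(words, new Fn<String, Integer>() {
            @Override
            public Integer call(String s) {
                return s.length();
            }
        });
    }

    // Java 8 style: the same function written as a lambda expression.
    static List<Integer> lengthsWithLambda(List<String> words) {
        return mapAll(words, s -> s.length());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("spark", "java", "rdd");
        System.out.println(lengthsWithAnonymousClass(words));
        System.out.println(lengthsWithLambda(words));
    }
}
```

Both methods return the same list; the lambda works because Fn has exactly one abstract method, which is the same property that lets lambdas be passed to Spark's Java API.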

In Scala, an RDD of key/value pairs provides special operators (such as reduceByKey and saveAsSequenceFile) that are accessed automatically via implicit conversions. In Java, special types of JavaRDD classes are required in order to access similar functions. These include JavaPairRDD to work with key/value pairs and JavaDoubleRDD to work with numerical records.

In this section, we covered the standard Java API syntax. For more details and examples related to working with RDDs in Java, as well as the Java 8 lambda syntax, refer to the Java sections of the Spark Programming Guide found at http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations.

We will see examples of most of these differences in the following Java program, which is included in the example code of this chapter in the directory named java-spark-app. The code directory also contains the CSV data file under the data subdirectory.

We will build and run this project with the Maven build tool, which we assume you have installed on your system.

Installing and setting up Maven is beyond the scope of this book. Usually, Maven can easily be installed using the package manager on your Linux system, or Homebrew or MacPorts on Mac OS X.
Detailed installation instructions can be found at http://maven.apache.org/download.cgi.

The project contains a Java file called JavaApp.java, which contains our program code:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.*;
import java.util.stream.Collectors;

/**
 * A simple Spark app in Java
 */
public class JavaApp {
    public static void main(String[] args) {

As in our Scala example, we first need to initialize our context. Note that we will use the JavaSparkContext class here instead of the SparkContext class that we used earlier. We will use the JavaSparkContext class in the same way to access our data using textFile and then split each row into the required fields. Note how we use a lambda expression to define the split function that performs the string processing in the following code:

        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        // we take the raw data in CSV format and convert it into a
        // set of records of the form (user, product, price)
        JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv")
            .map(s -> s.split(","));
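
The split step yields a String[] per row, so the later code refers to fields by position: index 0 for the user, 1 for the product, and 2 for the price. As a rough sketch of what that step assumes about each line, the plain-Java snippet below parses rows into a small value class and rejects lines that do not have exactly three fields. The Purchase class, the parse helper, and the sample rows are hypothetical and are not part of the book's example code.

```java
import java.util.Arrays;
import java.util.List;

class PurchaseParsing {

    // Hypothetical value class for one (user, product, price) record.
    static final class Purchase {
        final String user;
        final String product;
        final double price;

        Purchase(String user, String product, double price) {
            this.user = user;
            this.product = product;
            this.price = price;
        }
    }

    // Splits one CSV line on commas and checks the field count before
    // reading positions 0, 1, and 2. Quoted fields are not handled.
    static Purchase parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                "Expected 3 fields but got " + fields.length + ": " + line);
        }
        return new Purchase(
            fields[0].trim(),
            fields[1].trim(),
            Double.parseDouble(fields[2].trim()));
    }

    public static void main(String[] args) {
        // Made-up sample rows in the (user, product, price) layout.
        List<String> lines = Arrays.asList(
            "Alice,Notebook,4.50",
            "Bob,Pen,1.25");
        for (String line : lines) {
            Purchase p = parse(line);
            System.out.println(p.user + " bought " + p.product + " for " + p.price);
        }
    }
}
```

The program in this section skips such validation and indexes the array directly, which is fine for a clean data file but would fail with an ArrayIndexOutOfBoundsException on a short row.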

Now, we can compute the same metrics as we did in our Scala example. Note how some methods are the same (for example, distinct and count) for the Java and Scala APIs. Also note the lambda expressions that we pass to the map and reduce functions:

        // let's count the number of purchases
        long numPurchases = data.count();
        // let's count how many unique users made purchases
        long uniqueUsers = data.map(strings -> strings[0]).distinct().count();
        // let's sum up our total revenue
        Double totalRevenue = data.map(strings -> Double.parseDouble(strings[2]))
            .reduce((v1, v2) -> v1 + v2);

In the following lines of code, we can see that the approach to compute the most popular product is the same as that in the Scala example. The extra code might seem complex, but it is mostly Java boilerplate around the anonymous functions and the local sort of the collected results. The actual functionality is the same:

        // let's find our most popular product
        List<Tuple2<String, Integer>> pairs = data
            .mapToPair(strings -> new Tuple2<String, Integer>(strings[1], 1))
            .reduceByKey((Integer i1, Integer i2) -> i1 + i2)
            .collect();

        // copy the collected (product, count) pairs into a map for lookup
        Map<String, Integer> sortedData = new HashMap<>();
        for (Tuple2<String, Integer> pair : pairs) {
            sortedData.put(pair._1, pair._2);
        }
        // sort the product names by purchase count, highest first
        List<String> sorted = sortedData.entrySet()
            .stream()
            .sorted(Comparator.comparing(
                (Map.Entry<String, Integer> entry) -> entry.getValue()).reversed())
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
        String mostPopular = sorted.get(0);
        int purchases = sortedData.get(mostPopular);
        System.out.println("Total purchases: " + numPurchases);
        System.out.println("Unique users: " + uniqueUsers);
        System.out.println("Total revenue: " + totalRevenue);
        System.out.println(String.format(
            "Most popular product: %s with %d purchases", mostPopular, purchases));
    }
}

As can be seen, the general structure is similar to the Scala version, apart from the extra boilerplate code used to declare variable types and to process the collected results. It is a good exercise to work through both examples and compare lines of Scala code to those in Java to understand how the same result is achieved in each language.
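
One way to do this comparison, and to sanity-check the logic without starting Spark, is to compute the same four metrics over an in-memory list with Java streams. The sketch below is an illustration under stated assumptions, not the book's code: the LocalPurchaseMetrics class and its helpers are hypothetical, and the five sample rows are made up to be consistent with the output shown in this section rather than copied from data/UserPurchaseHistory.csv. Collectors.groupingBy with Collectors.counting() stands in for the mapToPair and reduceByKey steps.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class LocalPurchaseMetrics {

    // Assumed sample rows in (user, product, price) form; not the real file.
    static final List<String> SAMPLE = Arrays.asList(
        "John,iPhone Cover,9.99",
        "John,Headphones,5.49",
        "Jack,iPhone Cover,9.99",
        "Jill,Samsung Galaxy Cover,8.95",
        "Bob,iPad Cover,5.49");

    // Same split step as the Spark program, applied to a local list.
    static List<String[]> parse(List<String> lines) {
        return lines.stream().map(s -> s.split(",")).collect(Collectors.toList());
    }

    static long uniqueUsers(List<String[]> data) {
        return data.stream().map(fields -> fields[0]).distinct().count();
    }

    static double totalRevenue(List<String[]> data) {
        return data.stream().mapToDouble(fields -> Double.parseDouble(fields[2])).sum();
    }

    // Counts purchases per product, then picks the entry with the highest count.
    static Map.Entry<String, Long> mostPopular(List<String[]> data) {
        Map<String, Long> counts = data.stream()
            .collect(Collectors.groupingBy(fields -> fields[1], Collectors.counting()));
        return counts.entrySet().stream()
            .max(Map.Entry.comparingByValue())
            .orElseThrow(() -> new IllegalStateException("no purchases"));
    }

    public static void main(String[] args) {
        List<String[]> data = parse(SAMPLE);
        Map.Entry<String, Long> top = mostPopular(data);
        System.out.println("Total purchases: " + data.size());
        System.out.println("Unique users: " + uniqueUsers(data));
        System.out.println("Total revenue: " + totalRevenue(data));
        System.out.println("Most popular product: " + top.getKey()
            + " with " + top.getValue() + " purchases");
    }
}
```

Taking the maximum entry directly avoids sorting the whole list when only the top product is needed; the Spark program sorts instead, which is equally valid for a handful of products.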

This program can be run with the following command executed from the project's base directory:

  $ mvn exec:java -Dexec.mainClass="JavaApp"

You will see output that looks very similar to the Scala version with identical results of the computation:

...
14/01/30 17:02:43 INFO spark.SparkContext: Job finished: collect at JavaApp.java:46, took 0.039167 s

Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases