Spark SQL, Datasets, and DataFrames

Spark SQL is the Spark module for structured data processing. The main difference between this API and the RDD API is that the Spark SQL interfaces provide more information about the structure of both the data and the computation being performed. Spark uses this extra information internally to apply additional optimizations through the Catalyst optimization engine, which is the same execution engine regardless of which API or programming language is involved.
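To see Catalyst at work, you can ask Spark SQL to print the query plans it generates for a DataFrame operation. The following is a minimal sketch: it assumes a Scala Spark shell (where a SparkSession named spark is already available) and uses the people.json file introduced later in this section:

// explain(true) prints the parsed, analyzed, optimized (Catalyst), and physical plans
val plansDF = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")
plansDF.filter("age > 27").explain(true)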

Spark SQL is commonly used to execute SQL queries (even if this isn't the only way to use it). Whichever Spark-supported programming language encapsulates the SQL code to be executed, the results of a query are returned as a Dataset. A Dataset is a distributed collection of data that was added as an interface in Spark 1.6. It combines the benefits of RDDs (such as strong typing and the ability to apply useful lambda functions) with the benefits of Spark SQL's optimized execution engine (Catalyst, https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html). You can construct a Dataset by starting with Java/Scala objects and then manipulating it through the usual functional transformations. The Dataset API is available in Scala and Java, while Python doesn't support it. However, due to the dynamic nature of that language, many of the benefits of the Dataset API are already available in Python.
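As a minimal sketch of this (the Person case class and its sample values are illustrative only; a Scala Spark shell with a SparkSession named spark is assumed), you can build a Dataset from a sequence of Scala objects and transform it functionally:

case class Person(name: String, age: Long)
import spark.implicits._   // provides the toDS() conversion and the required Encoders
val personDS = Seq(Person("Michael", 29), Person("Andy", 30)).toDS()
personDS.map(p => p.name.toUpperCase).show()   // the usual functional transformations apply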
Starting from Spark 2.0, the DataFrame and Dataset APIs have been merged into the Dataset API, so a DataFrame is just a Dataset that has been organized into named columns. It is conceptually equivalent to a table in an RDBMS, but with better optimizations under the hood (being part of the Dataset API, the Catalyst optimization engine works behind the scenes for DataFrames, too). You can construct a DataFrame from diverse sources, such as structured data files, Hive tables, database tables, and RDDs, to name a few. Unlike the Dataset API, the DataFrame API is available in any of the programming languages supported by Spark.
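For instance, here is a minimal sketch of building DataFrames from two different kinds of sources (the RDD contents and the CSV path are illustrative assumptions, and a Scala Spark shell is assumed):

import spark.implicits._
// from an RDD of tuples, assigning explicit column names
val rddDF = spark.sparkContext.parallelize(Seq(("Michael", 29), ("Andy", 30))).toDF("name", "age")
// from a structured data file (a CSV with a header row in this case)
val csvDF = spark.read.option("header", "true").csv("/path/to/people.csv")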

Let's get hands-on so that we can better understand the concepts behind Spark SQL. The first full example I am going to show is Scala-based. Start a Scala Spark shell to run the following code interactively.
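Assuming Spark is installed under the directory used in the file paths below (adjust it to your own installation), the shell can be started as follows:

/opt/spark/spark-2.2.1-bin-hadoop2.7/bin/spark-shell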

Let's use people.json as a data source. This file is one of the resources shipped with the Spark distribution for examples such as this one, and it can be used to create a DataFrame, which is a Dataset of Rows (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row):

val df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")

You can print the content of the DataFrame to the console to check that it is what you expected:

scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

Before you perform DataFrame operations, you need to import the implicit conversions (such as converting RDDs to DataFrames), which also enable the $ notation:

import spark.implicits._
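The $"columnName" syntax enabled by this import is a shorthand for building a Column reference; for example, the following three selections on the df defined above are equivalent:

df.select($"name")
df.select(df("name"))
df.select(org.apache.spark.sql.functions.col("name"))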

Now, you can print the DataFrame schema in a tree format:

scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

Select a single column (let's say name):

scala> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+

Filter the data:

scala> df.filter($"age" > 27).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

Then add a groupBy clause:

scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+

Select all rows and increment a numeric field:

scala> df.select($"name", $"age" + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+

It is possible to run SQL queries programmatically through the sql function of SparkSession. This function returns the results of the query as a DataFrame, which, in Scala, is a Dataset[Row]. Let's consider the same DataFrame as in the previous example:

val df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")

You can register it as an SQL temporary view:

df.createOrReplaceTempView("people")

Then, you can execute an SQL query against it:

scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> sqlDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

The same operations can be performed in Python as well.
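The examples below assume a PySpark shell, which can be started from the same installation directory (adjust the path to your own setup):

/opt/spark/spark-2.2.1-bin-hadoop2.7/bin/pyspark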

>>> df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")

This results in the following:

>>> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

>>> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

>>> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+

>>> df.filter(df['age'] > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+

>>> df.select(df['name'], df['age'] + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+

>>> df.createOrReplaceTempView("people")
>>> sqlDF = spark.sql("SELECT * FROM people")
>>> sqlDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

Other features of Spark SQL and Datasets (data sources, aggregations, self-contained applications, and so on) will be covered in Chapter 3, Extract, Transform, Load.