Hands-On Deep Learning with Apache Spark

By: Guglielmo Iozzia

Overview of this book

Deep learning is a subset of machine learning where datasets with several layers of complexity can be processed. Hands-On Deep Learning with Apache Spark addresses the sheer complexity of technical and analytical parts and the speed at which deep learning solutions can be implemented on Apache Spark. The book starts with the fundamentals of Apache Spark and deep learning. You will set up Spark for deep learning, learn principles of distributed modeling, and understand different types of neural nets. You will then implement deep learning models, such as convolutional neural networks (CNNs), recurrent neural networks (RNNs), and long short-term memory (LSTM) on Spark. As you progress through the book, you will gain hands-on experience of what it takes to understand the complex datasets you are dealing with. During the course of this book, you will use popular deep learning frameworks, such as TensorFlow, Deeplearning4j, and Keras to train your distributed models. By the end of this book, you'll have gained experience with the implementation of your models on a variety of use cases.

Spark Streaming

Spark Streaming is another Spark module that extends the core Spark API and provides a scalable, fault-tolerant, and efficient way of processing live streaming data. By converting streaming data into micro batches, Spark's simple batch programming model can be applied to streaming use cases too. This unified programming model makes it easy to combine batch and interactive data processing with streaming. Data can be ingested from diverse sources (Kafka, Kinesis, TCP sockets, S3, or HDFS, to mention just a few of the popular ones), and the data coming from them can be processed using any of the high-level functions available in Spark. Finally, the processed data can be persisted to RDBMS, NoSQL databases, HDFS, object storage systems, and so on, or consumed through live dashboards. Nothing prevents other advanced Spark components, such as MLlib or GraphX, from being applied to data streams:

Figure 1.8

The following diagram shows how Spark Streaming works internally—it receives live input data streams and divides them into batches; these are processed by the Spark engine to generate the final batches of results:

Figure 1.9
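
The division of a live stream into batches can be illustrated with a minimal plain-Python sketch. It does not use the Spark API; the to_micro_batches function, the timestamped records, and the 1-second interval are assumptions made purely for illustration:

```python
from collections import defaultdict


def to_micro_batches(records, batch_interval):
    """Group (timestamp, value) records into consecutive batches.

    Batch k holds the values whose timestamp t satisfies
    k * batch_interval <= t < (k + 1) * batch_interval.
    Intervals without data still produce an (empty) batch.
    """
    if not records:
        return []
    buckets = defaultdict(list)
    for timestamp, value in records:
        buckets[int(timestamp // batch_interval)].append(value)
    last = max(buckets)
    return [buckets.get(k, []) for k in range(last + 1)]


# Hypothetical input: (arrival time in seconds, line of text)
stream = [(0.2, "a b"), (0.7, "b"), (1.1, "c"), (3.5, "a")]
batches = to_micro_batches(stream, batch_interval=1)
print(batches)
```

Each inner list stands in for one batch handed to the batch engine; the third one is empty because nothing arrived between seconds 2 and 3.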

The high-level abstraction of Spark Streaming is the DStream (short for Discretized Stream), which wraps a continuous flow of data. Internally, a DStream is represented as a sequence of RDDs. A DStream contains a list of other DStreams that it depends on, a function to convert its input RDDs into output ones, and a time interval at which to invoke the function. DStreams are created either by manipulating existing ones, for example, by applying a map or filter function (which internally create a MappedDStream and a FilteredDStream, respectively), or by reading from an external source (the base class in these cases is InputDStream).
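
The three ingredients of a DStream described here (parent streams, a per-batch function, and an interval) can be modeled in a few lines of plain Python. This is only a toy sketch: ToyDStream and input_stream are hypothetical names, lists stand in for RDDs, and none of this is Spark's actual implementation:

```python
class ToyDStream:
    """Plain-Python stand-in for a DStream."""

    def __init__(self, parents, compute, interval):
        self.parents = parents    # streams this one depends on
        self.compute = compute    # turns the parents' batches into this stream's batch
        self.interval = interval  # how often (in seconds) compute is invoked

    def batch_at(self, t):
        """Return the batch (a list standing in for an RDD) for batch time t."""
        inputs = [parent.batch_at(t) for parent in self.parents]
        return self.compute(t, inputs)

    def map(self, func):
        # Derived stream that depends on this one, like a mapped DStream
        return ToyDStream([self], lambda t, ins: [func(x) for x in ins[0]], self.interval)

    def filter(self, pred):
        # Derived stream that depends on this one, like a filtered DStream
        return ToyDStream([self], lambda t, ins: [x for x in ins[0] if pred(x)], self.interval)


def input_stream(batches, interval):
    """Source stream without parents; batches maps a batch time to its records."""
    return ToyDStream([], lambda t, ins: list(batches.get(t, [])), interval)


source = input_stream({0: [1, 2, 3], 1: [4, 5]}, interval=1)
evens_squared = source.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(evens_squared.batch_at(0))
print(evens_squared.batch_at(1))
```

Nothing is computed when filter and map are called; the chain of dependencies is only walked when a batch is requested for a given time.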

Let's implement a simple Scala example: a self-contained streaming word count application. The code for this example can be found among the examples that are bundled with the Spark distribution. To compile and package it, you need to add the Spark Streaming dependency to your Maven, Gradle, or sbt project descriptor, along with the Spark Core and Scala dependencies.

First, we have to create the SparkConf and a StreamingContext (which is the main entry point for any streaming functionality) from it:

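import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel // needed when creating the socket stream
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Run locally, using as many worker threads as there are cores
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[*]")
// Streaming context with a batch interval of 1 second
val ssc = new StreamingContext(sparkConf, Seconds(1))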

The batch interval has been set to 1 second. A DStream representing streaming data from a TCP source can be created using the ssc streaming context; we just need to specify the source hostname and port, as well as the desired storage level:

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

The returned lines DStream is the stream of data that is going to be received from the server. Each record is a single line of text, which we split into words using the space character as a separator:

val words = lines.flatMap(_.split(" "))

Then, we will count those words:

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

The words DStream is mapped (a one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to get the frequency of each word in each batch of data. The last command prints the first ten counts generated every second. Each RDD in a DStream contains data from a certain interval, and any operation applied on a DStream translates to operations on the underlying RDDs:

Figure 1.10
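
The way a DStream operation translates into the same operation on every underlying RDD can be sketched in plain Python. The word_count function and the sample batches are assumptions for illustration; lists stand in for RDDs and no Spark API is involved:

```python
from collections import Counter


def word_count(batch):
    """Apply flatMap(split), map to (word, 1), and reduceByKey(+) to one batch."""
    words = [word for line in batch for word in line.split(" ")]
    pairs = [(word, 1) for word in words]
    counts = Counter()
    for word, one in pairs:
        counts[word] += one
    return dict(counts)


# Two hypothetical batches of lines, one per batch interval
batches = [["to be or", "not to be"], ["to do"]]

# The "DStream operation" is the per-batch operation applied to each batch
per_batch_counts = [word_count(batch) for batch in batches]
print(per_batch_counts)
```

Counts are not carried over between batches: the word "to" is counted twice in the first batch and once in the second.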

To start the processing after all the transformations have been set up, use the following code:

ssc.start()
ssc.awaitTermination()

Before running this example, first you will need to run netcat (a small utility found in most Unix-like systems) as a data server:

nc -lk 9999

Then, in a different Terminal, you can start the example by passing the following as arguments:

localhost 9999

Any line typed into the Terminal that is running the netcat server will be counted and printed on the application screen every second.

If nc is not available on the system where you run this example, you can implement your own data server in Scala:

import java.io.PrintWriter
import java.net.ServerSocket
import java.util.Scanner

object SocketWriter {
  def main(args: Array[String]): Unit = {
    // Listen on the port that the streaming application connects to
    val listener = new ServerSocket(9999)
    // Block until the streaming application connects
    val socket = listener.accept()

    // socketTextStream reads newline-delimited text, so write whole lines
    // through an auto-flushing PrintWriter
    val out = new PrintWriter(socket.getOutputStream, true)
    println("Start writing data. Enter close when finished")
    val sc = new Scanner(System.in)

    // Read lines from standard input and forward them to the socket
    // until the user types "close"
    var str = sc.nextLine()
    while (str != "close") {
      out.println(str)
      str = sc.nextLine()
    }

    // Close the connection
    out.close()
    socket.close()
    listener.close()
  }
}

The same self-contained application in Python could be as follows:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    # Streaming context with a batch interval of 1 second
    ssc = StreamingContext(sc, 1)

    # Read lines from the socket, split them into words, and count each word per batch
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

DStreams support most of the transformations that are available for RDDs. This means that data from input DStreams can be modified in the same way as the data in RDDs. The following table lists some of the common transformations supported by Spark DStreams:

Transformation | Purpose
map(func) | Returns a new DStream obtained by applying the func function to each element of the source DStream.
flatMap(func) | The same as map, except that each input item can be mapped to 0 or more output items in the new DStream.
filter(func) | Returns a new DStream containing only the elements of the source DStream for which the func filter function returned true.
repartition(numPartitions) | Changes the level of parallelism by creating a different number of partitions.
union(otherStream) | Returns a new DStream that contains the union of the elements in the source DStream and in otherStream.
count() | Returns a new DStream of single-element RDDs, each obtained by counting the number of elements in the corresponding RDD of the source.
reduce(func) | Returns a new DStream of single-element RDDs, each obtained by aggregating the elements in the corresponding RDD of the source with the func function (which should be associative and commutative to allow for correct parallel computation).
countByValue() | Returns a new DStream of (K, Long) pairs, where K is the type of the elements of the source. The value of each key represents its frequency in each RDD of the source.
reduceByKey(func, [numTasks]) | Returns a new DStream of (K, V) pairs (for a source DStream of (K, V) pairs). The values for each key are aggregated by applying the func reduce function. To do the grouping, this transformation uses Spark's default number of parallel tasks (which is two in local mode, while it is determined by the config property spark.default.parallelism in cluster mode), but this can be changed by passing the optional numTasks argument.
join(otherStream, [numTasks]) | Returns a new DStream of (K, (V, W)) pairs when called on two DStreams of (K, V) and (K, W) pairs, respectively.
cogroup(otherStream, [numTasks]) | Returns a new DStream of (K, Seq[V], Seq[W]) tuples when called on two DStreams of (K, V) and (K, W) pairs, respectively.
transform(func) | Returns a new DStream obtained by applying an RDD-to-RDD func function to every RDD of the source.
updateStateByKey(func) | Returns a new state DStream. The state for each key in the new DStream is updated by applying the func input function to the previous state and the new values for the key.
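
Of the transformations listed, updateStateByKey is the one whose result depends on earlier batches. The following plain-Python sketch models its semantics under stated assumptions: update_state_by_key and running_count are hypothetical names, a dict stands in for the state DStream, and the update function takes the new values first and the previous state second:

```python
def update_state_by_key(state, batch, update_func):
    """Return the new state after one batch.

    state is a dict of key -> state; batch is a list of (key, value) pairs.
    Keys already in the state but absent from the batch are still updated,
    with an empty list of new values.
    """
    new_values = {}
    for key, value in batch:
        new_values.setdefault(key, []).append(value)
    updated = {}
    for key in set(state) | set(new_values):
        result = update_func(new_values.get(key, []), state.get(key))
        if result is not None:  # in this sketch, returning None drops the key
            updated[key] = result
    return updated


def running_count(values, previous):
    """Add the new values for a key to its previous count (None on first sight)."""
    return sum(values) + (previous or 0)


state = {}
for batch in [[("a", 1), ("b", 1), ("a", 1)], [("b", 1)], [("c", 1)]]:
    state = update_state_by_key(state, batch, running_count)
print(state)
```

Unlike the per-batch word count, the count for "b" accumulates across the first two batches.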

Windowed computations are provided by Spark Streaming. As shown in the following diagram, they allow you to apply transformations over sliding windows of data:

Figure 1.11

When a window slides over a source DStream, all the RDDs that fall within that window are taken into account and transformed to produce the RDDs of the returned windowed DStream. In the specific example shown in the preceding diagram, the window-based operation is applied over three time units of data and slides by two time units. Any window operation requires two parameters to be specified:

  • Window length: The duration of the window
  • Sliding interval: The interval at which the window operation is performed

These two parameters must be multiples of the batch interval of the source DStream.
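
A minimal plain-Python sketch can make the two parameters concrete. Both are expressed as a number of batches, which is why they are automatically multiples of the batch interval. The sliding_windows function is a hypothetical helper, and the alignment of the first (partial) window is an assumption of this sketch:

```python
def sliding_windows(batches, window_length, slide_interval):
    """Return one window per slide.

    A window is emitted every slide_interval batches and contains the
    records of the last window_length batches (fewer at the very start).
    """
    windows = []
    for end in range(slide_interval, len(batches) + 1, slide_interval):
        start = max(0, end - window_length)
        window = [record for batch in batches[start:end] for record in batch]
        windows.append(window)
    return windows


# Six batches, one per time unit; window of three units sliding by two
batches = [["t1"], ["t2"], ["t3"], ["t4"], ["t5"], ["t6"]]
windows = sliding_windows(batches, window_length=3, slide_interval=2)
print(windows)
```

Because the window is longer than the slide, consecutive windows overlap: "t4" appears in both the second and the third window.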

Let's see how this could be applied to the application that was presented at the beginning of this section. Suppose you want to generate a word count every 10 seconds over the last 60 seconds of data. The reduceByKey operation needs to be applied on the (word, 1) pairs of the DStream over the last 60 seconds of data. This can be achieved with the reduceByKeyAndWindow operation. In the following code, pairs denotes the DStream of (word, 1) pairs, that is, the result of words.map(x => (x, 1)) in the earlier Scala example. When translated into Scala code, this is as follows:

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(60), Seconds(10))

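For Python, it is as follows (this call uses the variant that also takes an inverse reduce function, which requires checkpointing to be enabled):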

windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 60, 10)
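
The role of the inverse function passed in the Python call (lambda x, y: x - y) can be illustrated with a plain-Python sketch that compares a full recomputation of a window with an incremental update. The helper names and sample batches are assumptions for illustration, and collections.Counter stands in for the keyed counts:

```python
from collections import Counter


def full_window_counts(batches, end, window_length):
    """Recompute the counts from every batch in the window ending at index end (exclusive)."""
    counts = Counter()
    for batch in batches[max(0, end - window_length):end]:
        counts.update(batch)
    return counts


def incremental_window_counts(previous, entering, leaving):
    """Derive the new window counts from the previous window.

    Batches entering the window are added (the reduce function, x + y);
    batches leaving it are subtracted (the inverse function, x - y).
    """
    counts = Counter(previous)
    for batch in entering:
        counts.update(batch)
    for batch in leaving:
        counts.subtract(batch)
    # Drop keys whose count fell to zero
    return Counter({key: value for key, value in counts.items() if value != 0})


# Hypothetical batches of words; window of three batches sliding by one
batches = [["a", "b"], ["a"], ["c"], ["b", "b"], ["a"]]
previous = full_window_counts(batches, end=3, window_length=3)
current = incremental_window_counts(previous, entering=batches[3:4], leaving=batches[0:1])
print(current)
```

Only the one entering batch and the one leaving batch are touched, yet the result matches a full recomputation over the new window. This works because addition has an inverse; a reduce function such as max has none.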

The following table lists some of the common window operations supported by Spark for DStreams:

Transformation | Purpose
window(windowLength, slideInterval) | Returns a new DStream that is based on windowed batches of the source.
countByWindow(windowLength, slideInterval) | Returns a sliding window count (based on the windowLength and slideInterval parameters) of the elements in the source DStream.
reduceByWindow(func, windowLength, slideInterval) | Returns a new single-element DStream. It is created by aggregating the elements in the source DStream over a sliding interval by applying the func reduce function (which should be associative and commutative to allow for correct parallel computation).
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | Returns a new DStream of (K, V) pairs (the same K and V as for the source DStream). The values for each key are aggregated using the func input function over batches (defined by the windowLength and slideInterval arguments) in a sliding window. The number of parallel tasks used for the grouping is two (the default) in local mode, while in cluster mode it is given by the Spark configuration property spark.default.parallelism. numTasks is an optional argument to specify a custom number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | This is a more efficient version of the reduceByKeyAndWindow transformation. The reduce value of the current window is calculated incrementally from the reduce value of the previous one, by reducing the new data that enters the window and inverse reducing the old data that leaves it. Note that this mechanism only works if the func reduce function has a corresponding inverse reduce function, invFunc.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | Returns a DStream of (K, Long) pairs when called on a source DStream of (K, V) pairs. The value of each key in the returned DStream is its frequency within a given sliding window (defined by the windowLength and slideInterval arguments). numTasks is an optional argument to specify a custom number of tasks.