Scala and Spark for Big Data Analytics

By : Md. Rezaul Karim, Sridhar Alla

Overview of this book

Scala has seen wide adoption over the past few years, especially in the field of data science and analytics. Spark, built on Scala, has gained a lot of recognition and is widely used in production. Thus, if you want to leverage the power of Scala and Spark to make sense of big data, this book is for you. The first part introduces you to Scala, helping you understand the object-oriented and functional programming concepts needed for Spark application development. It then moves on to Spark to cover the basic abstractions using RDD and DataFrame. This will help you develop scalable and fault-tolerant streaming applications by analyzing structured and unstructured data using SparkSQL, GraphX, and Spark structured streaming. Finally, the book moves on to some advanced topics, such as monitoring, configuration, debugging, testing, and deployment. You will also learn how to develop Spark applications using SparkR and PySpark APIs, interactive data analytics using Zeppelin, and in-memory data processing with Alluxio. By the end of this book, you will have a thorough understanding of Spark, and you will be able to perform full-stack data analytics with the feeling that no amount of data is too big.

Scala for Java programmers

Scala has a set of features that differ completely from Java's. In this section, we will discuss some of these features. This section will be helpful for those who come from a Java background or are at least familiar with basic Java syntax and semantics.

All types are objects

As mentioned earlier, every value in Scala looks like an object. This means that everything looks like an object, but some values are not actually objects under the hood; you will see the interpretation of this in the coming chapters (for example, the difference between reference types and primitive types still exists in Scala, but the language hides it for the most part). For example, in Scala, strings are implicitly converted to collections of characters, which is not the case in Java!

Type inference

If you are not familiar with the term, it is nothing but the deduction of types at compile time. Hold on, isn't that what dynamic typing means? Well, no. Notice that I said deduction of types; this is drastically different from what dynamically typed languages do, and another thing is, it is done at compile time and not runtime. Many languages have this built in, but the implementation varies from one language to another. This might be confusing at the beginning, but it will become clearer with code examples. Let's jump into the Scala REPL for some experimentation.

Scala REPL

The Scala REPL is a powerful feature that makes it more straightforward and concise to write Scala code in the Scala shell. REPL stands for Read-Eval-Print Loop, also called the interactive interpreter. This means it is a program for:

  1. Reading the expressions you type in.
  2. Evaluating the expression in step 1 using the Scala compiler.
  3. Printing out the result of the evaluation in step 2.
  4. Waiting (looping) for you to enter further expressions.

Figure 8: Scala REPL example 1

From the figure, it is evident that there is no magic: the types of the variables are inferred automatically, at compile time, to the best types the compiler deems fit. If you look even more carefully, when I tried to declare:

val i:Int = "hello"

Then, the Scala shell throws an error saying the following:

<console>:11: error: type mismatch;
found : String("hello")
required: Int
val i:Int = "hello"
^
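
As a minimal sketch of the inference described above, the following object declares a few values without type annotations and then assigns them to explicitly typed values, which only compiles because the inferred types match. The object name and the value names are illustrative assumptions, not taken from the figure.

```scala
object TypeInferenceDemo {
  // No type annotations: the compiler infers each type at compile time.
  val count = 42               // inferred as Int
  val ratio = 3.14             // inferred as Double
  val greeting = "hello"       // inferred as String
  val numbers = List(1, 2, 3)  // inferred as List[Int]

  // These explicit annotations compile only because the inferred types match.
  val checkedCount: Int = count
  val checkedRatio: Double = ratio
  val checkedGreeting: String = greeting
  val checkedNumbers: List[Int] = numbers

  // Uncommenting the next line reproduces the type mismatch error shown above:
  // val i: Int = "hello"
}
```

Because the check happens at compile time, the mismatching declaration never reaches execution; this is the key difference from dynamic typing.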

According to Odersky, "Mapping a character to the character map over a RichString should again yield a RichString, as in the following interaction with the Scala REPL". The preceding statement can be demonstrated using the following line of code:

scala> "abc" map (x => (x + 1).toChar) 
res0: String = bcd

However, what happens if someone applies a function from Char to Int to a String? In that case, the result can no longer be a String, so Scala returns a vector of integers, which is one of the immutable collections in the Scala collection library, as shown in Figure 9. We will look at the details of the Scala collection API in Chapter 4, Collections APIs.

scala> "abc" map (x => (x + 1))
res1: scala.collection.immutable.IndexedSeq[Int] = Vector(98, 99, 100)

Both static and instance methods of objects are also available. For example, if you declare x as the string hello and then try to access the static and instance methods of the object x, they are all available. In the Scala shell, type x, then . and <tab>, and you will find the available methods:

scala> val x = "hello"
x: java.lang.String = hello
scala> x.re<tab>
reduce reduceRight replaceAll reverse
reduceLeft reduceRightOption replaceAllLiterally reverseIterator
reduceLeftOption regionMatches replaceFirst reverseMap
reduceOption replace repr
scala>

Since this is all accomplished on the fly via reflection, even anonymous classes you've only just defined are equally accessible:

scala> val x = new AnyRef{def helloWord = "Hello, world!"}
x: AnyRef{def helloWord: String} = $anon$1@58065f0c
scala> x.helloWord
def helloWord: String
scala> x.helloWord
warning: there was one feature warning; re-run with -feature for details
res0: String = Hello, world!

The preceding two examples can be shown on the Scala shell, as follows:

Figure 9: Scala REPL example 2

"So it turns out that map yields different types depending on what the result type of the passed function argument is!"

- Odersky

Nested functions

Why would you need support for nested functions in your programming language? Most of the time, we want to keep our methods to a few lines and avoid overly large functions. A typical solution for this in Java would be to define all these small functions at the class level, but then any other method could easily refer to and access them, even though they are only helper methods. The situation is different in Scala: you can define functions inside each other and, this way, prevent any external access to these functions:

def sum(vector: List[Int]): Int = {
  // Nested helper method (won't be accessed from outside this function)
  def helper(acc: Int, remaining: List[Int]): Int = remaining match {
    case Nil => acc
    case _ => helper(acc + remaining.head, remaining.tail)
  }
  // Call the nested method
  helper(0, vector)
}

We do not expect you to fully understand these code snippets yet; they are only meant to show the difference between Scala and Java.

Import statements

In Java, you can only import packages at the top of your code file, right after the package statement. The situation is not the same in Scala; you can write your import statements almost anywhere inside your source file (for example, you can even write your import statements inside a class or a method). You just need to pay attention to the scope of your import statement, because it has the same scope as the members of your class or the local variables inside your method. The _ (underscore) in Scala is used for wildcard imports, which is similar to the * (asterisk) that you would use in Java:

// Import everything from the package math 
import math._

You may also use curly braces, { }, to import a set of members from the same parent package in just one line of code. In Java, you would use multiple lines of code to do so:

// Import math.sin and math.cos
import math.{sin, cos}

Unlike Java, Scala does not have the concept of static imports. In other words, the concept of static doesn't exist in Scala. However, as a developer, you can import one or more members of an object using a regular import statement. The preceding example already shows this, where we import the methods sin and cos from the package object named math. For comparison, the preceding code snippet can be written from a Java programmer's perspective as follows:

import static java.lang.Math.sin;
import static java.lang.Math.cos;

Another beauty of Scala is that you can rename imported members as well. This lets you avoid name conflicts between packages that have similarly named members. The following statement is valid in Scala:

// Import scala.collection.mutable.Map as MutableMap
import scala.collection.mutable.{Map => MutableMap}

Finally, you may want to exclude a member of a package to avoid collisions or for other purposes. You can do so by renaming that member to _ and then adding a wildcard:

// Import everything from math, but hide cos 
import math.{cos => _, _}
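
The import rules above can be sketched together in one small object. This is only an illustration: the object name ImportDemo and the helper methods hypotenuse and wordCounts are assumptions made for this example, not part of the original text.

```scala
object ImportDemo {
  // Renaming import: the mutable Map is visible here only as MutableMap,
  // so it does not clash with the default immutable Map.
  import scala.collection.mutable.{Map => MutableMap}

  def hypotenuse(a: Double, b: Double): Double = {
    // This import is scoped to the body of this method only.
    import scala.math.{pow, sqrt}
    sqrt(pow(a, 2) + pow(b, 2))
  }

  def wordCounts(words: List[String]): Map[String, Int] = {
    val counts = MutableMap.empty[String, Int]
    words.foreach(w => counts(w) = counts.getOrElse(w, 0) + 1)
    counts.toMap // convert back to the default immutable Map
  }
}
```

Outside hypotenuse, the names pow and sqrt are not in scope, which is the scoping behavior described earlier in this section.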

Operators as methods

It's worth mentioning that Scala doesn't have operator overloading as a separate language feature. In fact, you might say that there are no operators at all in Scala: what looks like an operator is an ordinary method.

An alternative syntax for calling a method that takes a single parameter is the infix syntax. The infix syntax gives you the flavor of operator overloading, just like in C++. For example:

val x = 45
val y = 75

In the following case, + is a method of the class Int. The following code uses the less conventional method-call syntax:

val add1 = x.+(y)

More conventionally, the same can be done using the infix syntax, as follows:

val add2 = x + y

Moreover, you can use the infix syntax with any method, provided that it takes only a single parameter, as follows:

val my_result = List(3, 6, 15, 34, 76) contains 5

There's one special case when using the infix syntax: if the method name ends with a : (colon), then the call is right associative. This means that the method is called on the right operand with the expression on the left as the argument, instead of the other way around. For example, given the following list:

val my_list = List(3, 6, 15, 34, 76)

The expression 5 +: my_list means my_list.+:(5) rather than 5.+:(my_list). More formally:

val my_result = 5 +: my_list

Now, let's look at the preceding examples in the Scala REPL:

scala> val my_list = 5 +: List(3, 6, 15, 34, 76)
my_list: List[Int] = List(5, 3, 6, 15, 34, 76)
scala> val my_result2 = 5+:my_list
my_result2: List[Int] = List(5, 5, 3, 6, 15, 34, 76)
scala> println(my_result2)
List(5, 5, 3, 6, 15, 34, 76)
scala>

In addition to the above, operators here are just methods, so they can be defined and overridden just like any other method.
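
To make the idea concrete, here is a minimal sketch of a user-defined class with symbolic method names. Vec2 and OperatorDemo are hypothetical names invented for this illustration; only the infix and right-associativity rules come from the text above.

```scala
// Vec2 is a hypothetical class used only for illustration.
case class Vec2(x: Int, y: Int) {
  // An "operator" is just a method whose name is a symbol.
  def +(other: Vec2): Vec2 = Vec2(x + other.x, y + other.y)

  // The name ends with a colon, so infix calls are right associative:
  // `3 *: v` is evaluated as `v.*:(3)`.
  def *:(factor: Int): Vec2 = Vec2(x * factor, y * factor)
}

object OperatorDemo {
  val a = Vec2(1, 2)
  val b = Vec2(3, 4)

  val sumInfix = a + b  // infix syntax
  val sumDot = a.+(b)   // the same call in regular method-call syntax
  val scaled = 3 *: a   // called on a, with 3 as the argument
}
```

Both sumInfix and sumDot evaluate to Vec2(4, 6), and scaled evaluates to Vec2(3, 6).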

Methods and parameter lists

In Scala, a method can have multiple parameter lists or even no parameter list at all. On the other hand, in Java, a method always has one parameter list, with zero or more parameters. For example, in Scala, the following is a valid method definition (written in curried notation), where the method has two parameter lists:

def sum(x: Int)(y: Int) = x + y

Note that this is not the same as the following definition, which has a single parameter list with two parameters:

def sum(x: Int, y: Int) = x + y

A method, let's say sum2, can have no parameter list at all, as follows:

def sum2 = sum(2) _

Now, you can call the method sum2, which returns a function taking one parameter. That function is then called with the argument 5, as follows:

val result = sum2(5)
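
The following sketch puts the pieces of this section side by side, assuming they are wrapped in an object (hypothetically named CurryDemo) so that they compile as one unit. The explicit result types are added here for readability and are not required.

```scala
object CurryDemo {
  // Two parameter lists (curried notation).
  def sum(x: Int)(y: Int): Int = x + y

  // No parameter list at all: fixing the first argument of sum
  // yields a function from Int to Int.
  def sum2: Int => Int = sum(2) _

  val direct = sum(2)(5)  // supply both parameter lists at once
  val viaSum2 = sum2(5)   // supply the second argument later
}
```

Both direct and viaSum2 evaluate to 7; the only difference is when the second argument is supplied.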

Methods inside methods

Sometimes, you would like to make your application code modular by avoiding methods that are too long and complex. Scala gives you the facility to split overly large methods into several smaller ones.

Java, on the other hand, only allows you to define methods at the class level. For example, suppose you start with the following method definition:

def main_method(xs: List[Int]): Int = {
  // This is the nested helper/auxiliary method
  def auxiliary_method(accu: Int, rest: List[Int]): Int = rest match {
    case Nil => accu
    case _ => auxiliary_method(accu + rest.head, rest.tail)
  }
  // The call to auxiliary_method is still missing at this point
}

Now, you can call the nested helper/auxiliary method from within main_method as follows:

auxiliary_method(0, xs)

Putting this together, here's the complete, valid code segment:

def main_method(xs: List[Int]): Int = {
  // This is the nested helper/auxiliary method
  def auxiliary_method(accu: Int, rest: List[Int]): Int = rest match {
    case Nil => accu
    case _ => auxiliary_method(accu + rest.head, rest.tail)
  }
  auxiliary_method(0, xs)
}

Constructor in Scala

One surprising thing about Scala is that the body of a Scala class is itself a constructor: the statements in the class body are executed whenever a new instance of that class is created. Moreover, you can specify the arguments of the constructor in the class declaration line.

Consequently, the constructor arguments are accessible from all of the methods defined in that class. For example, the following class and constructor definition is valid in Scala:

class Hello(name: String) {
  // Statement executed as part of the constructor
  println("New instance with name: " + name)
  // Method which accesses the constructor argument
  def sayHello = println("Hello, " + name + "!")
}

The equivalent Java class would look like this:

public class Hello {
    private final String name;
    public Hello(String name) {
        System.out.println("New instance with name: " + name);
        this.name = name;
    }
    public void sayHello() {
        System.out.println("Hello, " + name + "!");
    }
}

Objects instead of static methods

As mentioned earlier, static does not exist in Scala. You cannot do static imports, and neither can you add static methods to classes. In Scala, when you define an object with the same name as a class and in the same source file, the object is said to be the companion of that class. Functions that you define in the companion object of a class are like static methods of a class in Java:

class HelloCity(CityName: String) {
  def sayHelloToCity = println("Hello, " + CityName + "!")
}

This is how you can define a companion object for the class HelloCity:

object HelloCity {
  // Factory method
  def apply(CityName: String) = new HelloCity(CityName)
}

The equivalent class in Java would look like this:

public class HelloCity {
    private final String CityName;
    public HelloCity(String CityName) {
        this.CityName = CityName;
    }
    public void sayHelloToCity() {
        System.out.println("Hello, " + CityName + "!");
    }
    public static HelloCity apply(String CityName) {
        return new HelloCity(CityName);
    }
}

So, there is a lot of verbosity in this simple class, isn't there? The apply method in Scala is treated in a different way, such that there is a special shortcut syntax to call it. This is the familiar way of calling the method:

val hello1 = HelloCity.apply("Dublin")

Here's the shortcut syntax that is equivalent to the one earlier:

val hello2 = HelloCity("Dublin")

Note that this only works if you name the method apply, because Scala treats methods named apply in this special way.
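
As a self-contained sketch of the companion-object pattern, the following uses a hypothetical City class (a name chosen for this illustration only) with a factory method in its companion. The class and its companion must live in the same source file; in the REPL, that means entering them together, for example in paste mode.

```scala
// City and CompanionDemo are hypothetical names used only for illustration.
class City(val name: String) {
  def greeting: String = "Hello, " + name + "!"
}

// Companion object: same name as the class, defined in the same source file.
object City {
  // Factory method, playing the role of a static method in Java.
  def apply(name: String): City = new City(name)
}

object CompanionDemo {
  val explicit = City.apply("Dublin") // the familiar method call
  val shortcut = City("Dublin")       // shortcut syntax for the same call
}
```

Both values are built through the same factory method, so the shortcut is purely a matter of syntax.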

Traits

Scala provides a great piece of functionality for extending and enriching your classes' behavior: traits. Traits are similar to interfaces, in which you define function prototypes or signatures. With them, you can mix in functionality coming from different traits and, in this way, enrich your classes' behavior. So, what's so good about traits in Scala? They enable the composition of classes from these traits, with traits being the building blocks.

Note that, even though you can mix in any number of traits you want, Scala, like Java, does not support multiple inheritance of classes: in both Java and Scala, a subclass can only extend a single superclass. As always, let's look at an example. This is how a conventional logging routine is set up in Java:

class SomeClass {
    // First, to log from a class, you must initialize a logger for it
    final static Logger log = LoggerFactory.getLogger(SomeClass.class);
    ...
    // For logging to be efficient, you must always check whether the log level of the current message is enabled
    // BAD, you will waste execution time if the log level is error, fatal, etc.
    log.debug("Some debug message");
    ...
    // GOOD, it saves execution time for something more useful
    if (log.isDebugEnabled()) { log.debug("Some debug message"); }
    // BUT it looks clunky, and it's tiresome to write this construct every time you want to log something.
}

For a more detailed discussion, refer to this URL https://stackoverflow.com/questions/963492/in-log4j-does-checking-isdebugenabled-before-logging-improve-performance/963681#963681.

However, it's different with traits. It's very tiresome to always check whether the log level is enabled. It would be good if you could write this routine once and reuse it anywhere, in any class, right away. Traits in Scala make all of this possible. For example:

trait Logging {
  lazy val log = LoggerFactory.getLogger(this.getClass.getName)
  // Let's start with info level...
  ...
  // Debug level here...
  // msg is passed by name, so it is only evaluated if debug is enabled
  def debug(msg: => Any): Unit = {
    if (log.isDebugEnabled) log.debug(s"${msg}")
  }
  def debug(msg: => Any, throwable: => Throwable): Unit = {
    if (log.isDebugEnabled) log.debug(s"${msg}", throwable)
  }
  ...
  // Repeat it for all log levels you want to use
}

If you look at the preceding code, you will see an example of a string literal prefixed with s. This is string interpolation, the mechanism that Scala offers to create strings from your data.

String interpolation allows you to embed variable references directly in processed string literals. For example:

scala> val name = "John Breslin"
scala> println(s"Hello, $name") // Hello, John Breslin

Now, we can get an efficient logging routine in a more conventional style as a reusable block. To enable logging for any class, we just mix in our Logging trait! Fantastic! Now that's all it takes to add a logging feature to your class:

class SomeClass extends Logging {
  ...
  // With the Logging trait, there is no need to declare a logger manually for every class
  // And now, your logging routine is both efficient and doesn't litter the code!

  debug("Some debug message")
  ...
}
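
The efficiency of this pattern rests on the by-name parameter (msg: => Any), which is evaluated only if it is actually used. The following is a minimal sketch of that behavior that avoids any logging library: SimpleLogging, debugEnabled, and the evaluation counter are all assumptions made for this illustration, standing in for the real logger and its isDebugEnabled check.

```scala
// SimpleLogging is a hypothetical stand-in that prints to standard output
// instead of delegating to a real logging library.
trait SimpleLogging {
  // Assumed flag, standing in for log.isDebugEnabled.
  def debugEnabled: Boolean = false

  // msg is a by-name parameter: it is evaluated only if it is used.
  def debug(msg: => Any): Unit =
    if (debugEnabled) println(s"DEBUG ${msg}")
}

object ByNameDemo {
  // Counts how often the (pretend) expensive message is actually built.
  var evaluations = 0
  def expensiveMessage(): String = { evaluations += 1; "details" }

  object Quiet extends SimpleLogging
  object Verbose extends SimpleLogging {
    override def debugEnabled: Boolean = true
  }
}
```

Calling ByNameDemo.Quiet.debug(ByNameDemo.expensiveMessage()) leaves the counter untouched, because the message is never built, while the same call on Verbose builds and prints it. This is why the caller no longer needs to write the level check by hand.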

It is even possible to mix in multiple traits. For example, for the preceding trait (that is, Logging), you can keep extending in the following order:

trait Logging {
  override def toString = "Logging "
}
class A extends Logging {
  override def toString = "A->" + super.toString
}
trait B extends Logging {
  override def toString = "B->" + super.toString
}
trait C extends Logging {
  override def toString = "C->" + super.toString
}
class D extends A with B with C {
  override def toString = "D->" + super.toString
}

However, note that a Scala class can mix in multiple traits at once, but JVM classes can extend only one parent class.

Now, to invoke the preceding traits and classes, use new D() from the Scala REPL, as shown in the following figure:

Figure 10: Mixing multiple traits
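
As a rough sketch of what new D() produces, the definitions above can be wrapped in an object (hypothetically named MixinDemo here, so that they compile as one unit) and the resulting string inspected:

```scala
object MixinDemo {
  trait Logging {
    override def toString = "Logging "
  }
  class A extends Logging {
    override def toString = "A->" + super.toString
  }
  trait B extends Logging {
    override def toString = "B->" + super.toString
  }
  trait C extends Logging {
    override def toString = "C->" + super.toString
  }
  class D extends A with B with C {
    override def toString = "D->" + super.toString
  }

  // Each super call moves one step back through the mix-in order.
  val result: String = new D().toString // "D->C->B->A->Logging "
}
```

The chain reads from right to left through the with clauses: super in D refers to C, the last trait mixed in, then B, then A, and finally Logging.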

Everything has gone smoothly so far in this chapter. Now, let's move to a new section where we will discuss some topics for the beginner who wants to dive into the realm of Scala programming.