Apache Spark Graph Processing

Getting started with GraphX


Now that we have installed Spark and experimented with the Spark shell, let's create our first graph in Spark by writing our code in the shell, and then building upon that code to develop and run a standalone program. We have three learning goals in this section:

  1. First, you will learn how to construct and explore graphs using the Spark Core and GraphX API through a concrete example.

  2. Second, you will review some Scala programming features that are important to know when doing graph processing in Spark.

  3. Third, you will learn how to develop and run a standalone Spark application.

Building a tiny social network

Let's create a tiny social network and explore the relationships among the different people in the network. Again, the best way to learn Spark is inside the shell. Our workflow is therefore to first experiment in the shell and then migrate our code later into a standalone Spark application. Before launching the shell, make sure to change the current directory to $SPARKHOME.

First, we need to import the GraphX and RDD modules, as shown, so that we can invoke their APIs with their shorter names:

scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD

As said previously, SparkContext is the main point of entry into a Spark program and it is created automatically in the Spark shell. It also offers useful methods to create RDDs from local collections, to load data from a local or Hadoop file system into RDDs, and to save output data on disks.

Loading the data

In this example, we will work with two CSV files people.csv and links.csv, which are contained in the directory $SPARKHOME/data/. Let's type the following commands to load these files into Spark:

scala> val people = sc.textFile("./data/people.csv")
people: org.apache.spark.rdd.RDD[String] = ./data/people.csv MappedRDD[81] at textFile at <console>:33

scala> val links = sc.textFile("./data/links.csv")
links: org.apache.spark.rdd.RDD[String] = ./data/links.csv MappedRDD[83] at textFile at <console>:33

Loading the CSV files just gave us back two RDDs of strings. To create our graph, we need to parse these strings into two suitable collections of vertices and edges.

Note

It is important that your current directory inside the shell is $SPARKHOME. Otherwise, you get an error later because Spark cannot find the files.
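To make the parsing step that follows easier to picture, here is a small plain-Scala sketch of what the rows might look like. The sample rows are an assumption: they are inferred from the parsing code and the output shown later in this section (an ID, a name, and an age for people; a source ID, a destination ID, and a relationship for links), and the real files may differ in detail.

```scala
// Hypothetical sample rows, inferred from the output shown later in this section.
// They are assumed to have no header line and no spaces around the commas.
val samplePeople = Seq("1,Alice,20", "2,Bob,18", "3,Charlie,30")
val sampleLinks = Seq("1,2,friend", "1,3,sister", "3,2,boss")

// Splitting a row on commas yields its fields as an array of strings
val personFields: Array[String] = samplePeople.head.split(',')
val linkFields: Array[String] = sampleLinks.last.split(',')
```

In the shell, people.take(3).foreach(println) prints the first three lines of the real file, which is a quick way to check the actual layout before parsing.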

The property graph

Before going further, let's introduce some key definitions and graph abstractions. In Spark, a graph is represented by a property graph, which is defined in the Graph class as:

class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED,VD]
}

This means that the Graph class provides getters to access its vertices and its edges. These are represented by the RDD subclasses VertexRDD[VD] and EdgeRDD[ED, VD] (more recent Spark releases drop the second type parameter and write EdgeRDD[ED]). Note that VD and ED here denote the Scala type parameters of the classes VertexRDD, EdgeRDD, and Graph. These type parameters can be built-in types, such as String, or user-defined classes, such as the Person class in our example of a social graph.

It is important to note that the property graph in Spark is a directed multigraph. It means that the graph is permitted to have multiple edges between any pair of vertices. Moreover, each edge is directed and defines a unidirectional relationship. This is easy to grasp, for instance, in a Twitter graph where a user can follow another one but the converse does not need to be true. To model bidirectional links, such as a Facebook friendship, we need to define two edges between the nodes, and these edges should point in opposite directions. Additional properties about the relationship can be stored as an attribute of the edge.

Note

A property graph is a graph with user-defined objects attached to each vertex and edge. The classes of these objects describe the properties of the graph. This is done in practice by parameterizing the classes Graph, VertexRDD, and EdgeRDD. Moreover, each edge of the graph defines a unidirectional relationship, but multiple edges can exist between any pair of vertices.
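To make the idea of a directed multigraph concrete, here is a minimal sketch in plain Scala. The Link class and the isMutual helper are hypothetical stand-ins defined only for this illustration; they are not part of GraphX, whose own Edge class is introduced in the next section.

```scala
// Stand-in for a GraphX edge, defined here so that the sketch runs without Spark
case class Link(srcId: Long, dstId: Long, attr: String)

val alice = 1L
val bob = 2L

val links = Seq(
  Link(alice, bob, "friend"),   // a mutual friendship needs one edge per direction
  Link(bob, alice, "friend"),
  Link(alice, bob, "coworker")  // a second edge between the same pair is allowed
)

// All edges that leave Alice and arrive at Bob
val aliceToBob = links.filter(l => l.srcId == alice && l.dstId == bob)

// A relationship is mutual only if the reverse edge is also present
def isMutual(l: Link): Boolean =
  links.contains(Link(l.dstId, l.srcId, l.attr))
```

Here aliceToBob holds two edges, and only the "friend" relationship is mutual because the "coworker" edge has no counterpart in the opposite direction.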

Transforming RDDs to VertexRDD and EdgeRDD

Going back to our example, let's construct the graph in three steps, as follows:

  1. We define a case class Person, which has name and age as class parameters. Case classes are very useful when we need to do pattern matching on an object Person later on:

    case class Person(name: String, age: Int)
  2. Next, we are going to parse each line of the CSV texts inside people and links into new objects of type Person and Edge respectively, and collect the results in RDD[(VertexId, Person)] and RDD[Edge[String]]:

    scala> val peopleRDD: RDD[(VertexId, Person)] = people map { line => 
      val row = line split ','
      (row(0).toLong, Person(row(1), row(2).toInt))
    }
    scala> type Connection = String
    scala> val linksRDD: RDD[Edge[Connection]] = links map { line => 
      val row = line split ','
      Edge(row(0).toLong, row(1).toLong, row(2))
    }
    

    Note

    To paste or write code in multiple lines in the shell:

    • Type the command :paste

    • Paste or write the given code

    • Evaluate the code by pressing Ctrl + D

    VertexId is simply a type alias for Long as defined in GraphX. In addition, the Edge class is defined in org.apache.spark.graphx.Edge as:

    case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

    The class parameters srcId and dstId are the vertex IDs of the source and destination, which are linked by the edge. In our social network example, the link between two people is unidirectional and its property is described in the attr of type Connection. Note that we defined Connection as a type alias for String. For clarity, it often helps to give a meaningful name to the type parameter of Edge.

  3. Now, we can create our social graph and name it tinySocial using the factory method Graph(…):

    scala> val tinySocial: Graph[Person, Connection] = Graph(peopleRDD, linksRDD)
    tinySocial: org.apache.spark.graphx.Graph[Person,Connection] = org.apache.spark.graphx.impl.GraphImpl@128cd92a
    

There are two things to note about this constructor. We said earlier that the vertices and edges members of the graph are instances of VertexRDD[VD] and EdgeRDD[ED,VD]. However, we passed RDD[(VertexId, Person)] and RDD[Edge[Connection]] into the factory method Graph. How did that work? The factory method accepts plain RDDs of type RDD[(VertexId, VD)] and RDD[Edge[ED]] and builds the graph's VertexRDD[VD] and EdgeRDD[ED,VD] from them. These two classes are subclasses of RDD[(VertexId, VD)] and RDD[Edge[ED]] respectively, so they can still be used wherever an ordinary RDD is expected. In addition, VertexRDD[VD] adds the constraint that each VertexId occurs only once. Basically, two people in our social network cannot have the same vertex ID. Furthermore, VertexRDD[VD] and EdgeRDD[ED,VD] provide several other operations to transform vertex and edge attributes. We will see more of these in later chapters.
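The parsing in step 2 assumes that every CSV row is well formed; a single bad row would make the job fail with an exception. As a hedged alternative, the following plain-Scala sketch shows how pattern matching can be used to skip malformed rows. The parsePerson function is a hypothetical helper introduced for this illustration, not something provided by Spark or GraphX.

```scala
import scala.util.Try

case class Person(name: String, age: Int)

// Returns None for a malformed row instead of throwing an exception
def parsePerson(line: String): Option[(Long, Person)] =
  line.split(',').map(_.trim) match {
    // Exactly three fields: try to convert the ID and the age to numbers
    case Array(id, name, age) =>
      Try((id.toLong, Person(name, age.toInt))).toOption
    // Any other number of fields is treated as a bad row
    case _ => None
  }

val good = parsePerson("1,Alice,20")
val tooShort = parsePerson("oops")
val badId = parsePerson("x,Bob,18")
```

In the shell, people.flatMap(line => parsePerson(line)) would then produce an RDD that contains only the rows that parsed successfully.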

Introducing graph operations

Finally, we are going to look at the vertices and edges in the network by accessing and collecting them:

scala> tinySocial.vertices.collect()
res: Array[(org.apache.spark.graphx.VertexId, Person)] = Array((4,Person(Dave,25)), (6,Person(Faith,21)), (8,Person(Harvey,47)), (2,Person(Bob,18)), (1,Person(Alice,20)), (3,Person(Charlie,30)), (7,Person(George,34)), (9,Person(Ivy,21)), (5,Person(Eve,30)))
scala> tinySocial.edges.collect()
res: Array[org.apache.spark.graphx.Edge[Connection]] = Array(Edge(1,2,friend), Edge(1,3,sister), Edge(2,4,brother), Edge(3,2,boss), Edge(4,5,client), Edge(1,9,friend), Edge(6,7,cousin), Edge(7,9,coworker), Edge(8,9,father))

We used the edges and vertices getters in the Graph class and used the RDD action collect to put the result into a local collection.
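Besides collect, a few other basic operations can be tried in the same way. The following sketch is meant to be pasted into the Spark shell with :paste, so it assumes that sc is the SparkContext that the shell provides. The three people and two links are a hand-typed subset of the data shown above, and the names someVertices, someEdges, and miniGraph are made up for this example.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

case class Person(name: String, age: Int)

// `sc` is assumed to be the SparkContext created by the Spark shell
val someVertices: RDD[(VertexId, Person)] = sc.parallelize(Seq(
  (1L, Person("Alice", 20)), (2L, Person("Bob", 18)), (3L, Person("Charlie", 30))))
val someEdges: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "friend"), Edge(3L, 2L, "boss")))
val miniGraph: Graph[Person, String] = Graph(someVertices, someEdges)

// Counting returns a single number to the driver
val vertexCount: Long = miniGraph.numVertices  // 3
val edgeCount: Long = miniGraph.numEdges       // 2

// The vertex view is an RDD, so ordinary RDD transformations apply to it
val adults: Array[String] = miniGraph.vertices
  .filter { case (_, person) => person.age >= 20 }
  .map { case (_, person) => person.name }
  .collect()
```

The adults array contains Alice and Charlie; as with the collect output above, the order of the elements is not guaranteed.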

Now, suppose we want to print only the professional connections that are listed in the following profLinks list:

val profLinks: List[Connection] = List("coworker", "boss", "employee", "client", "supplier")

A bad way to arrive at the desired result is to filter the edges corresponding to professional connections, then loop through the filtered edges, extract the corresponding vertices' names, and print the connections between the source and destination vertices. We can write that method in the following code:

val profNetwork = 
  tinySocial.edges.filter { case Edge(_, _, link) => profLinks.contains(link) }
for {
  Edge(src, dst, link) <- profNetwork.collect() 
  srcName = peopleRDD.filter { case (id, _) => id == src }.first()._2.name
  dstName = peopleRDD.filter { case (id, _) => id == dst }.first()._2.name
} println(srcName + " is a " + link + " of " + dstName)

Charlie is a boss of Bob
Dave is a client of Eve
George is a coworker of Ivy

There are two problems with the preceding code. First, it could be more concise and expressive. Second, it is not efficient due to the filtering operations inside the for loop.

Luckily, there is a better alternative. The GraphX library provides two different ways to view data: either as a graph or as tables of edges, vertices, and triplets. For each view, the library offers a rich set of operations whose implementations are optimized for execution. That means that we can often process a graph simply by using a predefined graph operation or algorithm. For instance, we could simplify the previous code and make it more efficient, as follows:

tinySocial.subgraph(profLinks contains _.attr).
     triplets.foreach(t => println(t.srcAttr.name + " is a " + t.attr + " of " + t.dstAttr.name))
  Charlie is a boss of Bob
  Dave is a client of Eve
  George is a coworker of Ivy

We simply used the subgraph operation to keep only the professional links. Then, we used the triplet view to access the attributes of the edges and vertices simultaneously. In brief, the triplets operator returns an RDD of EdgeTriplet[Person, Connection]. An EdgeTriplet extends Edge with the members srcAttr and dstAttr, so it conceptually corresponds to the 3-tuple ((VertexId, Person), (VertexId, Person), Connection) that contains all the information about the source node, the destination node, and the edge property.
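The idea behind the triplet view can be modeled in plain Scala: attach the source and destination attributes to each edge once, and then work on the joined records instead of searching the vertex collection for every edge. In this sketch, Link and Triplet are hypothetical stand-ins for GraphX's Edge and EdgeTriplet, and the data is a small hand-typed subset of the social network.

```scala
case class Person(name: String, age: Int)
case class Link(srcId: Long, dstId: Long, attr: String)     // stand-in for Edge[String]
case class Triplet(src: Person, dst: Person, attr: String)  // stand-in for EdgeTriplet

val peopleById = Map(
  2L -> Person("Bob", 18), 3L -> Person("Charlie", 30),
  4L -> Person("Dave", 25), 5L -> Person("Eve", 30))
val links = Seq(Link(3L, 2L, "boss"), Link(4L, 5L, "client"), Link(2L, 4L, "brother"))
val profLinks = Set("coworker", "boss", "employee", "client", "supplier")

// Join each edge with its two endpoints using one map lookup per endpoint
val triplets = links.map(l => Triplet(peopleById(l.srcId), peopleById(l.dstId), l.attr))

// Keep the professional links and describe them
val sentences = triplets
  .filter(t => profLinks(t.attr))
  .map(t => s"${t.src.name} is a ${t.attr} of ${t.dst.name}")
```

GraphX performs the corresponding join in a distributed way, which is why the triplets version avoids the repeated filter calls of the first attempt.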

Building and submitting a standalone application

Let's conclude this chapter by developing and running a standalone Spark application for our social network example.

Writing and configuring a Spark program

Satisfied with our experiment in the shell, let's now write our first Spark program. Open your favorite text editor and create a new file named simpleGraph.scala and put it in the folder $SPARKHOME/exercises/chap1. A template for a Spark program looks like the following code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
object SimpleGraphApp {
  def main(args: Array[String]): Unit = {

    // Configure the program 
    val conf = new SparkConf()
          .setAppName("Tiny Social")
          .setMaster("local")
          .set("spark.driver.memory", "2G")
    val sc = new SparkContext(conf)

    // Load some data into RDDs
    val people = sc.textFile("./data/people.csv")
    val links = sc.textFile("./data/links.csv")
 
    // After that, we use the Spark API as in the shell
    // ...
  }
}

You can also see the entire code of our simpleGraph.scala file in the example files, which you can download from the Packt website.

Tip

Downloading the example code

You can download the example code files from your account at http://www.packtpub.com for all the Packt Publishing books you have purchased. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

Let's go over this code to understand what is required to create and configure a Spark standalone program in Scala.

As a Scala program, our Spark application should be constructed within a top-level Scala object, which must have a main function that has the signature: def main(args: Array[String]): Unit. In other words, the main program accepts an array of strings as a parameter and returns nothing. In our example, the top-level object is SimpleGraphApp.

At the beginning of simpleGraph.scala, we have put the following import statements:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

The first two lines import the SparkContext class as well as some implicit conversions defined in its companion object. It is not very important to know what the implicit conversions are. Just make sure you import both SparkContext and SparkContext._

Note

When we worked in the Spark shell, SparkContext and SparkContext._ were imported automatically for us.

The third line imports SparkConf, which is a wrapper class that contains the configuration settings of a Spark application, such as its application name, the memory size of each executor, and the address of the master or cluster manager.

Next, we have imported some RDD and GraphX-specific class constructors and operators with these lines:

import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

The underscore after org.apache.spark.graphx makes sure that all public APIs in GraphX get imported.

Within main, we first had to configure the Spark program. To do this, we created a SparkConf object and set the application settings through a chain of setter methods on it. SparkConf provides specific setters for some common properties, such as the application name or master. Alternatively, the generic set method sets any property from a key-value pair, and setAll sets multiple properties together from a sequence of key-value pairs. The most common configuration parameters are listed in the following table with their default values and usage. The extensive list can be found at https://spark.apache.org/docs/latest/configuration.html:

Spark property name | Usage and default value

spark.app.name | This is the name of your application. It appears in the UI and in the log data.

spark.master | This is the cluster manager to connect to, for example, spark://host:port, mesos://host:port, yarn, or local.

spark.executor.memory | This is the amount of memory to use per executor process, in the same format as JVM memory strings (for example, 512m, 2g). The default value is 1g.

spark.driver.memory | When you run Spark locally with spark.master=local, the executor runs inside the driver process, so you need to set this parameter instead of spark.executor.memory. The default value is 512m.

spark.storage.memoryFraction | This is the fraction of Java heap to use for Spark's memory cache. The default is 0.6.

spark.serializer | This is the class used to serialize objects to be sent over the network or to be cached in serialized form. The default is org.apache.spark.serializer.JavaSerializer.

In our example, we initialized the program as follows:

val conf = new SparkConf()
      .setAppName("Tiny Social")
      .setMaster("local")
      .set("spark.driver.memory", "2G")
val sc = new SparkContext(conf)

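Specifically, we set the name of our application to "Tiny Social" and the master to the local machine on which we submit the application. In addition, the driver memory is set to 2 GB. Had we set the master to a cluster instead of local, we could specify the memory per executor by setting spark.executor.memory instead of spark.driver.memory. Note that when the application is launched in client mode, the driver JVM has already started by the time this SparkConf is read, so the driver memory is more reliably set with the --driver-memory option of spark-submit.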
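As a small illustration of the different setters, the following sketch builds a configuration object and reads some values back. It needs only the spark-core library on the classpath and does not start a SparkContext. The property values, such as the Kryo serializer and the 1g executor memory, are arbitrary choices for the example rather than recommendations, and the name sketchConf is made up.

```scala
import org.apache.spark.SparkConf

// Passing false skips loading spark.* JVM system properties,
// so the object holds only what is set here
val sketchConf = new SparkConf(false)
  .setAppName("Tiny Social")           // dedicated setter for spark.app.name
  .setMaster("local")                  // dedicated setter for spark.master
  .set("spark.executor.memory", "1g")  // generic setter: one key-value pair
  .setAll(Seq(                         // several key-value pairs at once
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"))

// Reading values back by their property names
val appName: String = sketchConf.get("spark.app.name")
val master: String = sketchConf.get("spark.master")
```

Calling sketchConf.toDebugString returns all the configured key-value pairs as text, which can help when checking what an application was actually started with.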

Note

In principle, the driver and executor have different roles and, in general, they run in different processes except when we set the master to be local. The driver is the process that compiles our program into tasks, schedules these tasks to one or more executors, and maintains the physical location of every RDD. Each executor is responsible for executing the tasks, and storing and caching RDDs in memory.

It is not mandatory to set the Spark application settings in the SparkConf object inside your program. Alternatively, when submitting our application, we could set these parameters as command-line options of the spark-submit tool. We will cover that part in detail in the following sections. In this case, we will just create our SparkContext object as:

val sc = new SparkContext(new SparkConf())

After configuring the program, the next task is to load the data that we want to process by calling utility methods such as sc.textFile on the SparkContext object sc:

val people = sc.textFile("./data/people.csv")
val links = sc.textFile("./data/links.csv")

Finally, the rest of the program consists of the same operations on RDDs and graphs that we have used in the shell.

Note

To avoid confusion when passing a relative file path to I/O actions such as sc.textFile(), the convention used in this book is that the current directory of the command line is always set to the project root folder. For instance, if our Tiny Social app's root folder is $SPARKHOME/exercises/chap1, then Spark will look for the data to be loaded in $SPARKHOME/exercises/chap1/data. This assumes that we have put the files in that data folder.
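When a file cannot be found, it often helps to check which absolute path a relative path resolves to. The following plain-Scala sketch resolves a path against the current working directory of the JVM; resolveFromCwd is a hypothetical helper written for this illustration. It assumes a local run with the default local file system, where Spark is expected to resolve relative paths in the same way.

```scala
import java.nio.file.{Path, Paths}

// Resolve a relative path against the JVM's current working directory
// and remove redundant elements such as "./"
def resolveFromCwd(relative: String): Path =
  Paths.get(relative).toAbsolutePath.normalize

val resolved: Path = resolveFromCwd("./data/people.csv")
```

Printing resolved from the project root folder should show a path that ends in exercises/chap1/data/people.csv if the layout described in this note is followed.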

Building the program with the Scala Build Tool

After writing our entire program, we are going to build it using the Scala Build Tool (SBT). If you do not have SBT installed on your computer yet, you need to install it first. Detailed instructions on how to install SBT are available at http://www.scala-sbt.org/0.13/tutorial/index.html for most operating systems. While there are different ways to install SBT, I recommend using a package manager instead of the manual installation. After the installation, execute the following command to append the SBT installation folder to the PATH environment variable:

$ export PATH=$PATH:/usr/local/bin/sbt

Once we have SBT properly installed, we can use it to build our application inside a single JAR package file. When a JAR also contains all of the application's dependencies, it is called an uber jar. Packaging matters because, when running a Spark application on several worker machines, an error will occur if some machines do not have the right dependency JAR.

By packaging an uber jar, the application code and its dependencies are all distributed to the workers. Note that the plain sbt package command used below bundles only the application's own classes, which is sufficient here because spark-submit supplies the Spark libraries; bundling third-party dependencies into a true uber jar requires a plugin such as sbt-assembly. Concretely, we need to create a build definition file in which we set the project settings. Moreover, we must specify the dependencies and the resolvers that help SBT find the packages that are needed by our program. A resolver indicates the name and location of the repository that has the required JAR file. Let's create a file called build.sbt in the project root folder $SPARKHOME/exercises/chap1 and insert the following lines:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-graphx" % "1.4.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

By convention, the settings are defined by Scala expressions and they need to be delimited by blank lines. In this file, we set the project name, its version number, the version of Scala, as well as the Spark library dependencies. To build the program, we then enter the command:

$ sbt package

This will create the JAR file $SPARKHOME/exercises/chap1/target/scala-2.10/simple-project_2.10-1.0.jar.
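The name of the JAR file follows from the settings in build.sbt. The following plain-Scala sketch approximates how the file name is derived from the project name, the Scala version, and the project version. The jarName function is a hypothetical helper that mimics the naming pattern seen above; it is not SBT's actual implementation.

```scala
// Approximates the artifact naming pattern: <normalized name>_<Scala binary version>-<version>.jar
def jarName(name: String, scalaVersion: String, version: String): String = {
  // "Simple Project" becomes "simple-project"
  val normalized = name.toLowerCase.replaceAll("\\W+", "-")
  // "2.10.4" becomes "2.10": only the first two components are kept
  val binaryVersion = scalaVersion.split('.').take(2).mkString(".")
  s"${normalized}_$binaryVersion-$version.jar"
}

val expectedJar = jarName("Simple Project", "2.10.4", "1.0")
```

With the settings used in this chapter, expectedJar is simple-project_2.10-1.0.jar, which matches the file created under target/scala-2.10.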

Deploying and running with spark-submit

Finally, we are going to invoke the spark-submit script in $SPARKHOME/bin/ to run the program from the root directory $SPARKHOME/exercises/chap1 in the terminal:

$ ../../bin/spark-submit --class "SimpleGraphApp" \
./target/scala-2.10/simple-project_2.10-1.0.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Charlie is a boss of Bob
Dave is a client of Eve
George is a coworker of Ivy

The required options for the spark-submit command are the Scala application object name and the JAR file that we previously built with SBT. If we had not set the master when creating the SparkConf object, we would also have to specify the --master option in spark-submit.

Tip

You can list all the available options for the spark-submit script with the command:

spark-submit --help

More details about how to submit a Spark application to a remote cluster are available at http://spark.apache.org/docs/latest/submitting-applications.html.