Now that we have installed Spark and experimented with the Spark shell, let's create our first graph in Spark by writing our code in the shell, and then building upon that code to develop and run a standalone program. We have three learning goals in this section:
First, you will learn how to construct and explore graphs using the Spark Core and GraphX API through a concrete example.
Second, you will review some Scala programming features that are particularly important to know when doing graph processing in Spark.
Third, you will learn how to develop and run a standalone Spark application.
Let's create a tiny social network and explore the relationships among the different people in the network. Again, the best way to learn Spark is inside the shell. Our workflow is therefore to first experiment in the shell and then migrate our code later into a standalone Spark application. Before launching the shell, make sure to change the current directory to $SPARKHOME.
First, we need to import the GraphX and RDD modules, as shown, so that we can invoke their APIs with their shorter names:
scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD
As mentioned previously, SparkContext is the main entry point into a Spark program, and it is created automatically in the Spark shell. It also offers useful methods to create RDDs from local collections, to load data from a local or Hadoop file system into RDDs, and to save output data to disk.
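To make these methods concrete, here is a small sketch of the three kinds of helpers just mentioned. It assumes you are inside the Spark shell (where sc already exists), and the output path is an illustrative assumption:

```scala
// Sketch of common SparkContext helpers, run inside the Spark shell.
// The output path below is an assumption; saveAsTextFile fails if it already exists.
val nums = sc.parallelize(1 to 5)              // RDD created from a local collection
val lines = sc.textFile("./data/people.csv")   // RDD loaded from a text file
nums.saveAsTextFile("./output/nums")           // RDD saved back to disk as text files
```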
In this example, we will work with two CSV files, people.csv and links.csv, which are contained in the directory $SPARKHOME/data/. Let's type the following commands to load these files into Spark:
scala> val people = sc.textFile("./data/people.csv")
people: org.apache.spark.rdd.RDD[String] = ./data/people.csv MappedRDD[81] at textFile at <console>:33
scala> val links = sc.textFile("./data/links.csv")
links: org.apache.spark.rdd.RDD[String] = ./data/links.csv MappedRDD[83] at textFile at <console>:33
Loading the CSV files just gave us back two RDDs of strings. To create our graph, we need to parse these strings into two suitable collections of vertices and edges.
Before going further, let's introduce some key definitions and graph abstractions. In Spark, a graph is represented by a property graph, which is defined in the Graph class as:
class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED, VD]
}
This means that the Graph class provides getters to access its vertices and its edges, which are represented by the RDD subclasses VertexRDD[VD] and EdgeRDD[ED, VD]. Note that VD and ED here denote Scala type parameters of the classes VertexRDD, EdgeRDD, and Graph. These type parameters can be primitive types, such as String, or user-defined classes, such as the Person class in our social graph example. It is important to note that the property graph in Spark is a directed multigraph, which means the graph is permitted to have multiple edges between any pair of vertices. Moreover, each edge is directed and defines a unidirectional relationship. This is easy to grasp, for instance, in a Twitter graph, where a user can follow another user but the converse does not need to be true. To model bidirectional links, such as a Facebook friendship, we need to define two edges between the nodes, pointing in opposite directions. Additional properties about the relationship can be stored as an attribute of the edge.
Note
A property graph is a graph with user-defined objects attached to each vertex and edge. The classes of these objects describe the properties of the graph. In practice, this is done by parameterizing the classes Graph, VertexRDD, and EdgeRDD. Moreover, each edge of the graph defines a unidirectional relationship, but multiple edges can exist between any pair of vertices.
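As a quick sketch of the last point, a hypothetical Facebook-style friendship between vertices 1 and 2 would be modeled with two opposite edges (the attribute string here is an illustrative assumption, not part of our dataset):

```scala
import org.apache.spark.graphx.Edge

// Two directed edges in opposite directions model one bidirectional friendship
val mutualFriendship = Seq(
  Edge(1L, 2L, "friend"),  // 1 -> 2
  Edge(2L, 1L, "friend")   // 2 -> 1
)
```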
Going back to our example, let's construct the graph in three steps, as follows:
We define a case class Person, which has name and age as class parameters. Case classes are very useful when we need to do pattern matching on a Person object later on:

case class Person(name: String, age: Int)
Next, we are going to parse each line of the CSV texts inside people and links into new objects of type Person and Edge respectively, and collect the results in an RDD[(VertexId, Person)] and an RDD[Edge[String]]:

scala> val peopleRDD: RDD[(VertexId, Person)] = people map { line =>
  val row = line split ','
  (row(0).toInt, Person(row(1), row(2).toInt))
}
scala> type Connection = String
scala> val linksRDD: RDD[Edge[Connection]] = links map { line =>
  val row = line split ','
  Edge(row(0).toInt, row(1).toInt, row(2))
}
Note
To paste or write code in multiple lines in the shell:
Type the command :paste
Paste or write the given code
Evaluate the code by pressing Cmd + D on Mac or Ctrl + D on Windows
VertexId is simply a type alias for Long, as defined in GraphX. In addition, the Edge class is defined in org.apache.spark.graphx.Edge as:

class Edge(srcId: VertexId, dstId: VertexId, attr: ED)
The class parameters srcId and dstId are the vertex IDs of the source and destination, which are linked by the edge. In our social network example, the link between two people is unidirectional, and its property is described in the attr attribute of type Connection. Note that we defined Connection as a type alias for String. For clarity, it often helps to give a meaningful name to the type parameter of Edge.
Now, we can create our social graph and name it tinySocial using the factory method Graph(…):

scala> val tinySocial: Graph[Person, Connection] = Graph(peopleRDD, linksRDD)
tinySocial: org.apache.spark.graphx.Graph[Person,Connection] = org.apache.spark.graphx.impl.GraphImpl@128cd92a
There are two things to note about this constructor. I told you earlier that the vertices and edges members of the graph are instances of VertexRDD[VD] and EdgeRDD[ED,VD]. However, we passed RDD[(VertexId, Person)] and RDD[Edge[Connection]] into the factory method Graph above. How did that work? It worked because VertexRDD[VD] and EdgeRDD[ED,VD] are subclasses of RDD[(VertexId, VD)] and RDD[Edge[ED]] respectively. In addition, VertexRDD[VD] adds the constraint that each VertexId occurs only once. Basically, two people in our social network cannot have the same vertex ID. Furthermore, VertexRDD[VD] and EdgeRDD[ED,VD] provide several other operations to transform vertex and edge attributes. We will see more of these in later chapters.
Finally, we are going to look at the vertices and edges in the network by accessing and collecting them:
scala> tinySocial.vertices.collect()
res: Array[(org.apache.spark.graphx.VertexId, Person)] = Array((4,Person(Dave,25)), (6,Person(Faith,21)), (8,Person(Harvey,47)), (2,Person(Bob,18)), (1,Person(Alice,20)), (3,Person(Charlie,30)), (7,Person(George,34)), (9,Person(Ivy,21)), (5,Person(Eve,30)))
scala> tinySocial.edges.collect()
res: Array[org.apache.spark.graphx.Edge[Connection]] = Array(Edge(1,2,friend), Edge(1,3,sister), Edge(2,4,brother), Edge(3,2,boss), Edge(4,5,client), Edge(1,9,friend), Edge(6,7,cousin), Edge(7,9,coworker), Edge(8,9,father))
We used the edges and vertices getters in the Graph class, and used the RDD action collect to put the results into a local collection.
Now, suppose we want to print only the professional connections that are listed in the following profLinks list:
val profLinks: List[Connection] = List("coworker", "boss", "employee", "client", "supplier")
A bad way to arrive at the desired result is to filter the edges corresponding to professional connections, then loop through the filtered edges, extract the corresponding vertices' names, and print the connections between the source and destination vertices. That approach is written out in the following code:
val profNetwork = tinySocial.edges.filter {
  case Edge(_, _, link) => profLinks.contains(link)
}
for {
  Edge(src, dst, link) <- profNetwork.collect()
  srcName = (peopleRDD.filter { case (id, person) => id == src } first)._2.name
  dstName = (peopleRDD.filter { case (id, person) => id == dst } first)._2.name
} println(srcName + " is a " + link + " of " + dstName)
Charlie is a boss of Bob
Dave is a client of Eve
George is a coworker of Ivy
There are two problems with the preceding code. First, it could be more concise and expressive. Second, it is not efficient due to the filtering operations inside the for loop.
Luckily, there is a better alternative. The GraphX library provides two different ways to view data: either as a graph or as tables of edges, vertices, and triplets. For each view, the library offers a rich set of operations whose implementations are optimized for execution. That means that we can often process a graph easily using a predefined graph operation or algorithm. For instance, we could simplify the previous code and make it more efficient, as follows:
tinySocial.subgraph(profLinks contains _.attr).
  triplets.foreach(t =>
    println(t.srcAttr.name + " is a " + t.attr + " of " + t.dstAttr.name))
Charlie is a boss of Bob
Dave is a client of Eve
George is a coworker of Ivy
We simply used the subgraph operation to filter the professional links. Then, we used the triplet view to access the attributes of the edges and vertices simultaneously. In brief, the triplets operator returns an RDD of EdgeTriplet[Person, Connection]. Note that an EdgeTriplet can be thought of as a 3-tuple of the form ((VertexId, Person), (VertexId, Person), Connection) that contains all the information about the source node, the destination node, and the edge property.
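As a variation on the same idea, the triplet view alone can express this filter without building a subgraph first. This sketch assumes the same tinySocial graph and profLinks list defined above:

```scala
// Filter the triplet view directly instead of materializing a subgraph
tinySocial.triplets
  .filter(t => profLinks contains t.attr)
  .foreach(t => println(t.srcAttr.name + " is a " + t.attr + " of " + t.dstAttr.name))
```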
Let's conclude this chapter by developing and running a standalone Spark application for our social network example.
Satisfied with our experiment in the shell, let's now write our first Spark program. Open your favorite text editor, create a new file named SimpleGraph.scala, and put it in the folder $SPARKHOME/exercises/chap1. A template for a Spark program looks like the following code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

object SimpleGraphApp {
  def main(args: Array[String]) {
    // Configure the program
    val conf = new SparkConf()
      .setAppName("Tiny Social")
      .setMaster("local")
      .set("spark.driver.memory", "2G")
    val sc = new SparkContext(conf)

    // Load some data into RDDs
    val people = sc.textFile("./data/people.csv")
    val links = sc.textFile("./data/links.csv")

    // After that, we use the Spark API as in the shell
    // ...
  }
}
You can also see the entire code of our SimpleGraph.scala file in the example files, which you can download from the Packt website.
Tip
Downloading the example code
You can download the example code files from your account at http://www.packtpub.com for all the Packt Publishing books you have purchased. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
Let's go over this code to understand what is required to create and configure a Spark standalone program in Scala.
As a Scala program, our Spark application should be constructed within a top-level Scala object, which must have a main function with the signature def main(args: Array[String]): Unit. In other words, the main program accepts an array of strings as a parameter and returns nothing. In our example, the top-level object is SimpleGraphApp.
At the beginning of SimpleGraph.scala, we have put the following import statements:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
The first two lines import the SparkContext class as well as some implicit conversions defined in its companion object. It is not very important to know what the implicit conversions are; just make sure you import both SparkContext and SparkContext._.
Note
When we worked in the Spark shell, SparkContext and SparkContext._ were imported automatically for us.
The third line imports SparkConf, which is a wrapper class containing the configuration settings of a Spark application, such as its application name, the memory size of each executor, and the address of the master or cluster manager.
Next, we have imported some RDD and GraphX-specific class constructors and operators with these lines:
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
The underscore after org.apache.spark.graphx makes sure that all public APIs in GraphX get imported.
Within main, we had to first configure the Spark program. To do this, we created a SparkConf object and set the application settings through a chain of setter methods on it. SparkConf provides specific setters for some common properties, such as the application name or the master. Alternatively, a generic set method can be used to set multiple properties together by passing them as a sequence of key-value pairs. The most common configuration parameters, along with their default values and usage, are documented in the extensive list at https://spark.apache.org/docs/latest/configuration.html.
In our example, we initialized the program as follows:
val conf = new SparkConf()
  .setAppName("Tiny Social")
  .setMaster("local")
  .set("spark.driver.memory", "2G")
val sc = new SparkContext(conf)
Precisely, we set the name of our application to "Tiny Social" and the master to be the local machine on which we submit the application. In addition, the driver memory is set to 2 GB. Had we set the master to be a cluster instead of the local machine, we could specify the memory per executor by setting spark.executor.memory instead of spark.driver.memory.
Note
In principle, the driver and executors have different roles and, in general, they run in different processes, except when we set the master to be local. The driver is the process that compiles our program into tasks, schedules these tasks to one or more executors, and maintains the physical location of every RDD. Each executor is responsible for executing the tasks, and for storing and caching RDDs in memory.
It is not mandatory to set the Spark application settings in the SparkConf object inside your program. Alternatively, when submitting our application, we could set these parameters as command-line options of the spark-submit tool. We will cover that part in detail in the following sections. In that case, we would just create our SparkContext object as:
val sc = new SparkContext(new SparkConf())
After configuring the program, the next task is to load the data that we want to process by calling utility methods such as sc.textFile on the SparkContext object sc:
val people = sc.textFile("./data/people.csv")
val links = sc.textFile("./data/links.csv")
Finally, the rest of the program consists of the same operations on RDDs and graphs that we have used in the shell.
Note
To avoid confusion when passing a relative file path to I/O actions such as sc.textFile(), the convention used in this book is that the current directory of the command line is always set to the project root folder. For instance, if our Tiny Social app's root folder is $SPARKHOME/exercises/chap1, then Spark will look for the data to be loaded in $SPARKHOME/exercises/chap1/data. This assumes that we have put the files in that data folder.
After writing our entire program, we are going to build it using the Scala Build Tool (SBT). If you do not have SBT installed on your computer yet, you need to install it first. Detailed instructions on how to install SBT are available at http://www.scala-sbt.org/0.13/tutorial/index.html for most operating systems. While there are different ways to install SBT, I recommend using a package manager instead of the manual installation. After the installation, execute the following command to append the SBT installation folder to the PATH environment variable:
$ export PATH=$PATH:/usr/local/bin/sbt
Once we have SBT properly installed, we can use it to build our application with all its dependencies inside a single JAR package file, also called an uber JAR. In fact, when running a Spark application on several worker machines, an error will occur if some machines do not have the right dependency JARs.
By packaging an uber JAR with SBT, the application code and its dependencies are all distributed to the workers. Concretely, we need to create a build definition file in which we set the project settings. Moreover, we must specify the dependencies and the resolvers that help SBT find the packages that are needed by our program. A resolver indicates the name and location of the repository that has the required JAR file. Let's create a file called build.sbt in the project root folder $SPARKHOME/exercises/chap1 and insert the following lines:
name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-graphx" % "1.4.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
By convention, the settings are defined by Scala expressions, and they need to be delimited by blank lines. Here, we set the project name, its version number, the version of Scala, as well as the Spark library dependencies. To build the program, we then enter the command:
$ sbt package
This will create the JAR file $SPARKHOME/exercises/chap1/target/scala-2.10/simple-project_2.10-1.0.jar.
Finally, we are going to invoke the spark-submit script in $SPARKHOME/bin/ to run the program from the root directory $SPARKHOME/exercises/chap1 in the terminal:
$ ../../bin/spark-submit --class "SimpleGraphApp" \
    ./target/scala-2.10/simple-project_2.10-1.0.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Charlie is a boss of Bob
Dave is a client of Eve
George is a coworker of Ivy
The required options for the spark-submit command are the Scala application object name and the JAR file that we previously built with SBT. Had we not set the master setting when creating the SparkConf object, we would also have to specify the --master option to spark-submit.
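For example, if the master had not been set in the code, a hypothetical submission in local mode might look like this (the local[2] value is an assumption, meaning local mode with two threads):

```shell
$ ../../bin/spark-submit \
    --class "SimpleGraphApp" \
    --master local[2] \
    ./target/scala-2.10/simple-project_2.10-1.0.jar
```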
Tip
You can list all the available options for the spark-submit script with the command:
spark-submit --help
More details about how to submit a Spark application to a remote cluster are available at http://spark.apache.org/docs/latest/submitting-applications.html.