Using Tachyon as an off-heap storage layer


Spark RDDs are a great way to store datasets in memory, but they result in multiple copies of the same data across different applications. Tachyon solves some of the challenges of Spark RDD management. A few of them are:

  • An RDD only exists for the duration of the Spark application

  • The same process performs both compute and RDD in-memory storage; so, if the process crashes, the in-memory storage also goes away

  • Different jobs cannot share an RDD even if they work on the same underlying data, for example, an HDFS block; this leads to:

    • Slow writes to disk

    • Duplication of data in memory, and a higher memory footprint

  • If the output of one application needs to be shared with another application, it is slow due to replication on disk

Tachyon provides an off-heap memory layer to solve these problems. This layer, being off-heap, is immune to process crashes and is not subject to garbage collection. It also lets RDDs be shared across applications and outlive a specific job or session; in essence, a single copy of the data resides in memory.
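
To see how this fits into a Spark application, the following is a minimal Scala sketch of pointing Spark 1.x at Tachyon's off-heap store; spark.tachyonStore.url is the Spark 1.x property name, and the sketch assumes the Tachyon setup described in the How to do it... section that follows:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    // Point Spark's off-heap (Tachyon) block store at the local Tachyon master
    val conf = new SparkConf()
      .setAppName("TachyonOffHeapDemo")
      .set("spark.tachyonStore.url", "tachyon://localhost:19998")
    val sc = new SparkContext(conf)

    // OFF_HEAP stores the RDD blocks in Tachyon instead of the executor heap, so
    // they survive executor crashes and are not subject to garbage collection
    val words = sc.textFile("tachyon://localhost:19998/words")
    words.persist(StorageLevel.OFF_HEAP)
    words.count // an action materializes the RDD in the off-heap store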

How to do it...

  1. Let's download and compile Tachyon (by default, Tachyon comes configured for Hadoop 1.0.4, so it needs to be compiled from the sources for the right Hadoop version). Replace <version> with the current version; at the time of writing this book, it is 0.6.4:

    $ wget https://github.com/amplab/tachyon/archive/v<version>.zip
    
  2. Unarchive the source code:

    $ unzip v<version>.zip
    
  3. Remove the version from the tachyon source folder name for convenience:

    $ mv tachyon-<version> tachyon
    
  4. Change to the tachyon directory, compile Tachyon against your Hadoop version (2.4.0 here), create the journal and RAM disk directories, and open tachyon-env.sh for editing:

    $ cd tachyon
    $ mvn -Dhadoop.version=2.4.0 clean package -DskipTests=true
    $ cd conf
    $ sudo mkdir -p /var/tachyon/journal
    $ sudo chown -R hduser:hduser /var/tachyon/journal
    $ sudo mkdir -p /var/tachyon/ramdisk
    $ sudo chown -R hduser:hduser /var/tachyon/ramdisk
    
    $ mv tachyon-env.sh.template tachyon-env.sh
    $ vi tachyon-env.sh
    
  5. Comment out the following line:

    export TACHYON_UNDERFS_ADDRESS=$TACHYON_HOME/underfs
    
  6. Uncomment the following line:

    export TACHYON_UNDERFS_ADDRESS=hdfs://localhost:9000
    
  7. Change the following properties in tachyon-env.sh, and then create the log directory and open log4j.properties:

    -Dtachyon.master.journal.folder=/var/tachyon/journal/
    
    export TACHYON_RAM_FOLDER=/var/tachyon/ramdisk
    
    $ sudo mkdir -p /var/log/tachyon
    $ sudo chown -R hduser:hduser /var/log/tachyon
    $ vi log4j.properties
    
  8. Replace ${tachyon.home} with /var/log/tachyon.

  9. Create a new core-site.xml file in the conf directory, and then move the Tachyon build to /opt/infoobjects and set its ownership and permissions:

    $ sudo vi core-site.xml
    <configuration>
      <property>
        <name>fs.tachyon.impl</name>
        <value>tachyon.hadoop.TFS</value>
      </property>
    </configuration>
    $ cd ~
    $ sudo mv tachyon /opt/infoobjects/
    $ sudo chown -R root:root /opt/infoobjects/tachyon
    $ sudo chmod -R 755 /opt/infoobjects/tachyon
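
    The fs.tachyon.impl property is what lets any Hadoop-compatible client, Spark included, resolve tachyon:// URIs. Once the server is up (step 11), a quick sanity check from Scala can go along the following lines; this is a sketch, and it assumes the Tachyon client jar is on the classpath:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // The same mapping as in core-site.xml: tachyon:// URIs resolve to
    // Tachyon's Hadoop-compatible filesystem implementation
    val conf = new Configuration()
    conf.set("fs.tachyon.impl", "tachyon.hadoop.TFS")

    // List the contents of the Tachyon root directory
    val fs = FileSystem.get(new URI("tachyon://localhost:19998"), conf)
    fs.listStatus(new Path("/")).foreach(status => println(status.getPath))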
    
  10. Add <tachyon home>/bin to the path (single quotes so that $PATH is expanded when .bashrc is sourced, not right away):

    $ echo 'export PATH=$PATH:/opt/infoobjects/tachyon/bin' >> /home/hduser/.bashrc
    
  11. Restart the shell, and then format and start Tachyon:

    $ tachyon format
    $ tachyon-start.sh local # you need to enter the root password as RamFS needs to be formatted
    

    Tachyon's web interface is available at http://hostname:19999 (clients connect to the master on port 19998, as in the URLs used in the following steps).

  12. Run the sample program to see whether Tachyon is running fine:

    $ tachyon runTest Basic CACHE_THROUGH
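
    Under the hood, runTest exercises Tachyon's client API. To do the same from Scala, a rough sketch follows; the class and method names (TachyonFS, TachyonURI, WriteType) are from the Tachyon 0.6 client API as best recalled, so verify them against the version you built:

    import tachyon.TachyonURI
    import tachyon.client.{TachyonFS, WriteType, ReadType}

    // Connect to the Tachyon master (19998 is the client port used throughout this recipe)
    val tfs = TachyonFS.get(new TachyonURI("tachyon://localhost:19998"))

    // CACHE_THROUGH writes both to Tachyon memory and to the under filesystem (HDFS here)
    val fileId = tfs.createFile(new TachyonURI("/hello"))
    val out = tfs.getFile(fileId).getOutStream(WriteType.CACHE_THROUGH)
    out.write("Hello, Tachyon!".getBytes)
    out.close()

    // Read the file back, caching it in memory if it is not already there
    val in = tfs.getFile(new TachyonURI("/hello")).getInStream(ReadType.CACHE)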
    
  13. You can stop Tachyon any time by running the following command:

    $ tachyon-stop.sh
    
  14. Run Spark on Tachyon:

    $ spark-shell
    scala> val words = sc.textFile("tachyon://localhost:19998/words")
    scala> words.count
    scala> words.saveAsTextFile("tachyon://localhost:19998/w2")
    scala> val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
    scala> import org.apache.spark.storage.StorageLevel
    scala> person.persist(StorageLevel.OFF_HEAP)
    scala> person.count // an action is needed to actually materialize the RDD off-heap
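
    Since the saved output now lives in Tachyon rather than inside the application's heap, a second, completely separate Spark application can read it without recomputation or a disk round trip; for example, in a new spark-shell session:

    scala> val w2 = sc.textFile("tachyon://localhost:19998/w2")
    scala> w2.count // served from the copy that Tachyon holds in memory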