Storm Real-time Processing Cookbook

By : Quinton Anderson

Overview of this book

Storm is a free and open source distributed real-time computation system. Storm makes it easy to reliably process unbounded streams of data, doing for real-time processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!

Storm Real-time Processing Cookbook contains recipes on Storm for real-time computation, ranging from basic to advanced.

The book begins by setting up the development environment and then covers log stream processing. This is followed by a real-time payments workflow, distributed RPC, integration with other software such as Hadoop and Apache Camel, and more.
Table of Contents (16 chapters)

Creating a "Hello World" topology


The "Hello World" topology, as with all "Hello World" applications, is of no real use to anyone except to illustrate some really basic concepts. The "Hello World" topology will show how to create a Storm project including a simple spout and bolt, build it, and execute it in local cluster mode.

How to do it…

  1. Create a new project folder and initialize your Git repository.

    mkdir HelloWorld
    cd HelloWorld
    git init
    
  2. We must then create the Maven project file as follows:

    vim pom.xml
    
  3. Using vim, or any other text editor, you need to create the basic XML tags and project metadata for the "Hello World" project.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>storm.cookbook</groupId>
      <artifactId>hello-world</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>hello-world</name>
      <url>https://bitbucket.org/[user]/hello-world</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>

    </project>
  4. We then need to declare which Maven repositories we need to fetch our dependencies from. Add the following to the pom.xml file within the project tags:

    <repositories>
    
        <repository>
          <id>github-releases</id>
          <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
    
        <repository>
          <id>clojars.org</id>
          <url>http://clojars.org/repo</url>
        </repository>
    
        <repository>
          <id>twitter4j</id>
          <url>http://twitter4j.org/maven2</url>
        </repository>
    </repositories>

    Tip

    You can override these repositories using your Maven settings.xml file (typically in your .m2 directory). The details are outside the scope of this book, but this is extremely useful within development teams, where dependency management is key.

  5. We then need to declare our dependencies by adding them within the project tags:

    <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
    
        <dependency>
          <groupId>storm</groupId>
          <artifactId>storm</artifactId>
          <version>0.8.1</version>
          <!-- keep storm out of the jar-with-dependencies -->
          <scope>provided</scope>
        </dependency>
    
        <dependency>
          <groupId>com.googlecode.json-simple</groupId>
          <artifactId>json-simple</artifactId>
          <version>1.1</version>
        </dependency>
    
    </dependencies>
  6. Finally, we need to add the build plugin definitions for Maven:

    <build>
      <plugins>
        <!--
          Bind the maven-assembly-plugin to the package phase. This creates a jar
          containing all non-provided dependencies (so Storm itself is excluded),
          suitable for deployment to a cluster.
        -->
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass></mainClass>
              </manifest>
            </archive>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
    
        <plugin>
          <groupId>com.theoryinpractise</groupId>
          <artifactId>clojure-maven-plugin</artifactId>
          <version>1.3.8</version>
          <extensions>true</extensions>
          <configuration>
            <sourceDirectories>
              <sourceDirectory>src/clj</sourceDirectory>
            </sourceDirectories>
          </configuration>
          <executions>
            <execution>
              <id>compile</id>
              <phase>compile</phase>
              <goals>
                <goal>compile</goal>
              </goals>
            </execution>
            <execution>
              <id>test</id>
              <phase>test</phase>
              <goals>
                <goal>test</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <configuration>
            <source>1.6</source>
            <target>1.6</target>
          </configuration>
        </plugin>
      </plugins>
    </build>
  7. With the POM file complete, save it using the Esc + : + wq + Enter key sequence and complete the required folder structure for the Maven project:

    mkdir src
    cd src
    mkdir test
    mkdir main
    cd main
    mkdir java
    
  8. Then return to the project root folder and generate the Eclipse project files using the following:

    mvn eclipse:eclipse
    

    Tip

    The Eclipse project files are a generated artifact, much like a .class file, and should not be included in your Git commits, especially since they contain machine-specific paths.

  9. You must now start your Eclipse environment and import the generated project files into the workspace.

  10. You must then create your first spout by creating a new class named HelloWorldSpout, which extends BaseRichSpout and is located in the storm.cookbook package. Eclipse will generate default stubs for the spout's methods. The spout will simply generate tuples based on random probability. Create the following member variables and a constructor:

    private SpoutOutputCollector collector;
    private int referenceRandom;
    private static final int MAX_RANDOM = 10;

    public HelloWorldSpout() {
      // pick the reference number once, when the spout object is constructed
      final Random rand = new Random();
      referenceRandom = rand.nextInt(MAX_RANDOM);
    }
  11. After construction, the Storm cluster will open the spout; provide the following implementation for the open method:

    public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
      this.collector = collector;
    }
  12. The Storm cluster will repeatedly call the nextTuple method, which will do all the work of the spout. Provide the following implementation for the method:

    public void nextTuple() {
      // throttle emission to roughly one tuple every 100 ms per spout instance
      Utils.sleep(100);
      final Random rand = new Random();
      int instanceRandom = rand.nextInt(MAX_RANDOM);
      if (instanceRandom == referenceRandom) {
        collector.emit(new Values("Hello World"));
      } else {
        collector.emit(new Values("Other Random Word"));
      }
    }
  13. Finally, you need to tell the Storm cluster which fields this spout emits within the declareOutputFields method:

    declarer.declare(new Fields("sentence"));
  14. Once you have resolved all the required imports for the class, you need to create HelloWorldBolt. This class will consume the produced tuples and implement the required counting logic. Create the new class within the storm.cookbook package; it should extend the BaseRichBolt class. Declare a private int member variable named myCount and provide the following implementation for the execute method, which does the work for this bolt:

    public void execute(Tuple input) {
      String test = input.getStringByField("sentence");
      if ("Hello World".equals(test)) {
        myCount++;
        System.out.println("Found a Hello World! My Count is now: " + Integer.toString(myCount));
      }
    }
  15. Finally, you need to bring the elements together and declare the Storm topology. Create a main class named HelloWorldTopology within the same package and provide the following main implementation:

    public static void main(String[] args) throws Exception {
      TopologyBuilder builder = new TopologyBuilder();

      // 10 spout instances feeding 2 bolt instances through a shuffle grouping
      builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
      builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 2)
          .shuffleGrouping("randomHelloWorld");

      Config conf = new Config();
      conf.setDebug(true);

      if (args != null && args.length > 0) {
        // a topology name was supplied: submit to the remote cluster
        conf.setNumWorkers(3);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
      } else {
        // no arguments: run in-process for 10 seconds, then shut down
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("test");
        cluster.shutdown();
      }
    }

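    This sets up the topology and submits it to either a local or a remote Storm cluster, depending on the arguments passed to the main method. If a topology name is passed as the first argument, the topology is submitted to the remote cluster using three workers. Otherwise, it runs in a LocalCluster for 10 seconds and is then shut down.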

  16. After you have resolved any compiler issues, you can run the topology in local mode by issuing the following command from the project's root folder:

    mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=storm.cookbook.HelloWorldTopology
    

How it works…

The "Hello World" topology consists of the randomHelloWorld spout, whose tuples are distributed to HelloWorldBolt using a shuffle grouping.

The spout essentially emits a stream containing one of the following two sentences:

  • Other Random Word

  • Hello World

The choice between the two sentences is random. On construction, the spout draws a reference number between 0 and 9 (MAX_RANDOM is 10). On every call to nextTuple, it draws a new number and compares it with the stored reference value. When the two match, Hello World is emitted; otherwise, Other Random Word is emitted.
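To see why Hello World appears in roughly one of every ten tuples, the emission rule can be simulated in plain Java. This is a minimal sketch that runs outside Storm. The class EmissionSimulation and its countHelloWorld method are hypothetical names. A single seeded Random replaces the fresh Random instances the spout creates so that the run is reproducible. Because nextInt(MAX_RANDOM) draws uniformly from 0 to 9, each call matches the reference value with probability 1/10.

```java
import java.util.Random;

/**
 * Plain-Java simulation of the HelloWorldSpout emission rule (no Storm
 * classes involved). MAX_RANDOM mirrors the spout's constant; the class
 * and method names are hypothetical.
 */
public class EmissionSimulation {
    static final int MAX_RANDOM = 10;

    /** Returns how many of the simulated nextTuple() calls would emit "Hello World". */
    static int countHelloWorld(long seed, int calls) {
        Random rand = new Random(seed);
        // drawn once, like the value chosen in the spout's constructor
        int referenceRandom = rand.nextInt(MAX_RANDOM);
        int hits = 0;
        for (int i = 0; i < calls; i++) {
            // drawn on every call, like instanceRandom in nextTuple()
            if (rand.nextInt(MAX_RANDOM) == referenceRandom) {
                hits++;
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        int calls = 100000;
        int hits = countHelloWorld(42L, calls);
        System.out.printf("Hello World ratio: %.3f (expected about %.3f)%n",
                (double) hits / calls, 1.0 / MAX_RANDOM);
    }
}
```

Running main should print a ratio close to 0.1. Changing MAX_RANDOM shifts the ratio to roughly 1/MAX_RANDOM.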

The bolt simply matches and counts the instances of Hello World. Each bolt instance keeps its own myCount, so the increments it prints are its own running total. To scale this bolt, increase the parallelism hint in the topology by updating the following line:

builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 3)
                .shuffleGrouping("randomHelloWorld");

The key parameter here is parallelism_hint, which you can adjust upwards. If you run the topology again, you will notice three separate counts that are printed independently and interleaved with each other.
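The separate counts arise because every bolt instance holds its own myCount field, and nothing is shared between instances. The plain-Java model below illustrates this under simplifying assumptions. ShuffleGroupingModel and CountingBolt are hypothetical names. The grouping is modeled as choosing a bolt uniformly at random for each tuple, whereas Storm's shuffle grouping aims to spread tuples evenly across tasks, and its implementation may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Plain-Java model of a shuffle grouping feeding several bolt instances,
 * each with its own private counter (like myCount in HelloWorldBolt).
 * Names are hypothetical; this is not Storm's implementation.
 */
public class ShuffleGroupingModel {

    /** One bolt instance: counts only the "Hello World" sentences it receives. */
    static class CountingBolt {
        private int myCount;

        void execute(String sentence) {
            if ("Hello World".equals(sentence)) {
                myCount++;
            }
        }

        int count() {
            return myCount;
        }
    }

    /** Routes each sentence to one randomly chosen bolt instance. */
    static List<CountingBolt> run(List<String> sentences, int parallelismHint, long seed) {
        List<CountingBolt> bolts = new ArrayList<CountingBolt>();
        for (int i = 0; i < parallelismHint; i++) {
            bolts.add(new CountingBolt());
        }
        Random rand = new Random(seed);
        for (String sentence : sentences) {
            bolts.get(rand.nextInt(parallelismHint)).execute(sentence);
        }
        return bolts;
    }

    public static void main(String[] args) {
        List<String> sentences = new ArrayList<String>();
        for (int i = 0; i < 30; i++) {
            // every third sentence is "Hello World": 10 in total
            sentences.add(i % 3 == 0 ? "Hello World" : "Other Random Word");
        }
        List<CountingBolt> bolts = run(sentences, 3, 1L);
        for (int i = 0; i < bolts.size(); i++) {
            System.out.println("bolt " + i + " count: " + bolts.get(i).count());
        }
    }
}
```

The per-instance counts add up to the total number of Hello World sentences, but no single instance sees all of them.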

Tip

You can rescale a topology after deployment by updating these hints through the Storm UI or CLI; however, you can't change the topology structure without recompiling and redeploying the JAR. For the command-line option, see the CLI documentation on the wiki at the following link:

https://github.com/nathanmarz/storm/wiki/Command-line-client

It is important to declare your project dependencies correctly within your POM. The Storm JARs must be declared with the provided scope. Otherwise, they are packaged into your JAR, which results in duplicate class files on the classpath once the topology is deployed to a node of the cluster. Storm checks for this classpath duplication and will fail to start if you have included Storm in your distribution.
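This kind of duplication can be detected with the standard ClassLoader.getResources API, which returns every copy of a named resource visible on the classpath. The sketch below shows one way to perform such a check; it is not Storm's own code. The class name is hypothetical. Using defaults.yaml as the resource to look for is also an assumption: it is a configuration file we believe the Storm JAR bundles, so verify the name against your Storm version.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/**
 * Sketch of a classpath duplication check (hypothetical class, not Storm's
 * own implementation). It reports every copy of a resource on the classpath.
 */
public class ClasspathDuplicateCheck {

    /** Lists every copy of the named resource visible to the given class loader. */
    static List<URL> findCopies(ClassLoader loader, String resourceName) {
        try {
            return Collections.list(loader.getResources(resourceName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // assumption: "defaults.yaml" ships inside the Storm JAR
        String resource = args.length > 0 ? args[0] : "defaults.yaml";
        List<URL> copies = findCopies(ClasspathDuplicateCheck.class.getClassLoader(), resource);
        if (copies.size() > 1) {
            System.err.println("Found " + copies.size() + " copies of " + resource + ":");
            for (URL url : copies) {
                System.err.println("  " + url);
            }
            System.exit(1);
        }
        System.out.println("Resource " + resource + " found " + copies.size() + " time(s)");
    }
}
```

Running this with your topology JAR and the Storm JARs on the classpath should report a single copy when Storm is correctly declared with the provided scope.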

Tip

Downloading the example code

You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

Open source versions of the code are maintained by the author at his Bitbucket account at https://bitbucket.org/qanderson.