Implementing the word count topology


Now that we've introduced the basic Storm concepts, we're ready to start developing a simple application. For now, we'll be developing and running a Storm topology in local mode. Storm's local mode simulates a Storm cluster within a single JVM instance, making it easy to develop and debug Storm topologies in a local development environment or IDE. In later chapters, we'll show you how to take Storm topologies developed in local mode and deploy them to a fully clustered environment.

Setting up a development environment

Creating a new Storm project is just a matter of adding the Storm library and its dependencies to the Java classpath. However, as you'll learn in Chapter 2, Configuring Storm Clusters, deploying a Storm topology to a clustered environment requires special packaging of your compiled classes and dependencies. For this reason, it is highly recommended that you use a build management tool such as Apache Maven, Gradle, or Leiningen. For the distributed word count example, we will use Maven.

Let's begin by creating a new Maven project:

$ mvn archetype:create -DgroupId=storm.blueprints \
    -DartifactId=Chapter1 -DpackageName=storm.blueprints.chapter1.v1

(On recent versions of Maven, the archetype:create goal has been removed; use mvn archetype:generate with -Dpackage in place of -DpackageName.)

Next, edit the pom.xml file and add the Storm dependency:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.1-incubating</version>
</dependency>

Then, test the Maven configuration by building the project with the following command:

$ mvn install

Note

Downloading the example code

You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

Maven will download the Storm library and all its dependencies. With the project set up, we're now ready to begin writing our Storm application.

Implementing the sentence spout

To keep things simple, our SentenceSpout implementation will simulate a data source by iterating over a static list of sentences. Each sentence is emitted as a single-field tuple. The complete spout implementation is listed in Example 1.1.

Example 1.1 – SentenceSpout.java

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };
    private int index = 0;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        // emit the sentence at the current index, then advance,
        // wrapping around to the beginning of the list
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        // pause briefly between emissions using Storm's Utils.sleep()
        // so the spout doesn't flood the topology
        Utils.sleep(1);
    }
}

The BaseRichSpout class is a convenient implementation of the ISpout and IComponent interfaces and provides default implementations for methods we don't need in this example. Using this class allows us to focus only on the methods we need.

The declareOutputFields() method is defined in the IComponent interface that all Storm components (spouts and bolts) must implement and is used to tell Storm what streams a component will emit and the fields each stream's tuples will contain. In this case, we're declaring that our spout will emit a single (default) stream of tuples containing a single field ("sentence").
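
A component is not limited to this single default stream. Although our example doesn't need it, a spout or bolt can declare multiple named streams using the declareStream() method of OutputFieldsDeclarer. The following is a sketch with illustrative stream names that are not part of the word count example:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // the stream IDs "valid" and "invalid" are hypothetical examples
    declarer.declareStream("valid", new Fields("sentence"));
    declarer.declareStream("invalid", new Fields("sentence", "error"));
}

Downstream bolts would then subscribe to a specific stream by its ID when the topology is wired together.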

The open() method is defined in the ISpout interface and is called whenever a spout component is initialized. The open() method takes three parameters: a map containing the Storm configuration, a TopologyContext object that provides information about a component's placement in the topology, and a SpoutOutputCollector object that provides methods for emitting tuples. In this example, we don't need to perform much in terms of initialization, so the open() implementation simply stores a reference to the SpoutOutputCollector object in an instance variable.

The nextTuple() method represents the core of any spout implementation. Storm calls this method to request that the spout emit tuples to the output collector. Here, we emit the sentence at the current index, increment the index (wrapping around to the beginning of the list), and sleep for a millisecond so the spout does not flood the topology.
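
Our spout is unreliable: once a tuple is emitted, Storm will not replay it if processing fails downstream. If you needed Storm's guaranteed message processing (beyond the scope of this example), the spout would tag each emitted tuple with a message ID so Storm can ack or replay it. A minimal, hypothetical sketch of that variant of nextTuple() follows; the choice of the list index as the message ID is purely illustrative:

public void nextTuple() {
    // tagging the tuple with a message ID tells Storm to track it;
    // ack() or fail() will later be called with this same ID
    this.collector.emit(new Values(sentences[index]), index);
    index = (index + 1) % sentences.length;
}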

Implementing the split sentence bolt

The SplitSentenceBolt implementation is listed in Example 1.2.

Example 1.2 – SplitSentenceBolt.java

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // split the incoming sentence on whitespace and emit
        // one single-field tuple per word
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

The BaseRichBolt class is another convenience class that implements both the IComponent and IBolt interfaces. Extending this class frees us from having to implement methods we're not concerned with and lets us focus on the functionality we need.

The prepare() method defined by the IBolt interface is analogous to the open() method of ISpout. This is where you would prepare resources such as database connections during bolt initialization. Like the SentenceSpout class, the SplitSentenceBolt class does not require much in terms of initialization, so the prepare() method simply saves a reference to the OutputCollector object.

In the declareOutputFields() method, the SplitSentenceBolt class declares a single stream of tuples, each containing one field ("word").

The core functionality of the SplitSentenceBolt class is contained in the execute() method defined by IBolt. This method is called every time the bolt receives a tuple from a stream to which it subscribes. In this case, it looks up the value of the "sentence" field of the incoming tuple as a string, splits the value into individual words, and emits a new tuple for each word.
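
Notice that the emitted word tuples are not anchored to the input tuple. That's fine for this example, but if you were using Storm's reliability features, execute() would anchor each new tuple to the tuple it was derived from and then acknowledge the input. A sketch of that variant (not used in this chapter) might look like this:

public void execute(Tuple tuple) {
    String sentence = tuple.getStringByField("sentence");
    for (String word : sentence.split(" ")) {
        // anchoring ties each word tuple to the input tuple so a
        // downstream failure triggers a replay from the spout
        this.collector.emit(tuple, new Values(word));
    }
    // acknowledge the input once all words have been emitted
    this.collector.ack(tuple);
}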

Implementing the word count bolt

The WordCountBolt class (Example 1.3) is the topology component that actually maintains the word count. In the bolt's prepare() method, we instantiate a HashMap<String, Long> that will store all the words and their corresponding counts. It is common practice to instantiate most instance variables in the prepare() method. The reason behind this pattern lies in the fact that when a topology is deployed, its component spouts and bolts are serialized and sent across the network. If a spout or bolt has any non-serializable instance variables instantiated before serialization (created in the constructor, for example), a NotSerializableException will be thrown and the topology will fail to deploy. In this case, since HashMap<String, Long> is serializable, we could have safely instantiated it in the constructor. However, in general, it is best to limit constructor arguments to primitives and serializable objects and instantiate non-serializable objects in the prepare() method.
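
To make the pitfall concrete, here is a minimal sketch assuming a hypothetical, non-serializable DatabaseClient class; creating the client in the constructor would break deployment, while creating it in prepare() is safe:

public class LookupBolt extends BaseRichBolt {
    // DatabaseClient is a hypothetical non-serializable resource
    private transient DatabaseClient client;

    public LookupBolt() {
        // WRONG: new DatabaseClient(...) here would be serialized with
        // the bolt and throw NotSerializableException on deployment
    }

    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        // RIGHT: create non-serializable resources here, after the bolt
        // has been deserialized on a worker
        this.client = new DatabaseClient("db-host", 5432);
    }

    public void execute(Tuple tuple) { /* ... */ }

    public void declareOutputFields(OutputFieldsDeclarer declarer) { /* ... */ }
}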

In the declareOutputFields() method, the WordCountBolt class declares a stream of tuples that will contain both the word received and the corresponding count. In the execute() method, we look up the count for the word received (initializing it to 0 if necessary), increment and store the count, and then emit a new tuple consisting of the word and current count. Emitting the count as a stream allows other bolts in the topology to subscribe to the stream and perform additional processing.

Example 1.3 – WordCountBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        // look up the running count, treating an unseen word as zero
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        // emit the updated count so downstream bolts can consume it
        this.collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

Implementing the report bolt

The purpose of the ReportBolt class is to produce a report of the counts for each word. Like the WordCountBolt class, it uses a HashMap<String, Long> object to record the counts, but in this case, it just stores the count received from the counter bolt.

One difference between the report bolt and the other bolts we've written so far is that it is a terminal bolt—it only receives tuples. Because it does not emit any streams, the declareOutputFields() method is left empty.

The report bolt also introduces the cleanup() method defined in the IBolt interface. Storm calls this method when a bolt is about to be shut down. Here, we use the cleanup() method as a convenient way to output our final counts when the topology shuts down, but typically, the cleanup() method is used to release resources used by a bolt, such as open files or database connections.

One important thing to keep in mind about the IBolt.cleanup() method when writing bolts is that there is no guarantee that Storm will call it when a topology is running on a cluster. We'll discuss the reasons behind this when we talk about Storm's fault tolerance mechanisms in the next chapter. But for this example, we'll be running Storm in a development mode where the cleanup() method is guaranteed to be called.
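
For reference, a more typical cleanup() implementation releasing a resource might look like the following sketch, where connection stands in for a hypothetical database connection opened in prepare():

public void cleanup() {
    // close the (hypothetical) connection opened in prepare(), guarding
    // against the case where prepare() never ran or already failed
    if (this.connection != null) {
        try {
            this.connection.close();
        } catch (Exception e) {
            // log and continue; the worker is shutting down anyway
        }
    }
}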

The full source for the ReportBolt class is listed in Example 1.4.

Example 1.4 – ReportBolt.java

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class ReportBolt extends BaseRichBolt {

    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        // record the latest count received for each word
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit anything
    }

    public void cleanup() {
        // print the final counts in alphabetical order when the
        // topology is shut down
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("--------------");
    }
}

Implementing the word count topology

Now that we've defined the spout and bolts that will make up our computation, we're ready to wire them together into a runnable topology (refer to Example 1.5).

Example 1.5 – WordCountTopology.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {

        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        // SentenceSpout --> SplitSentenceBolt
        builder.setBolt(SPLIT_BOLT_ID, splitBolt)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        // SplitSentenceBolt --> WordCountBolt
        builder.setBolt(COUNT_BOLT_ID, countBolt)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // WordCountBolt --> ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt)
                .globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        // let the topology run for ten seconds before shutting down
        Utils.sleep(10000);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

Storm topologies are typically defined and run (or submitted if the topology is being deployed to a cluster) in a Java main() method. In this example, we begin by defining string constants that will serve as unique identifiers for our Storm components. We begin the main() method by instantiating our spout and bolts and creating an instance of TopologyBuilder. The TopologyBuilder class provides a fluent-style API for defining the data flow between components in a topology. We start by registering the sentence spout and assigning it a unique ID:

builder.setSpout(SENTENCE_SPOUT_ID, spout);

The next step is to register SplitSentenceBolt and establish a subscription to the stream emitted by the SentenceSpout class:

builder.setBolt(SPLIT_BOLT_ID, splitBolt)
                .shuffleGrouping(SENTENCE_SPOUT_ID);

The setBolt() method registers a bolt with the TopologyBuilder class and returns an instance of BoltDeclarer that exposes methods for defining the input source(s) for a bolt. Here, we pass the unique ID we defined for the SentenceSpout object to the shuffleGrouping() method, establishing the relationship between the two components. The shuffleGrouping() method tells Storm to shuffle tuples emitted by the SentenceSpout class and distribute them evenly among instances of the SplitSentenceBolt object. We will explain stream groupings in detail shortly in our discussion of parallelism in Storm.
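
As a preview of that discussion, setBolt() also accepts an optional parallelism hint as a third argument. The following sketch (not part of our final topology) would ask Storm to run two instances of the split bolt:

// the third argument is a parallelism hint: run two instances of
// SplitSentenceBolt, each receiving a shuffled share of the tuples
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)
        .shuffleGrouping(SENTENCE_SPOUT_ID);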

The next line establishes the connection between the SplitSentenceBolt class and the WordCountBolt class:

builder.setBolt(COUNT_BOLT_ID, countBolt)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

As you'll learn, there are times when it's imperative that tuples containing certain data get routed to a specific instance of a bolt. Here, we use the fieldsGrouping() method of the BoltDeclarer class to ensure that all tuples containing the same "word" value get routed to the same WordCountBolt instance.

The last step in defining our data flow is to route the stream of tuples emitted by the WordCountBolt instance to the ReportBolt class. In this case, we want all tuples emitted by WordCountBolt routed to a single ReportBolt task. This behavior is provided by the globalGrouping() method, as follows:

builder.setBolt(REPORT_BOLT_ID, reportBolt)
                .globalGrouping(COUNT_BOLT_ID);

With our data flow defined, the final step in running our word count computation is to build the topology and submit it to a cluster:

Config config = new Config();

LocalCluster cluster = new LocalCluster();

cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();

Here, we're running Storm in local mode using Storm's LocalCluster class to simulate a full-blown Storm cluster within our local development environment. Local mode is a convenient way to develop and test Storm applications without the overhead of deploying to a distributed cluster. Local mode also allows you to run Storm topologies within an IDE, setting breakpoints, halting execution, inspecting variables, and profiling the application in ways that are much more time-consuming or nearly impossible when deploying to a Storm cluster.

In this example, we create a LocalCluster instance and call the submitTopology() method with the topology name, an instance of backtype.storm.Config, and the Topology object returned by the TopologyBuilder class' createTopology() method. As you'll see in the next chapter, the submitTopology() method used to deploy a topology in local mode has the same signature as the method to deploy a topology in remote (distributed) mode.
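
For comparison, the remote-mode equivalent uses the StormSubmitter class, which you'll meet in the next chapter; a sketch of the call looks like this:

// deploying to a real cluster: same arguments, different submitter
StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());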

Storm's Config class is simply an extension of HashMap<String, Object> that defines a number of Storm-specific constants and convenience methods for configuring a topology's runtime behavior. When a topology is submitted, Storm will merge its predefined default configuration values with the contents of the Config instance passed to the submitTopology() method, and the result will be passed to the open() and prepare() methods of the topology's spouts and bolts, respectively. In this sense, the Config object represents a set of configuration parameters that are global to all components in a topology.
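
For example, before submitting the topology we could tweak a couple of topology-level settings; the particular settings below are illustrative and not required by our example:

Config config = new Config();
// log every tuple emitted by every component (verbose, but useful
// when debugging in local mode)
config.setDebug(true);
// cap the number of unacked tuples in flight per spout task
config.setMaxSpoutPending(1000);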

We're now ready to run the WordCountTopology class. The main() method will submit the topology, wait for ten seconds while it runs, kill (undeploy) the topology, and finally shut down the local cluster. When the program run is complete, you should see console output similar to the following:

--- FINAL COUNTS ---
a : 1426
ate : 1426
beverages : 1426
cold : 1426
cow : 1426
dog : 2852
don't : 2851
fleas : 2851
has : 1426
have : 1426
homework : 1426
i : 4276
like : 2851
man : 1426
my : 2852
the : 1426
think : 1425
--------------
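
If you would rather launch the topology from the command line than from an IDE, one option is the exec-maven-plugin's exec:java goal. This is a sketch; it assumes the WordCountTopology class lives in the storm.blueprints.chapter1.v1 package created by the archetype:

$ mvn compile exec:java \
    -Dexec.mainClass=storm.blueprints.chapter1.v1.WordCountTopology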