Now that we've introduced the basic Storm concepts, we're ready to start developing a simple application. For now, we'll be developing and running a Storm topology in local mode. Storm's local mode simulates a Storm cluster within a single JVM instance, making it easy to develop and debug Storm topologies in a local development environment or IDE. In later chapters, we'll show you how to take Storm topologies developed in local mode and deploy them to a fully clustered environment.
Creating a new Storm project is just a matter of adding the Storm library and its dependencies to the Java classpath. However, as you'll learn in Chapter 2, Configuring Storm Clusters, deploying a Storm topology to a clustered environment requires special packaging of your compiled classes and dependencies. For this reason, it is highly recommended that you use a build management tool such as Apache Maven, Gradle, or Leiningen. For the distributed word count example, we will use Maven.
Let's begin by creating a new Maven project:
$ mvn archetype:generate -DgroupId=storm.blueprints -DartifactId=Chapter1 -Dpackage=storm.blueprints.chapter1.v1 -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Next, edit the pom.xml file and add the Storm dependency:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.1-incubating</version>
</dependency>
Then, test the Maven configuration by building the project with the following command:
$ mvn install
Note
Downloading the example code
You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
Maven will download the Storm library and all its dependencies. With the project set up, we're now ready to begin writing our Storm application.
To keep things simple, our SentenceSpout implementation will simulate a data source by iterating over a static list of sentences. Each sentence is emitted as a single-field tuple. The complete spout implementation is listed in Example 1.1.
Example 1.1: SentenceSpout.java
public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };
    private int index = 0;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.waitForMillis(1);
    }
}
The BaseRichSpout class is a convenient implementation of the ISpout and IComponent interfaces and provides default implementations for methods we don't need in this example. Using this class allows us to focus only on the methods we need.
The declareOutputFields() method is defined in the IComponent interface that all Storm components (spouts and bolts) must implement, and is used to tell Storm what streams a component will emit and the fields each stream's tuples will contain. In this case, we're declaring that our spout will emit a single (default) stream of tuples containing a single field ("sentence").
The open() method is defined in the ISpout interface and is called whenever a spout component is initialized. The open() method takes three parameters: a map containing the Storm configuration, a TopologyContext object that provides information about a component's placement in the topology, and a SpoutOutputCollector object that provides methods for emitting tuples. In this example, we don't need to perform much initialization, so the open() implementation simply stores a reference to the SpoutOutputCollector object in an instance variable.
The nextTuple() method represents the core of any spout implementation. Storm calls this method to request that the spout emit tuples to the output collector. Here, we just emit the sentence at the current index and increment the index.
The SplitSentenceBolt implementation is listed in Example 1.2.
Example 1.2 – SplitSentenceBolt.java
public class SplitSentenceBolt extends BaseRichBolt {

    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
The BaseRichBolt class is another convenience class that implements both the IComponent and IBolt interfaces. Extending this class frees us from having to implement methods we're not concerned with and lets us focus on the functionality we need.
The prepare() method defined by the IBolt interface is analogous to the open() method of ISpout. This is where you would prepare resources such as database connections during bolt initialization. Like the SentenceSpout class, the SplitSentenceBolt class does not require much in terms of initialization, so the prepare() method simply saves a reference to the OutputCollector object.
In the declareOutputFields() method, the SplitSentenceBolt class declares a single stream of tuples, each containing one field ("word").
The core functionality of the SplitSentenceBolt class is contained in the execute() method defined by IBolt. This method is called every time the bolt receives a tuple from a stream to which it subscribes. In this case, it looks up the value of the "sentence" field of the incoming tuple as a string, splits the value into individual words, and emits a new tuple for each word.
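The split logic relies on Java's String.split(). One subtlety worth knowing (it doesn't matter for our tidy static sentences, but it will for real input) is that splitting on a single space leaves empty strings in the result whenever the input contains consecutive spaces. A small standalone sketch:

```java
import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        // A single space between words splits cleanly:
        System.out.println(Arrays.toString("my dog has fleas".split(" ")));
        // prints [my, dog, has, fleas]

        // Consecutive spaces leave empty strings in the result:
        System.out.println(Arrays.toString("my  dog".split(" ")));
        // prints [my, , dog]

        // Splitting on a whitespace regex avoids the empty strings:
        System.out.println(Arrays.toString("my  dog".split("\\s+")));
        // prints [my, dog]
    }
}
```

If your sentence source is less predictable than our static list, splitting on the regex "\\s+" (or filtering out empty strings) keeps empty "words" out of the downstream counts.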
The WordCountBolt class (Example 1.3) is the topology component that actually maintains the word count. In the bolt's prepare() method, we create an instance of HashMap<String, Long> that will store all the words and their corresponding counts. It is common practice to instantiate most instance variables in the prepare() method. The reason behind this pattern lies in the fact that when a topology is deployed, its component spouts and bolts are serialized and sent across the network. If a spout or bolt has any non-serializable instance variables instantiated before serialization (created in the constructor, for example), a NotSerializableException will be thrown and the topology will fail to deploy. In this case, since HashMap<String, Long> is serializable, we could have safely instantiated it in the constructor. However, in general, it is best to limit constructor arguments to primitives and serializable objects and to instantiate non-serializable objects in the prepare() method.
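To see why the constructor-versus-prepare() distinction matters, here is a standalone sketch using plain Java serialization. The Connection class is a hypothetical stand-in for any non-serializable resource, such as a real database connection:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stands in for a non-serializable resource such as a database connection.
class Connection { }

// Anti-pattern: the resource is created before serialization,
// so serializing the bolt fails.
class EagerBolt implements Serializable {
    Connection conn = new Connection();
}

// Recommended pattern: the field stays null until a prepare()-style
// method runs on the worker, after the object has been deserialized.
class LazyBolt implements Serializable {
    Connection conn;
    void prepare() { conn = new Connection(); }
}

public class SerializationDemo {
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager: " + serializes(new EagerBolt())); // prints eager: false
        System.out.println("lazy: " + serializes(new LazyBolt()));   // prints lazy: true
    }
}
```

The eager version fails exactly the way a topology deployment would: Java serialization walks the object graph, hits the non-serializable Connection, and throws NotSerializableException. The lazy version serializes cleanly because the field is still null at submission time.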
In the declareOutputFields() method, the WordCountBolt class declares a stream of tuples that will contain both the word received and the corresponding count. In the execute() method, we look up the count for the word received (initializing it to 0 if necessary), increment and store the count, and then emit a new tuple consisting of the word and the current count. Emitting the count as a stream allows other bolts in the topology to subscribe to the stream and perform additional processing.
Example 1.3 – WordCountBolt.java
public class WordCountBolt extends BaseRichBolt {

    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
The purpose of the ReportBolt class is to produce a report of the counts for each word. Like the WordCountBolt class, it uses a HashMap<String, Long> object to record the counts, but in this case, it just stores the counts received from the counter bolt.
One difference between the report bolt and the other bolts we've written so far is that it is a terminal bolt: it only receives tuples. Because it does not emit any streams, the declareOutputFields() method is left empty.
The report bolt also introduces the cleanup() method defined in the IBolt interface. Storm calls this method when a bolt is about to be shut down. We exploit the cleanup() method here as a convenient way to output our final counts when the topology shuts down, but typically, the cleanup() method is used to release resources used by a bolt, such as open files or database connections.
One important thing to keep in mind about the IBolt.cleanup() method when writing bolts is that there is no guarantee that Storm will call it when a topology is running on a cluster. We'll discuss the reasons behind this when we talk about Storm's fault tolerance mechanisms in the next chapter. But for this example, we'll be running Storm in a development mode where the cleanup() method is guaranteed to be called.
The full source for the ReportBolt class is listed in Example 1.4.
Example 1.4 – ReportBolt.java
public class ReportBolt extends BaseRichBolt {

    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit anything
    }

    public void cleanup() {
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("--------------");
    }
}
Now that we've defined the spout and bolts that will make up our computation, we're ready to wire them together into a runnable topology (refer to Example 1.5).
Example 1.5 – WordCountTopology.java
public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        // SentenceSpout --> SplitSentenceBolt
        builder.setBolt(SPLIT_BOLT_ID, splitBolt)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        // SplitSentenceBolt --> WordCountBolt
        builder.setBolt(COUNT_BOLT_ID, countBolt)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // WordCountBolt --> ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt)
                .globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        waitForSeconds(10);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}
Storm topologies are typically defined and run (or submitted, if the topology is being deployed to a cluster) in a Java main() method. In this example, we begin by defining string constants that will serve as unique identifiers for our Storm components. We begin the main() method by instantiating our spout and bolts and creating an instance of TopologyBuilder. The TopologyBuilder class provides a fluent-style API for defining the data flow between components in a topology. We start by registering the sentence spout and assigning it a unique ID:
builder.setSpout(SENTENCE_SPOUT_ID, spout);
The next step is to register SplitSentenceBolt and establish a subscription to the stream emitted by the SentenceSpout class:
builder.setBolt(SPLIT_BOLT_ID, splitBolt)
        .shuffleGrouping(SENTENCE_SPOUT_ID);
The setBolt() method registers a bolt with the TopologyBuilder class and returns an instance of BoltDeclarer that exposes methods for defining the input source(s) for a bolt. Here, we pass in the unique ID we defined for the SentenceSpout object to the shuffleGrouping() method, establishing the relationship. The shuffleGrouping() method tells Storm to shuffle tuples emitted by the SentenceSpout class and distribute them evenly among instances of the SplitSentenceBolt object. We will explain stream groupings in detail shortly in our discussion of parallelism in Storm.
The next line establishes the connection between the SplitSentenceBolt class and the WordCountBolt class:
builder.setBolt(COUNT_BOLT_ID, countBolt)
        .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
As you'll learn, there are times when it's imperative that tuples containing certain data get routed to a specific instance of a bolt. Here, we use the fieldsGrouping() method of the BoltDeclarer class to ensure that all tuples containing the same "word" value get routed to the same WordCountBolt instance.
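Conceptually, a fields grouping routes a tuple by hashing the value of the grouping field and mapping the hash onto the set of target tasks. The following standalone sketch illustrates the idea (it is an illustration of the principle, not Storm's actual routing code):

```java
public class FieldsGroupingDemo {
    // Maps a field value to one of numTasks task indices by hashing it.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        String[] words = { "dog", "fleas", "dog", "cow", "fleas", "dog" };
        for (String word : words) {
            System.out.println(word + " -> task " + taskFor(word, numTasks));
        }
        // Every occurrence of the same word lands on the same task, so each
        // task's running count for a word is that word's global count.
    }
}
```

This is why the per-instance HashMap in WordCountBolt is sufficient: because every occurrence of a given word is routed to the same task, no instance ever holds a partial count for a word it shares with another instance.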
The last step in defining our data flow is to route the stream of tuples emitted by the WordCountBolt instance to the ReportBolt class. In this case, we want all tuples emitted by WordCountBolt routed to a single ReportBolt task. This behavior is provided by the globalGrouping() method, as follows:
builder.setBolt(REPORT_BOLT_ID, reportBolt)
        .globalGrouping(COUNT_BOLT_ID);
With our data flow defined, the final step in running our word count computation is to build the topology and submit it to a cluster:
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
waitForSeconds(10);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
Here, we're running Storm in local mode using Storm's LocalCluster class to simulate a full-blown Storm cluster within our local development environment. Local mode is a convenient way to develop and test Storm applications without the overhead of deploying to a distributed cluster. Local mode also allows you to run Storm topologies within an IDE, setting breakpoints, halting execution, inspecting variables, and profiling the application in ways that are much more time consuming or near impossible when deploying to a Storm cluster.
In this example, we create a LocalCluster instance and call the submitTopology() method with the topology name, an instance of backtype.storm.Config, and the StormTopology object returned by the TopologyBuilder class' createTopology() method. As you'll see in the next chapter, the submitTopology() method used to deploy a topology in local mode has the same signature as the method used to deploy a topology in remote (distributed) mode.
Storm's Config class is simply an extension of HashMap<String, Object> that defines a number of Storm-specific constants and convenience methods for configuring a topology's runtime behavior. When a topology is submitted, Storm will merge its predefined default configuration values with the contents of the Config instance passed to the submitTopology() method, and the result will be passed to the open() and prepare() methods of the topology's spouts and bolts, respectively. In this sense, the Config object represents a set of configuration parameters that are global to all components in a topology.
We're now ready to run the WordCountTopology class. The main() method will submit the topology, wait for ten seconds while it runs, kill (undeploy) the topology, and finally shut down the local cluster. When the program run is complete, you should see console output similar to the following:
--- FINAL COUNTS ---
a : 1426
ate : 1426
beverages : 1426
cold : 1426
cow : 1426
dog : 2852
don't : 2851
fleas : 2851
has : 1426
have : 1426
homework : 1426
i : 4276
like : 2851
man : 1426
my : 2852
the : 1426
think : 1425
--------------