Recall from the introduction that Storm allows a computation to scale horizontally across multiple machines by dividing the computation into multiple, independent tasks that execute in parallel across a cluster. In Storm, a task is simply an instance of a spout or bolt running somewhere on the cluster.
To understand how parallelism works, we must first explain the four main components involved in executing a topology in a Storm cluster:
Nodes (machines): These are simply machines configured to participate in a Storm cluster and execute portions of a topology. A Storm cluster contains one or more nodes that perform work.
Workers (JVMs): These are independent JVM processes running on a node. Each node is configured to run one or more workers. A topology may request one or more workers be assigned to it.
Executors (threads): These are Java threads running within a worker JVM process. Multiple tasks can be assigned to a single executor. Unless explicitly overridden, Storm will assign one task for each executor.
Tasks (bolt/spout instances): Tasks are instances of spouts and bolts whose nextTuple() and execute() methods are called by executor threads.
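The relationship between executors and tasks can be sketched in plain Java. This is a simplified model, not Storm code: the ExecutorModel class, its Task type, and the runExecutors() helper are hypothetical names invented for illustration. Each executor is a thread that calls execute() on every task it owns, one after another.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model for illustration only; these are not Storm classes.
class ExecutorModel {

    /** Stand-in for a task (one spout or bolt instance). */
    static class Task {
        final String name;
        volatile String ranOnThread; // records which executor thread called execute()

        Task(String name) {
            this.name = name;
        }

        void execute() {
            ranOnThread = Thread.currentThread().getName();
        }
    }

    /** Starts one thread per executor; each thread calls its tasks in turn. */
    static void runExecutors(List<List<Task>> executors) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < executors.size(); i++) {
            final List<Task> tasks = executors.get(i);
            Thread thread = new Thread(() -> {
                for (Task task : tasks) {
                    task.execute(); // tasks sharing an executor run one after another
                }
            }, "executor-" + i);
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join(); // wait until every executor has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        Task split0 = new Task("split-0");
        Task split1 = new Task("split-1");
        Task count0 = new Task("count-0");

        // Executor 0 owns two tasks; executor 1 owns one.
        runExecutors(Arrays.asList(
                Arrays.asList(split0, split1),
                Arrays.asList(count0)));

        for (Task task : Arrays.asList(split0, split1, count0)) {
            System.out.println(task.name + " ran on " + task.ranOnThread);
        }
    }
}
```

In this model, split-0 and split-1 both report executor-0 while count-0 reports executor-1. Giving one executor more tasks therefore does not add threads; only adding executors does.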
So far in our word count example, we have not explicitly used any of Storm's parallelism APIs; instead, we allowed Storm to use its default settings. Unless overridden, Storm defaults most parallelism settings to a factor of one.
Before changing the parallelism settings for our topology, let's consider how our topology will execute with the default settings. Assuming we have one machine (node), have assigned one worker to the topology, and allowed Storm to assign one task per executor, our topology execution would look like the following:
As you can see, the only parallelism we have is at the thread level. Each task runs on a separate thread within a single JVM. How can we increase the parallelism to more effectively utilize the hardware we have at our disposal? Let's start by increasing the number of workers and executors assigned to run our topology.
Assigning additional workers is an easy way to add computational power to a topology, and Storm provides the means to do so through its API as well as pure configuration. Whichever method we choose, our component spouts and bolts do not have to change, and can be reused as is.
In the previous version of the word count topology, we introduced the Config object that gets passed to the submitTopology() method at deployment time but left it largely unused. To increase the number of workers assigned to a topology, we simply call the setNumWorkers() method of the Config object:
    Config config = new Config();
    config.setNumWorkers(2);
This assigns two workers to our topology instead of the default of one. While this adds computational resources to our topology, to use those resources effectively we will also want to adjust the number of executors in our topology as well as the number of tasks per executor.
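The worker count is ultimately just a configuration entry. Storm's Config class extends HashMap, and the worker count is stored under the key "topology.workers" (exposed as Config.TOPOLOGY_WORKERS). The sketch below uses a plain HashMap as a stand-in so that it runs without Storm on the classpath; the WorkerConfigSketch class and its helper methods are hypothetical names, and the fallback to one worker simply mirrors the default described above.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in: a HashMap plays the role of Storm's Config object.
class WorkerConfigSketch {

    // Key under which Storm stores the worker count (Config.TOPOLOGY_WORKERS).
    static final String TOPOLOGY_WORKERS = "topology.workers";

    /** Builds a configuration map that requests the given number of workers. */
    static Map<String, Object> withWorkers(int workers) {
        if (workers < 1) {
            throw new IllegalArgumentException("a topology needs at least one worker");
        }
        Map<String, Object> conf = new HashMap<>();
        conf.put(TOPOLOGY_WORKERS, workers);
        return conf;
    }

    /** Reads the worker count back, assuming one worker when the key is unset. */
    static int workersOf(Map<String, Object> conf) {
        Object value = conf.get(TOPOLOGY_WORKERS);
        return value == null ? 1 : ((Number) value).intValue();
    }

    public static void main(String[] args) {
        System.out.println("default workers: " + workersOf(new HashMap<>()));
        System.out.println("requested workers: " + workersOf(withWorkers(2)));
    }
}
```

Because the setting is an ordinary map entry, the same value can be supplied from a configuration file rather than from code, which is why the spouts and bolts themselves never need to change.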
As we've seen, Storm by default creates a single task for each component defined in a topology and assigns a single executor to each task. Storm's parallelism API offers control over this behavior by allowing you to set the number of executors per component as well as the number of tasks per executor.
The number of executors assigned to a given component is configured by setting a parallelism hint when declaring the component with setSpout() or setBolt(). To illustrate this feature, let's modify our topology definition to parallelize SentenceSpout such that it is assigned two tasks and each task is assigned its own executor thread:
builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
If we're using one worker, the execution of our topology now looks like the following:
Next, we will set up the split sentence bolt to execute as four tasks with two executors. Each executor thread will be assigned two tasks to execute (4 / 2 = 2). We'll also configure the word count bolt to run as four tasks, each with its own executor thread:
    builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)
        .setNumTasks(4)
        .shuffleGrouping(SENTENCE_SPOUT_ID);
    builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
        .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
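The arithmetic behind these settings can be tallied with a small stand-alone sketch. It does not call Storm: the ParallelismTally class and its methods are hypothetical helpers, a task count of 0 is used here to mean "setNumTasks() was not called", and only the per-executor counts are modeled, not Storm's actual assignment of task IDs. System components that Storm may add on its own are left out.

```java
import java.util.Arrays;

// Hypothetical helper for illustration; it does not call Storm.
class ParallelismTally {

    /** With no explicit task count (0 here), there is one task per executor. */
    static int effectiveTasks(int executors, int explicitTasks) {
        return explicitTasks > 0 ? explicitTasks : executors;
    }

    /** Spreads tasks over executors as evenly as possible; returns the count per executor. */
    static int[] tasksPerExecutor(int tasks, int executors) {
        int[] counts = new int[executors];
        for (int task = 0; task < tasks; task++) {
            counts[task % executors]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] names = {"sentence spout", "split bolt", "count bolt"};
        int[] executors = {2, 2, 4};     // parallelism hints from the topology
        int[] explicitTasks = {0, 4, 0}; // only the split bolt calls setNumTasks(4)

        int totalExecutors = 0;
        int totalTasks = 0;
        for (int i = 0; i < names.length; i++) {
            int tasks = effectiveTasks(executors[i], explicitTasks[i]);
            totalExecutors += executors[i];
            totalTasks += tasks;
            System.out.println(names[i] + ": " + executors[i]
                    + " executors, tasks per executor "
                    + Arrays.toString(tasksPerExecutor(tasks, executors[i])));
        }
        System.out.println("total: " + totalExecutors + " executors, "
                + totalTasks + " tasks");
    }
}
```

For the three components defined here, the tally comes to 8 executor threads running 10 tasks. If the executors are divided evenly between two workers, each worker JVM hosts four of those threads.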
With two workers, the execution of the topology will now look like the following diagram:
With the topology parallelism increased, running the updated WordCountTopology class should yield higher total counts for each word:
--- FINAL COUNTS ---
a : 2726
ate : 2722
beverages : 2723
cold : 2723
cow : 2726
dog : 5445
don't : 5444
fleas : 5451
has : 2723
have : 2722
homework : 2722
i : 8175
like : 5449
man : 2722
my : 5445
the : 2727
think : 2722
--------------
Since the spout emits data indefinitely and only stops when the topology is killed, the actual counts will vary depending on the speed of your computer and what other processes are running on it, but you should see an overall increase in the number of words emitted and processed.
It's important to point out that increasing the number of workers has no effect when running a topology in local mode. A topology running in local mode always runs in a single JVM process, so only task and executor parallelism settings have any effect. Storm's local mode offers a decent approximation of cluster behavior and is very useful for development, but you should always test your application in a true clustered environment before moving to production.