Book Image

Storm Blueprints: Patterns for Distributed Real-time Computation

Book Image

Storm Blueprints: Patterns for Distributed Real-time Computation

Overview of this book

Table of Contents (17 chapters)
Storm Blueprints: Patterns for Distributed Real-time Computation
Credits
About the Authors
About the Reviewers
www.PacktPub.com
Preface
Index

Understanding stream groupings


Based on the previous example, you may wonder why we did not bother increasing the parallelism of ReportBolt. The answer is that it does not make any sense to do so. To understand why, you need to understand the concept of stream groupings in Storm.

A stream grouping defines how a stream's tuples are distributed among bolt tasks in a topology. For example, in the parallelized version of the word count topology, the SplitSentenceBolt class was assigned four tasks in the topology. The stream grouping determines which one of those tasks will receive a given tuple.

Storm defines seven built-in stream groupings:

  • Shuffle grouping: This randomly distributes tuples across the target bolt's tasks such that each bolt receives an equal number of tuples.

  • Fields grouping: This routes tuples to bolt tasks based on the values of the fields specified in the grouping. For example, if a stream is grouped on the "word" field, tuples with the same value for the "word" field will always be routed to the same bolt task.

  • All grouping: This replicates the tuple stream across all bolt tasks such that each task will receive a copy of the tuple.

  • Global grouping: This routes all tuples in a stream to a single task, choosing the task with the lowest task ID value. Note that setting a parallelism hint or number of tasks on a bolt when using the global grouping is meaningless since all tuples will be routed to the same bolt task. The global grouping should be used with caution since it will route all tuples to a single JVM instance, potentially creating a bottleneck or overwhelming a specific JVM/machine in a cluster.

  • None grouping: The none grouping is functionally equivalent to the shuffle grouping. It has been reserved for future use.

  • Direct grouping: With a direct grouping, the source stream decides which component will receive a given tuple by calling the emitDirect() method. It and can only be used on streams that have been declared direct streams.

  • Local or shuffle grouping: The local or shuffle grouping is similar to the shuffle grouping but will shuffle tuples among bolt tasks running in the same worker process, if any. Otherwise, it will fall back to the shuffle grouping behavior. Depending on the parallelism of a topology, the local or shuffle grouping can increase topology performance by limiting network transfer.

In addition to the predefined groupings, you can define your own stream grouping by implementing the CustomStreamGrouping interface:

public interface CustomStreamGrouping extends Serializable {
    
void prepare(WorkerTopologyContext context, 
GlobalStreamId stream, List<Integer> targetTasks);
    
List<Integer> chooseTasks(int taskId, List<Object> values); 
}

The prepare() method is called at runtime to initiate the grouping with information the grouping implementation can use to make decisions on how to group tuples to receiving tasks. The WorkerTopologyContext object provides contextual information about the topology, and the GlobalStreamId object provides metadata about the stream being grouped on. The most useful parameter is targetTasks, which is a list of all the task identifiers the grouping needs to take into account. You will usually want to store the targetTasks parameter as an instance variable for reference in the implementation of the chooseTasks() method.

The chooseTasks() method returns a list of task identifiers to which a tuple should be sent. Its parameters are the task identifier of the component emitting the tuple and the values of the tuple.

To illustrate the importance of stream groupings, let's introduce a bug into our topology. Begin by modifying the nextTuple() method of SentenceSpout so it only emits each sentence once:

public void nextTuple() {
        if(index < sentences.length){
            this.collector.emit(new Values(sentences[index]));
            index++;
        }
        Utils.waitForMillis(1);
    }

Now run the topology to get the following output:

--- FINAL COUNTS ---
a : 2
ate : 2
beverages : 2
cold : 2
cow : 2
dog : 4
don't : 4
fleas : 4
has : 2
have : 2
homework : 2
i : 6
like : 4
man : 2
my : 4
the : 2
think : 2
--------------

Now change the field grouping on the CountBolt parameter to a shuffle grouping and rerun the topology:

builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
                .shuffleGrouping(SPLIT_BOLT_ID);

The output should look like the following:

--- FINAL COUNTS ---
a : 1
ate : 2
beverages : 1
cold : 1
cow : 1
dog : 2
don't : 2
fleas : 1
has : 1
have : 1
homework : 1
i : 3
like : 1
man : 1
my : 1
the : 1
think : 1
--------------

Our counts are off because the CountBolt parameter is stateful: it maintains a count for each word it's seen. In this case, the accuracy of our computation depends on the ability to group based on a tuple's content when components have been parallelized. The bug we introduced will only be manifested if the parallelism of the CountBolt parameter is greater than one. This underscores the importance of testing topologies with various parallelism configurations.

Tip

In general, you should avoid storing state information in a bolt since any time a worker fails and/or has its tasks reassigned, that information will be lost. One solution is to periodically take a snapshot of state information to a persistent store, such as a database, so it can be restored if a task is reassigned.