Storm Blueprints: Patterns for Distributed Real-time Computation

Guaranteed processing


Storm provides an API that allows you to guarantee that a tuple emitted by a spout is fully processed. So far in our example, we've not worried about failures. We've seen that a spout stream can be split and can generate any number of streams in a topology, depending on the behavior of downstream bolts. What happens in the event of a failure? As an example, consider a bolt that persists information to a database based on tuple data. How do we handle situations where the database update fails?

Reliability in spouts

In Storm, guaranteed message processing begins with the spout. A spout that supports guaranteed processing needs a way to keep track of tuples it has emitted and be prepared to re-emit a tuple if downstream processing of that tuple, or any child tuples, fails. A child tuple can be thought of as any tuple emitted as a result of a tuple originating from a spout. Another way to look at it is to consider the spout's stream(s) as the trunk of a tuple tree (shown in the following diagram):

Tuple tree

In the preceding diagram, the solid lines represent the original trunk tuples emitted by a spout, and the dotted lines represent tuples derived from the original tuple. The resulting graph represents the tuple tree. With guaranteed processing, each bolt in the tree can either acknowledge (ack) or fail a tuple. If all bolts in the tree acknowledge tuples derived from the trunk tuple, the spout's ack method will be called to indicate that message processing is complete. If any of the bolts in the tree explicitly fail a tuple, or if processing of the tuple tree exceeds the time-out period, the spout's fail method will be called.
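
The ack and fail rule described above can be sketched in plain Java, without any Storm dependency. This is a hypothetical model and not Storm's internal tracking mechanism: TupleTreeTracker and its anchor, ack, and fail methods are names invented for this illustration, and timeouts are not modeled. The tree is reported as completed once every registered tuple has been acknowledged, and as failed as soon as any tuple is failed.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a single tuple tree; not Storm's internal implementation.
class TupleTreeTracker {
    enum State { PENDING, COMPLETED, FAILED }

    // IDs of tuples in the tree that have not been acknowledged yet.
    private final Set<String> outstanding = new HashSet<>();
    private State state = State.PENDING;

    // The trunk tuple emitted by the spout starts the tree.
    TupleTreeTracker(String trunkTupleId) {
        outstanding.add(trunkTupleId);
    }

    // A bolt emits a derived tuple that belongs to this tree.
    void anchor(String childTupleId) {
        if (state == State.PENDING) {
            outstanding.add(childTupleId);
        }
    }

    // A bolt acknowledges a tuple; the tree completes when nothing is outstanding.
    void ack(String tupleId) {
        if (state != State.PENDING) {
            return;
        }
        outstanding.remove(tupleId);
        if (outstanding.isEmpty()) {
            state = State.COMPLETED; // the point where the spout's ack method would be called
        }
    }

    // A single explicit failure fails the whole tree.
    void fail(String tupleId) {
        if (state == State.PENDING) {
            state = State.FAILED; // the point where the spout's fail method would be called
        }
    }

    State state() {
        return state;
    }
}
```

In this model a bolt registers its derived tuples before acknowledging the tuple it received, so the set of outstanding tuples never becomes empty while work is still in flight.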

Storm's ISpout interface defines three methods involved in the reliability API: nextTuple, ack, and fail.

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

As we've seen before, when Storm requests that a spout emit a tuple, it calls the nextTuple() method. The first step in implementing guaranteed processing is to assign the outbound tuple a unique ID and pass that value to the emit() method of SpoutOutputCollector:

collector.emit(new Values("value1", "value2"), msgId);

Assigning the tuple a message ID tells Storm that the spout would like to be notified when the tuple tree is completed or when it fails at any point. If processing succeeds, the spout's ack() method will be called with the message ID assigned to the tuple. If processing fails or times out, the spout's fail() method will be called with that same message ID.

Reliability in bolts

Implementing a bolt that participates in guaranteed processing involves two steps:

  1. Anchoring to an incoming tuple when emitting a derived tuple.

  2. Acknowledging or failing tuples that have been processed successfully or unsuccessfully, respectively.

Anchoring to a tuple means that we are creating a link between an incoming tuple and derived tuples such that any downstream bolts are expected to participate in the tuple tree by acknowledging the tuple, failing the tuple, or allowing it to time out.

You can anchor to a tuple (or a list of tuples) by calling one of the overloaded emit methods of OutputCollector:

collector.emit(tuple, new Values(word));

Here, we're anchoring to the incoming tuple and emitting a new tuple that downstream bolts should acknowledge or fail. An alternative form of the emit method will emit unanchored tuples:

collector.emit(new Values(word));

Unanchored tuples do not participate in the reliability of a stream. If an unanchored tuple fails downstream, it will not cause a replay of the original root tuple.
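
The difference between anchored and unanchored emits can be illustrated with a small Storm-free model. Everything here is hypothetical: AnchorModel and its method names are invented for this sketch, and the map of root tuples is only a simplified way to picture the link that anchoring creates, not how Storm records it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of anchoring; not Storm's implementation.
class AnchorModel {
    // Maps each tracked tuple to the spout tuple at the root of its tree.
    private final Map<String, String> rootOf = new HashMap<>();
    // Root tuples for which a replay has been requested.
    private final Set<String> rootsToReplay = new HashSet<>();

    // A spout emits a root tuple with a message ID.
    void emitFromSpout(String tupleId) {
        rootOf.put(tupleId, tupleId);
    }

    // Anchored emit: the child joins the tree of the tuple it is anchored to.
    void emitAnchored(String anchorId, String childId) {
        String root = rootOf.get(anchorId);
        if (root != null) {
            rootOf.put(childId, root);
        }
    }

    // Unanchored emit: no link to any root tuple is recorded.
    void emitUnanchored(String childId) {
        // intentionally empty
    }

    // Failing a tuple requests a replay only if the tuple is linked to a root.
    void fail(String tupleId) {
        String root = rootOf.get(tupleId);
        if (root != null) {
            rootsToReplay.add(root);
        }
    }

    boolean replayRequested(String rootId) {
        return rootsToReplay.contains(rootId);
    }
}
```

Failing the unanchored tuple leaves the root untouched, while failing the anchored one marks the root for replay.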

After successfully processing a tuple and optionally emitting new or derived tuples, a bolt processing a reliable stream should acknowledge the inbound tuple:

this.collector.ack(tuple);

If tuple processing fails in such a way that the spout must replay (re-emit) the tuple, the bolt should explicitly fail the tuple:

this.collector.fail(tuple);

If tuple processing fails, either as a result of a timeout or through an explicit call to the OutputCollector.fail() method, the spout that emitted the original tuple will be notified, allowing it to re-emit the tuple, as you'll see shortly.
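
To connect this back to the database question raised at the start of the section, the following sketch isolates the ack-or-fail decision a bolt would make in its execute method. It is plain Java built on hypothetical stand-ins: AckCollector mimics only the ack and fail calls of OutputCollector, tuples are reduced to strings, and Database, PersistStep, and RecordingCollector are names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the ack/fail part of Storm's OutputCollector.
interface AckCollector {
    void ack(String tuple);
    void fail(String tuple);
}

// Hypothetical persistence layer whose update may throw.
interface Database {
    void update(String value) throws Exception;
}

// Mirrors the body of a bolt's execute method: ack on success, fail on error.
class PersistStep {
    private final AckCollector collector;
    private final Database database;

    PersistStep(AckCollector collector, Database database) {
        this.collector = collector;
        this.database = database;
    }

    void execute(String tuple) {
        try {
            database.update(tuple);
            collector.ack(tuple);  // processing succeeded
        } catch (Exception e) {
            collector.fail(tuple); // signal that the tuple should be replayed
        }
    }
}

// Records ack and fail calls so the behavior can be inspected.
class RecordingCollector implements AckCollector {
    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    public void ack(String tuple) {
        acked.add(tuple);
    }

    public void fail(String tuple) {
        failed.add(tuple);
    }
}
```

Failing the tuple explicitly lets the spout learn about the problem immediately instead of waiting for the tuple tree to time out.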

Reliable word count

To further illustrate reliability, let's begin by enhancing the SentenceSpout class to make it support guaranteed delivery. It will need to keep track of all tuples emitted and assign each one a unique ID. We'll use a ConcurrentHashMap<UUID, Values> object to store the tuples that are pending. For each tuple we emit, we'll assign a unique identifier and store it in our map of pending tuples. When we receive an acknowledgement, we'll remove the tuple from our pending map. On failure, we'll replay the tuple:

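import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Package names as of Storm 0.9.x; Storm 1.0 and later use org.apache.storm instead.
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SentenceSpout extends BaseRichSpout {

    // Tuples that have been emitted but not yet acknowledged, keyed by message ID.
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };
    private int index = 0;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    public void nextTuple() {
        Values values = new Values(sentences[index]);
        // Assign a unique message ID and remember the tuple until it is acknowledged.
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);
        this.collector.emit(values, msgId);
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        // Utils is assumed to be the example project's own helper class, not part of Storm.
        Utils.waitForMillis(1);
    }

    public void ack(Object msgId) {
        // The tuple tree completed, so the tuple no longer needs to be tracked.
        this.pending.remove(msgId);
    }

    public void fail(Object msgId) {
        // Replay the tuple under the same message ID; it stays in the pending map.
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}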
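
The bookkeeping in this spout can be exercised without a running cluster. The sketch below is a hypothetical, Storm-free model: PendingTracker and its emit, ack, and fail methods are invented names that mirror the three spout callbacks, sentences are plain strings rather than Values objects, and the emitted list stands in for the collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the spout's pending-tuple bookkeeping; not a Storm class.
class PendingTracker {
    // Sentences emitted but not yet acknowledged, keyed by message ID.
    private final Map<UUID, String> pending = new ConcurrentHashMap<>();
    // Stand-in for the collector: every emit, including replays, is recorded here.
    final List<String> emitted = new ArrayList<>();

    // Mirrors nextTuple: assign an ID, remember the sentence, emit it.
    UUID emit(String sentence) {
        UUID msgId = UUID.randomUUID();
        pending.put(msgId, sentence);
        emitted.add(sentence);
        return msgId;
    }

    // Mirrors ack: the sentence no longer needs to be tracked.
    void ack(UUID msgId) {
        pending.remove(msgId);
    }

    // Mirrors fail: replay the sentence under the same ID and keep tracking it.
    void fail(UUID msgId) {
        String sentence = pending.get(msgId);
        if (sentence != null) { // ignore IDs that are no longer pending
            emitted.add(sentence);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Keeping the entry in the map after a failure means a replayed tuple that fails again can be replayed again, and it is only forgotten once it is acknowledged.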

Modifying the bolts to provide guaranteed processing simply involves anchoring outbound tuples to the incoming tuple and then acknowledging the inbound tuple:

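import java.util.Map;

// Package names as of Storm 0.9.x; Storm 1.0 and later use org.apache.storm instead.
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            // Anchor each derived word tuple to the incoming sentence tuple.
            this.collector.emit(tuple, new Values(word));
        }
        // Acknowledge the incoming tuple once all derived tuples have been emitted.
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}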