Mastering Apache Storm

By: Ankit Jain

Overview of this book

Apache Storm is a real-time Big Data processing framework that processes large amounts of data reliably, guaranteeing that every message will be processed. Storm lets you scale your processing as your data grows, making it an excellent platform for solving big data problems. This extensive guide takes you from the basics to the advanced topics of Storm. The book begins with a detailed introduction to real-time processing and where Storm fits in to solve these problems. You'll learn how to deploy Storm on clusters by writing a basic Storm Hello World example. Next, we'll introduce you to Trident, and you'll get a clear understanding of how to develop and deploy a Trident topology. We cover topics such as monitoring, Storm parallelism, the scheduler, and log processing in an easy-to-understand manner. You will also learn how to integrate Storm with other well-known Big Data technologies such as HBase, Redis, Kafka, and Hadoop to realize the full potential of Storm. With real-world examples and clear explanations, this book will give you a thorough mastery of Apache Storm. You will be able to use this knowledge to develop efficient, distributed real-time applications that cater to your business needs.

The Storm data model


The basic unit of data that can be processed by a Storm application is called a tuple. Each tuple consists of a predefined list of fields. The value of each field can be a byte, char, integer, long, float, double, Boolean, or byte array. Storm also provides an API to define your own datatypes, which can be serialized as fields in a tuple.

A tuple is dynamically typed, that is, you just need to define the names of the fields in a tuple and not their datatype. The choice of dynamic typing helps to simplify the API and makes it easy to use. Also, since a processing unit in Storm can process multiple types of tuples, it's not practical to declare field types.

Each of the fields in a tuple can be accessed by its name, getValueByField(String), or its positional index, getValue(int), in the tuple. Tuples also provide convenient methods such as getIntegerByField(String) that save you from typecasting the objects. For example, if you have a Fraction (numerator, denominator) tuple, representing fractional numbers, then you can get the value of the numerator by either using getIntegerByField("numerator") or getInteger(0).
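The access pattern described above can be sketched without a Storm dependency. The following is a minimal model, not Storm's implementation: SimpleTuple and FractionTupleDemo are hypothetical names invented for this illustration, and only the four accessor names are taken from the text. It shows that a tuple declares field names but no field types, and that a typed getter is just a lookup followed by a cast.

```java
import java.util.Arrays;
import java.util.List;

public class FractionTupleDemo {

    /** Hypothetical stand-in for a Storm tuple: field names plus untyped values. */
    static final class SimpleTuple {
        private final List<String> fields;
        private final List<Object> values;

        SimpleTuple(List<String> fields, List<Object> values) {
            if (fields.size() != values.size()) {
                throw new IllegalArgumentException("fields and values must have the same length");
            }
            this.fields = fields;
            this.values = values;
        }

        /** Positional access: the caller gets an untyped Object back. */
        Object getValue(int index) {
            return values.get(index);
        }

        /** Named access: resolve the field name to its position first. */
        Object getValueByField(String field) {
            int index = fields.indexOf(field);
            if (index < 0) {
                throw new IllegalArgumentException("unknown field: " + field);
            }
            return values.get(index);
        }

        /** Convenience getters that perform the cast for the caller. */
        Integer getInteger(int index) {
            return (Integer) getValue(index);
        }

        Integer getIntegerByField(String field) {
            return (Integer) getValueByField(field);
        }
    }

    /** Builds the Fraction (numerator, denominator) tuple used in the text. */
    static SimpleTuple fraction(int numerator, int denominator) {
        return new SimpleTuple(
                Arrays.asList("numerator", "denominator"),
                Arrays.<Object>asList(numerator, denominator));
    }

    public static void main(String[] args) {
        SimpleTuple t = fraction(3, 4);
        System.out.println(t.getIntegerByField("numerator")); // 3
        System.out.println(t.getInteger(0));                  // 3
        System.out.println(t.getValueByField("denominator")); // 4
    }
}
```

Because the values are stored as plain objects, a typed getter called on a field that holds a different type fails with a ClassCastException at runtime; that is the trade-off of dynamic typing.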

You can see the full set of operations supported by org.apache.storm.tuple.Tuple in the Java doc that is located at https://storm.apache.org/releases/1.0.2/javadocs/org/apache/storm/tuple/Tuple.html.

Definition of a Storm topology

In Storm terminology, a topology is an abstraction that defines the graph of the computation. You create a Storm topology and deploy it on a Storm cluster to process data. A topology can be represented by a directed acyclic graph, where each node does some kind of processing and forwards its output to the next node(s) in the flow. The following diagram is a sample Storm topology:

The following are the components of a Storm topology:

  • Tuple: A single message/record that flows between the different instances of a topology is called a tuple.
  • Stream: The key abstraction in Storm is that of a stream. A stream is an unbounded sequence of tuples that can be processed in parallel by Storm. Each stream can be processed by a single or multiple types of bolts (the processing units in Storm, which are defined later in this section). Thus, Storm can also be viewed as a platform to transform streams. In the preceding diagram, streams are represented by arrows. Each stream in a Storm application is given an ID and the bolts can produce and consume tuples from these streams on the basis of their ID. Each stream also has an associated schema for the tuples that will flow through it.
  • Spout: A spout is the source of tuples in a Storm topology. It is responsible for reading or listening to data from an external source (for example, by reading from a log file or listening for new messages in a queue) and publishing them into streams, which is called emitting in Storm terminology. A spout can emit multiple streams, each with a different schema. For example, it can read records of 10 fields from a log file and emit them as two different streams, one of seven-field tuples and one of four-field tuples.

The org.apache.storm.spout.ISpout interface is the interface used to define spouts. If you are writing your topology in Java, then you should use org.apache.storm.topology.IRichSpout as it declares methods to use with the TopologyBuilder API. Whenever a spout emits a tuple, Storm tracks all the tuples generated while processing this tuple, and when the execution of all the tuples in the graph of this source tuple is complete, it will send an acknowledgement back to the spout. This tracking happens only if a message ID was provided when emitting the tuple. If null was used as the message ID, this tracking will not happen.

A tuple processing timeout can also be defined for a topology, and if a tuple is not processed within the specified timeout, a fail message will be sent back to the spout. Again, this will happen only if you define a message ID. A small performance gain can be extracted out of Storm at the risk of some data loss by disabling the message acknowledgements, which can be done by skipping the message ID while emitting tuples.

The important methods of a spout are:

    • nextTuple(): This method is called by Storm to get the next tuple from the input source. Inside this method, you will have the logic of reading data from external sources and emitting them to an instance of org.apache.storm.spout.ISpoutOutputCollector. The schema for streams can be declared by using the declareStream method of org.apache.storm.topology.OutputFieldsDeclarer.

If a spout wants to emit data to more than one stream, it can declare multiple streams using the declareStream method and specify a stream ID while emitting the tuple. If there are no more tuples to emit at the moment, this method should return immediately rather than block. If a call to this method does not emit a tuple, Storm will wait for 1 millisecond before calling it again. This waiting time can be configured using the topology.sleep.spout.wait.strategy.time.ms setting.

    • ack(Object msgId): This method is invoked by Storm when the tuple with the given message ID is completely processed by the topology. At this point, the user should mark the message as processed and do the required cleaning up, such as removing the message from the message queue so that it does not get processed again.
    • fail(Object msgId): This method is invoked by Storm when it identifies that the tuple with the given message ID has not been processed successfully or was not fully processed within the configured timeout. In such scenarios, the user should do the required processing so that the message can be emitted again by the nextTuple method. A common way to do this is to put the message back in the incoming message queue.
    • open(): This method is called only once, when the spout is initialized. If the spout needs to connect to an external source for its input data, define the connection logic in the open method, and then keep fetching data from this external source in the nextTuple method to emit it further.

Note

Another point to note while writing your spout is that none of the methods should be blocking, as Storm calls all the methods in the same thread. Every spout has an internal buffer to keep track of the status of the tuples emitted so far. The spout will keep the tuples in this buffer until they are either acknowledged or failed, calling the ack or fail method, respectively. Storm will call the nextTuple method only when this buffer is not full.
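The bookkeeping a reliable spout performs around nextTuple, ack, and fail can be modelled in plain Java. The sketch below is an illustration under stated assumptions rather than Storm code: ReliableSource is a hypothetical class that implements no Storm interface, an in-memory queue stands in for the external message source, and a counter stands in for whatever message ID a real spout would choose.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class PendingMessageDemo {

    /** Hypothetical model of a reliable spout's bookkeeping; not a Storm class. */
    static final class ReliableSource {
        private final Queue<String> incoming = new ArrayDeque<>();  // stands in for the external queue
        private final Map<Object, String> pending = new HashMap<>(); // emitted, not yet acked or failed
        private long nextId = 0;

        void offer(String message) {
            incoming.add(message);
        }

        /**
         * Mirrors nextTuple(): never blocks. Returns the message ID of the
         * emitted message, or null when there is nothing to emit right now.
         */
        Object nextTuple() {
            String message = incoming.poll();
            if (message == null) {
                return null;
            }
            Object msgId = nextId++;
            pending.put(msgId, message); // remember it until ack or fail arrives
            return msgId;
        }

        /** Mirrors ack(Object msgId): the message is done, so forget it. */
        void ack(Object msgId) {
            pending.remove(msgId);
        }

        /** Mirrors fail(Object msgId): put the message back so it is emitted again. */
        void fail(Object msgId) {
            String message = pending.remove(msgId);
            if (message != null) {
                incoming.add(message);
            }
        }

        int pendingCount() {
            return pending.size();
        }

        int queuedCount() {
            return incoming.size();
        }
    }

    public static void main(String[] args) {
        ReliableSource source = new ReliableSource();
        source.offer("a");
        source.offer("b");
        Object first = source.nextTuple();
        Object second = source.nextTuple();
        source.ack(first);   // "a" is fully processed
        source.fail(second); // "b" goes back to the queue for replay
        System.out.println(source.pendingCount() + " pending, " + source.queuedCount() + " queued");
    }
}
```

In this model, skipping the pending map entirely corresponds to emitting without a message ID: nothing is tracked, so nothing can be replayed.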

  • Bolt: A bolt is the processing powerhouse of a Storm topology and is responsible for transforming a stream. Ideally, each bolt in the topology should be doing a simple transformation of the tuples, and many such bolts can coordinate with each other to exhibit a complex transformation.

The org.apache.storm.task.IBolt interface is the interface used to define bolts, and if a topology is written in Java, you should use the org.apache.storm.topology.IRichBolt interface. A bolt can subscribe to multiple streams of other components (either spouts or other bolts) in the topology and can similarly emit output to multiple streams. Output streams can be declared using the declareStream method of org.apache.storm.topology.OutputFieldsDeclarer.

The important methods of a bolt are:

    • execute(Tuple input): This method is executed for each tuple that comes through the subscribed input streams. In this method, you can do whatever processing is required for the tuple and then produce the output either in the form of emitting more tuples to the declared output streams, or other things such as persisting the results in a database.

You are not required to process the tuple as soon as this method is called, and the tuples can be held until required. For example, while joining two streams, when a tuple arrives you can hold it until its counterpart also comes, and then you can emit the joined tuple.

The metadata associated with the tuple can be retrieved by the various methods defined in the Tuple interface. If a message ID is associated with a tuple, the execute method must publish an ack or fail event using OutputCollector for the bolt, or else Storm will not know whether the tuple was processed successfully. The org.apache.storm.topology.IBasicBolt interface is a convenient interface that sends an acknowledgement automatically after the completion of the execute method. If a fail event is to be sent, this method should throw org.apache.storm.topology.FailedException.

    • prepare(Map stormConf, TopologyContext context, OutputCollector collector): A bolt can be executed by multiple workers in a Storm topology. The instance of a bolt is created on the client machine and then serialized and submitted to Nimbus. When Nimbus creates the worker instances for the topology, it sends this serialized bolt to the workers. Each worker will deserialize the bolt and call the prepare method. In this method, you should make sure the bolt is properly configured to execute tuples. Any state that you want to maintain can be stored as instance variables for the bolt that can be serialized/deserialized later.
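The create, serialize, deserialize, prepare sequence can be reproduced with plain Java serialization. The following is a minimal sketch, not Storm code: WordCounter is a hypothetical class that implements no Storm interface, its prepare method takes no arguments for brevity, and roundTrip stands in for the trip from the client machine to a worker. Marking the runtime state as transient is a choice made for this illustration; it shows why state that is built on the worker belongs in prepare rather than in the constructor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class PrepareLifecycleDemo {

    /** Hypothetical bolt-like class; it does not implement any Storm interface. */
    static final class WordCounter implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String label;                    // configuration set on the client, shipped as-is
        private transient Map<String, Integer> counts; // runtime state, created on the worker

        WordCounter(String label) {
            this.label = label;
        }

        /** Stands in for prepare(...): build runtime state after deserialization. */
        void prepare() {
            counts = new HashMap<>();
        }

        /** Stands in for execute(...): returns the running count for the word. */
        int execute(String word) {
            return counts.merge(word, 1, Integer::sum);
        }

        String label() {
            return label;
        }

        boolean isPrepared() {
            return counts != null;
        }
    }

    /** Simulates shipping the instance from the client to a worker. */
    static WordCounter roundTrip(WordCounter bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (WordCounter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        WordCounter onWorker = roundTrip(new WordCounter("counter"));
        System.out.println(onWorker.isPrepared()); // false: transient state was not shipped
        onWorker.prepare();
        onWorker.execute("storm");
        System.out.println(onWorker.execute("storm")); // 2
    }
}
```

The serializable field survives the round trip unchanged, while the transient map arrives as null and only becomes usable once prepare has run.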

Operation modes in Storm

Operation modes indicate how the topology is deployed in Storm. Storm supports two types of operation modes to execute the Storm topology:

  • Local mode: In local mode, Storm topologies run on the local machine in a single JVM. This mode simulates a Storm cluster in a single JVM and is used for the testing and debugging of a topology.
  • Remote mode: In remote mode, we will use the Storm client to submit the topology to the master along with all the necessary code required to execute the topology. Nimbus will then take care of distributing your code.

In the next chapter, we are going to cover both local and remote mode in more detail, along with a sample topology.