Storm Blueprints: Patterns for Distributed Real-time Computation

The final topology


We now have all the components needed to build our log analysis topology, which is assembled as follows:

public class LogAnalysisTopology {

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();

        // a single Kafka broker on localhost; the second argument is the number of partitions per host
        StaticHosts kafkaHosts = KafkaConfig.StaticHosts.fromHostString(Arrays.asList("localhost"), 1);
        TridentKafkaConfig spoutConf = new TridentKafkaConfig(kafkaHosts, "log-analysis");
        spoutConf.scheme = new StringScheme();
        // -1 is Kafka's "latest offset" marker: skip any backlog and read only new messages
        spoutConf.forceStartOffsetTime(-1);
        OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

        Stream spoutStream = topology.newStream("kafka-stream", spout);

        Fields jsonFields = new Fields("level", "timestamp", "message", "logger");
        Stream parsedStream = spoutStream.each(new Fields("str"), new JsonProjectFunction(jsonFields), jsonFields);

        // drop the unparsed JSON to reduce tuple size
        parsedStream = parsedStream.project...