We now have all the components necessary to build our log analysis topology as follows:
public class LogAnalysisTopology { public static StormTopology buildTopology() { TridentTopology topology = new TridentTopology(); StaticHosts kafkaHosts = KafkaConfig.StaticHosts.fromHostString(Arrays.asList(new String[] { "localhost" }), 1); TridentKafkaConfig spoutConf = new TridentKafkaConfig(kafkaHosts, "log-analysis"); spoutConf.scheme = new StringScheme(); spoutConf.forceStartOffsetTime(-1); OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf); Stream spoutStream = topology.newStream("kafka-stream", spout); Fields jsonFields = new Fields("level", "timestamp", "message", "logger"); Stream parsedStream = spoutStream.each(new Fields("str"), new JsonProjectFunction(jsonFields), jsonFields); // drop the unparsed JSON to reduce tuple size parsedStream = parsedStream.project...