Deriving basic click statistics


The click topology is designed to gather basic website-usage statistics, specifically:

  • The number of visitors

  • The number of unique visitors

  • The number of visitors for a given country

  • The number of visitors for a given city

  • The percentage of visitors for each city in a given country

The system assumes a limited visitor population and uses server-side client keys rather than client-side cookies to identify visitors. The topology derives geographic information from each visitor's IP address using a public IP resolution service.

The click topology also uses Redis in two ways: as a persistent queue holding the click events sent into the topology, and as a persistent store for recalling previous visitors to the site.
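
The following is a minimal sketch of these two Redis interactions, using the Jedis client that is added as a dependency later in this recipe. The queue name count and the url:clientKey key format match the spout and bolt code below; the host, port, sample values, and JSON field names (assumed values of the storm.cookbook.Fields constants) are illustrative assumptions:

    import redis.clients.jedis.Jedis;

    public class RedisPatterns {
        public static void main(String[] args) {
            // Assumed local Redis on the default port
            Jedis jedis = new Jedis("localhost", 6379);

            // 1. Persistent queue: the web server pushes click events onto
            // the "count" list; ClickSpout pops them off the other end
            jedis.lpush("count",
                "{\"ip\":\"10.0.0.1\",\"url\":\"http://example.com/\",\"clientKey\":\"abc\"}");
            String event = jedis.rpop("count");
            System.out.println(event);

            // 2. Persistent visitor recall: RepeatVisitBolt checks and sets
            // a url:clientKey key to flag repeat visits
            String key = "http://example.com/:abc";
            if (jedis.get(key) == null) {
                jedis.set(key, "visited");
            }
        }
    }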

Note

For more information on Redis, please visit http://redis.io/.

Getting ready

Before you proceed, you must install Redis (Version 2.6 or greater):

wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make
sudo cp src/redis-server /usr/local/bin/
sudo cp src/redis-cli /usr/local/bin/

Then start the Redis server by running the redis-server command.

How to do it…

  1. Create a new Java project named click-topology, and create the pom.xml file and folder structure as per the "Hello World" topology project.

  2. In the pom.xml file, update the project name and references, and then add the following dependencies to the <dependencies> tag:

    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.11</version>
       <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.jmock</groupId>
        <artifactId>jmock-junit4</artifactId>
        <version>2.5.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.jmock</groupId>
        <artifactId>jmock-legacy</artifactId>
        <version>2.5.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
       <groupId>redis.clients</groupId>
       <artifactId>jedis</artifactId>
       <version>2.1.0</version>
    </dependency>
  3. Take special note of the test scope on the JUnit and JMock dependencies; this prevents them from being included in your final deployable JAR.

  4. In the src/main/java folder, create the ClickTopology main class in the storm.cookbook package. This class defines the topology and provides the mechanisms to launch it into a cluster or in local mode. Create the class as follows:

    // Class-level members used by the methods below
    private static final int DEFAULT_JEDIS_PORT = 6379; // Redis default
    private final TopologyBuilder builder = new TopologyBuilder();
    private final Config conf = new Config();
    private LocalCluster cluster;

    public ClickTopology(){
        builder.setSpout("clickSpout", new ClickSpout(), 10);

        // First layer of bolts
        builder.setBolt("repeatsBolt", new RepeatVisitBolt(), 10)
               .shuffleGrouping("clickSpout");
        builder.setBolt("geographyBolt",
                new GeographyBolt(new HttpIPResolver()), 10)
               .shuffleGrouping("clickSpout");

        // Second layer of bolts, commutative in nature
        builder.setBolt("totalStats", new VisitStatsBolt(), 1)
               .globalGrouping("repeatsBolt");
        builder.setBolt("geoStats", new GeoStatsBolt(), 10)
               .fieldsGrouping("geographyBolt",
                       new Fields(storm.cookbook.Fields.COUNTRY));
        conf.put(Conf.REDIS_PORT_KEY, DEFAULT_JEDIS_PORT);
    }

    public void runLocal(int runTime){
        conf.setDebug(true);
        conf.put(Conf.REDIS_HOST_KEY, "localhost");
        cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        if(runTime > 0){
            Utils.sleep(runTime);
            shutDownLocal();
        }
    }

    public void shutDownLocal(){
        if(cluster != null){
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

    public void runCluster(String name, String redisHost)
            throws AlreadyAliveException, InvalidTopologyException {
        conf.setNumWorkers(20);
        conf.put(Conf.REDIS_HOST_KEY, redisHost);
        StormSubmitter.submitTopology(name, conf,
                builder.createTopology());
    }
  5. This is followed by the main method, which selects between local and cluster modes based on the number of arguments passed at runtime:

    public static void main(String[] args) throws Exception {
        ClickTopology topology = new ClickTopology();

        if(args != null && args.length > 1) {
            topology.runCluster(args[0], args[1]);
        } else {
            if(args != null && args.length == 1)
                System.out.println("Running in local mode, redis ip missing for cluster run");
            topology.runLocal(10000);
        }
    }
  6. The topology assumes that the web server pushes messages onto a Redis queue. You must create a spout to inject these into the Storm cluster as a stream (a local smoke test that feeds this queue is sketched after these steps). In the storm.cookbook package, create the ClickSpout class, which connects to Redis when it is opened by the cluster:

    public class ClickSpout extends BaseRichSpout {

        public static Logger LOG = Logger.getLogger(ClickSpout.class);

        private Jedis jedis;
        private String host;
        private int port;
        private SpoutOutputCollector collector;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
                    storm.cookbook.Fields.URL,
                    storm.cookbook.Fields.CLIENT_KEY));
        }

        @Override
        public void open(Map conf, TopologyContext topologyContext,
                SpoutOutputCollector spoutOutputCollector) {
            host = conf.get(Conf.REDIS_HOST_KEY).toString();
            port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
            this.collector = spoutOutputCollector;
            connectToRedis();
        }

        private void connectToRedis() {
            jedis = new Jedis(host, port);
        }
  7. The cluster will then poll the spout for new tuples through the nextTuple method:

    @Override
    public void nextTuple() {
        String content = jedis.rpop("count");
        if(content == null || "nil".equals(content)) {
            // Nothing on the queue yet; back off briefly
            try { Thread.sleep(300); } catch (InterruptedException e) {}
        } else {
            JSONObject obj = (JSONObject) JSONValue.parse(content);
            String ip = obj.get(storm.cookbook.Fields.IP).toString();
            String url = obj.get(storm.cookbook.Fields.URL).toString();
            String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY).toString();
            collector.emit(new Values(ip, url, clientKey));
        }
    }
  8. Next, we need to create the bolts that will enrich the basic data through database or remote API lookups. Let us start with the repeat-visit bolt, which checks the client's key against previous visit records and emits the enriched tuple with a flag marking unique visits. Create the RepeatVisitBolt class, providing the open and Redis connection logic:

    public class RepeatVisitBolt extends BaseRichBolt {

        private OutputCollector collector;

        private Jedis jedis;
        private String host;
        private int port;

        @Override
        public void prepare(Map conf, TopologyContext topologyContext,
                OutputCollector outputCollector) {
            this.collector = outputCollector;
            host = conf.get(Conf.REDIS_HOST_KEY).toString();
            port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
            connectToRedis();
        }

        private void connectToRedis() {
            jedis = new Jedis(host, port);
            jedis.connect();
        }
  9. In the execute method, the tuple from the ClickSpout class is provided by the cluster. The bolt needs to look up the previous visit flags from Redis, based on the fields in the tuple, and emit the enriched tuple:

    public void execute(Tuple tuple) {
        String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
        String clientKey = tuple.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
        String url = tuple.getStringByField(storm.cookbook.Fields.URL);
        String key = url + ":" + clientKey;
        String value = jedis.get(key);
        if(value == null){
            // First visit for this client/URL combination
            jedis.set(key, "visited");
            collector.emit(new Values(clientKey, url,
                    Boolean.TRUE.toString()));
        } else {
            collector.emit(new Values(clientKey, url,
                    Boolean.FALSE.toString()));
        }
    }
  10. Next, the geographic enrichment bolt must be created. This bolt emits an enriched tuple by looking up the country and city for the client's IP address through a remote API call. The GeographyBolt class delegates the actual call to an injected IP resolver in order to increase the testability of the class. In the storm.cookbook package, create the GeographyBolt class, extending the BaseRichBolt base class, and implement the execute method:

    public void execute(Tuple tuple) {
        String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
        JSONObject json = resolver.resolveIP(ip);
        String city = (String) json.get(storm.cookbook.Fields.CITY);
        String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);
        collector.emit(new Values(country, city));
    }
  11. Provide the resolver by implementing the HttpIPResolver class, which is injected into GeographyBolt at design time:

    public class HttpIPResolver implements IPResolver, Serializable {

        static String url = "http://api.hostip.info/get_json.php";

        @Override
        public JSONObject resolveIP(String ip) {
            BufferedReader in = null;
            try {
                URL geoUrl = new URL(url + "?ip=" + ip);
                URLConnection connection = geoUrl.openConnection();
                in = new BufferedReader(new InputStreamReader(
                        connection.getInputStream()));
                return (JSONObject) JSONValue.parse(in);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Close the reader once, in the finally block
                if(in != null){
                    try { in.close(); } catch (IOException e) {}
                }
            }
            return null;
        }
    }
  12. Next, we need to derive the geographic statistics. The GeoStatsBolt class receives the enriched tuple from GeographyBolt and maintains an in-memory structure of the data, emitting the updated counts to any interested party. The GeoStatsBolt class is designed such that the total population of the countries can be split between many bolt instances; however, all cities within a given country must arrive at the same instance. The topology therefore splits the stream into the bolt on this basis:

    builder.setBolt("geoStats", new GeoStatsBolt(), 10).fieldsGrouping("geographyBolt", new Fields(storm.cookbook.Fields.COUNTRY));
  13. Create the GeoStatsBolt class and provide the implementation of the execute method:

    public void execute(Tuple tuple) {
        String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
        String city = tuple.getStringByField(storm.cookbook.Fields.CITY);
        if(!stats.containsKey(country)){
            stats.put(country, new CountryStats(country));
        }
        stats.get(country).cityFound(city);
        collector.emit(new Values(country,
                stats.get(country).getCountryTotal(),
                city,
                stats.get(country).getCityTotal(city)));
    }
  14. The bulk of the logic is contained in the inner model class, CountryStats, which maintains an in-memory model of city and country visits:

    private class CountryStats {

        private static final int COUNT_INDEX = 0;
        private static final int PERCENTAGE_INDEX = 1;

        private int countryTotal = 0;
        private String countryName;
        private Map<String, List<Integer>> cityStats =
                new HashMap<String, List<Integer>>();

        public CountryStats(String countryName){
            this.countryName = countryName;
        }

        public void cityFound(String cityName){
            countryTotal++;
            if(cityStats.containsKey(cityName)){
                cityStats.get(cityName).set(COUNT_INDEX,
                        cityStats.get(cityName)
                        .get(COUNT_INDEX).intValue() + 1);
            } else {
                // Initialize the city with a count of 1 and a
                // percentage of 0; the percentage is updated below
                List<Integer> list = new LinkedList<Integer>();
                list.add(1);
                list.add(0);
                cityStats.put(cityName, list);
            }

            double percent = (double) cityStats.get(cityName)
                    .get(COUNT_INDEX) / (double) countryTotal;
            // Store the percentage as a whole number (0-100)
            cityStats.get(cityName).set(PERCENTAGE_INDEX,
                    (int) (percent * 100));
        }

        public int getCountryTotal(){ return countryTotal; }

        public int getCityTotal(String cityName){
            return cityStats.get(cityName)
                    .get(COUNT_INDEX).intValue();
        }
    }
  15. Finally, the VisitStatsBolt class provides the final counting functionality for total visitors and unique visits, based on the enriched stream from the RepeatVisitBolt class. This bolt needs to receive all the count information in order to maintain a single in-memory count, which is reflected in the topology definition:

    builder.setBolt("totalStats", new VisitStatsBolt(), 1).globalGrouping("repeatsBolt");
  16. In order to implement the VisitStatsBolt class, create the class and define two member-level integers, total and uniqueCount; then implement the execute method:

    public void execute(Tuple tuple) {
        boolean unique = Boolean.parseBoolean(
                tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
        total++;
        if(unique) uniqueCount++;
        collector.emit(new Values(total, uniqueCount));
    }
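
Before moving on, the following is a minimal local smoke test for the recipe; it is a sketch under stated assumptions rather than part of the book's code. It seeds the Redis queue polled by ClickSpout and then runs the topology in local mode. The JSON field names are assumed values of the storm.cookbook.Fields constants, and the host and port assume the local Redis install from the Getting ready section:

    import org.json.simple.JSONObject;
    import redis.clients.jedis.Jedis;

    public class ClickTopologySmokeTest {
        public static void main(String[] args) throws Exception {
            // Assumed local Redis on the default port
            Jedis jedis = new Jedis("localhost", 6379);

            // Seed the "count" queue polled by ClickSpout; the keys here
            // are assumed values of the storm.cookbook.Fields constants
            JSONObject click = new JSONObject();
            click.put("ip", "10.10.1.2");
            click.put("url", "http://example.com/index.html");
            click.put("clientKey", "client-42");
            jedis.lpush("count", click.toJSONString());

            // Push the same event again so RepeatVisitBolt sees a repeat
            jedis.lpush("count", click.toJSONString());

            // Run the topology locally for 10 seconds, then shut down
            new ClickTopology().runLocal(10000);
        }
    }

With conf.setDebug(true) set in runLocal, the emitted tuples appear in the local cluster's log output.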

How it works…

The following diagram illustrates the click topology:

The spout emits the click events from the web server into the topology through a shuffle grouping to both the geography and repeat-visit bolts. This ensures that the load is evenly distributed around the cluster, which is especially important for slow or high-latency operations such as these remote lookups.
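
For reference, these are the two grouping declarations from the recipe, with the routing behavior of each spelled out in comments:

    // shuffleGrouping: tuples are distributed randomly and evenly across
    // all 10 bolt instances, which suits slow, stateless enrichment
    builder.setBolt("geographyBolt",
            new GeographyBolt(new HttpIPResolver()), 10)
           .shuffleGrouping("clickSpout");

    // fieldsGrouping: tuples with the same country value always reach the
    // same GeoStatsBolt instance, so per-country state stays consistent
    builder.setBolt("geoStats", new GeoStatsBolt(), 10)
           .fieldsGrouping("geographyBolt",
                   new Fields(storm.cookbook.Fields.COUNTRY));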

Tip

It is important to understand which operations in your data model are commutative and associative, together with any other ordering or co-location concerns inherent in your streams and models, before designing your topology: commutative, associative operations (such as the visit counts here) can be partitioned and merged freely, while others (such as the city percentages) must be computed in one place.
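
As a hypothetical illustration of this point (not part of the recipe's code), compare the two computations below: partial visit counts from separate bolt instances can be merged in any order and still give the correct country total, whereas a percentage computed from one bolt's partial view is wrong, which is why the recipe routes all cities of a country to a single GeoStatsBolt instance:

    public class CommutativeExample {
        public static void main(String[] args) {
            // Partial counts from two bolt instances; addition is
            // commutative and associative, so merge order is irrelevant
            int boltOneCount = 120, boltTwoCount = 80;
            int countryTotal = boltOneCount + boltTwoCount; // always 200

            // A percentage depends on the full country total, so it cannot
            // be derived correctly from a single bolt's partial view
            int cityCount = 50;
            double partialView = (double) cityCount / boltOneCount; // ~0.42, wrong
            double fullView = (double) cityCount / countryTotal;    // 0.25, right
            System.out.println(partialView + " vs " + fullView);
        }
    }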

It is important to understand the parallelism of Storm while setting up the topology structure. There is an excellent summary of this on the wiki (https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology). The key points to take into account are:

  • The number of worker processes for the topology (TOPOLOGY_WORKERS).

  • The number of executors (threads) per component of the topology. This is set using the parallelism hint. Note that this sets only the initial value (number of threads); this can be increased at runtime using topology rebalancing (through the UI or CLI). You can limit the number of executors using the Config#setMaxTaskParallelism() method.

  • The number of tasks is set by default to 1 per executor. You can adjust this value when you declare a component, using the ComponentConfigurationDeclarer#setNumTasks() method, as shown in the sketch after this list.
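
The following hypothetical sketch ties these three settings together; the component names come from this recipe, but the numbers are purely illustrative:

    Config conf = new Config();
    conf.setNumWorkers(20);         // worker processes for the topology
    conf.setMaxTaskParallelism(50); // upper bound on executors per component

    TopologyBuilder builder = new TopologyBuilder();
    // A parallelism hint of 10 gives 10 initial executors (threads);
    // setNumTasks(20) spreads 20 tasks across them, leaving room to
    // rebalance up to 20 executors at runtime
    builder.setBolt("geographyBolt",
            new GeographyBolt(new HttpIPResolver()), 10)
           .setNumTasks(20)
           .shuffleGrouping("clickSpout");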

These are the key elements to consider when sizing your cluster. The cluster distributes work to worker processes, each containing many executors, and each executor may execute one or more tasks. The number of executors per worker is therefore the total number of executors divided by the number of workers. A good example of this can be seen in the previously mentioned wiki page.

Using these numbers, you can size your cluster in terms of nodes and cores per node, where ideally you should have one core per thread (executor) in the cluster.
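
To make this concrete with the numbers from this recipe: the topology declares 41 executors (10 for the spout, 10 each for the repeat-visit, geography, and geo-stats bolts, and 1 for the total-stats bolt) across 20 workers, or roughly 2 executors per worker. On a hypothetical four-node cluster, that is 5 workers and about 10 executors per node, suggesting nodes of around 10 cores each, before allowing for system threads such as ackers.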