The click topology is designed to gather basic website-usage statistics, specifically:
The number of visitors
The number of unique visitors
The number of visitors for a given country
The number of visitors for a given city
The percentage of visitors for each city in a given country
The system assumes a limited visitor population and prefers server-side client keys over client-side cookies. The topology derives the geographic information from the IP address using a public IP-resolution service.
The click topology uses Redis both as a persistent queue for the click events being sent into the topology and as a persistent store for recalling previous visitors to the site.
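The queue side of this pattern is simple: the web server pushes click events onto a Redis list, and the spout pops them off the other end, giving first-in, first-out delivery. As a rough stand-in (no Redis involved; the class and method names below are purely illustrative, not part of the topology code), the LPUSH/RPOP semantics can be sketched with a plain Deque:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative stand-in for a Redis list used as a queue (LPUSH/RPOP). */
public class ClickQueue {
    private final Deque<String> list = new ArrayDeque<>();

    /** Web server side: LPUSH adds to the head of the list. */
    public void lpush(String event) {
        list.addFirst(event);
    }

    /** Spout side: RPOP removes from the tail, i.e. the oldest element. */
    public String rpop() {
        return list.pollLast(); // null when empty, like the Redis nil reply
    }
}
```

Because producers push to one end and the consumer pops from the other, events reach the spout in the order the web server generated them.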
Note
For more information on Redis, please visit Redis.io.
Before you proceed, you must install Redis (Version 2.6 or greater):

wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make
sudo cp src/redis-server /usr/local/bin/
sudo cp src/redis-cli /usr/local/bin/

Then start the Redis server:

redis-server
Create a new Java project named click-topology, and create the pom.xml file and folder structure as per the "Hello World" topology project. In the pom.xml file, update the project name and references, and then add the following dependencies to the <dependencies> tag:

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.jmock</groupId>
    <artifactId>jmock-junit4</artifactId>
    <version>2.5.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.jmock</groupId>
    <artifactId>jmock-legacy</artifactId>
    <version>2.5.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.1.0</version>
</dependency>
Take special note of the scope definitions for JUnit and JMock, so that they are not included in your final deployable JAR. In the src/main/java folder, create the ClickTopology main class in the storm.cookbook package. This class defines the topology and provides the mechanisms to launch it into a cluster or in local mode. Create the class as follows:

public ClickTopology() {
    builder.setSpout("clickSpout", new ClickSpout(), 10);
    // First layer of bolts
    builder.setBolt("repeatsBolt", new RepeatVisitBolt(), 10)
           .shuffleGrouping("clickSpout");
    builder.setBolt("geographyBolt",
            new GeographyBolt(new HttpIPResolver()), 10)
           .shuffleGrouping("clickSpout");
    // Second layer of bolts, commutative in nature
    builder.setBolt("totalStats", new VisitStatsBolt(), 1)
           .globalGrouping("repeatsBolt");
    builder.setBolt("geoStats", new GeoStatsBolt(), 10)
           .fieldsGrouping("geographyBolt",
                new Fields(storm.cookbook.Fields.COUNTRY));
    conf.put(Conf.REDIS_PORT_KEY, DEFAULT_JEDIS_PORT);
}

public void runLocal(int runTime) {
    conf.setDebug(true);
    conf.put(Conf.REDIS_HOST_KEY, "localhost");
    cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
    if (runTime > 0) {
        Utils.sleep(runTime);
        shutDownLocal();
    }
}

public void shutDownLocal() {
    if (cluster != null) {
        cluster.killTopology("test");
        cluster.shutdown();
    }
}

public void runCluster(String name, String redisHost)
        throws AlreadyAliveException, InvalidTopologyException {
    conf.setNumWorkers(20);
    conf.put(Conf.REDIS_HOST_KEY, redisHost);
    StormSubmitter.submitTopology(name, conf, builder.createTopology());
}
This is followed by the main method, which is guided by the number of arguments passed at runtime:

public static void main(String[] args) throws Exception {
    ClickTopology topology = new ClickTopology();
    if (args != null && args.length > 1) {
        topology.runCluster(args[0], args[1]);
    } else {
        if (args != null && args.length == 1)
            System.out.println("Running in local mode, redis ip missing for cluster run");
        topology.runLocal(10000);
    }
}
The topology assumes that the web server pushes messages onto a Redis queue. You must create a spout to inject these into the Storm cluster as a stream. In the storm.cookbook package, create the ClickSpout class, which connects to Redis when it is opened by the cluster:

public class ClickSpout extends BaseRichSpout {

    public static Logger LOG = Logger.getLogger(ClickSpout.class);

    private Jedis jedis;
    private String host;
    private int port;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
                storm.cookbook.Fields.URL,
                storm.cookbook.Fields.CLIENT_KEY));
    }

    @Override
    public void open(Map conf, TopologyContext topologyContext,
                     SpoutOutputCollector spoutOutputCollector) {
        host = conf.get(Conf.REDIS_HOST_KEY).toString();
        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
        this.collector = spoutOutputCollector;
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
    }
The cluster will then poll the spout for new tuples through the nextTuple method:

public void nextTuple() {
    String content = jedis.rpop("count");
    if (content == null || "nil".equals(content)) {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
        }
    } else {
        JSONObject obj = (JSONObject) JSONValue.parse(content);
        String ip = obj.get(storm.cookbook.Fields.IP).toString();
        String url = obj.get(storm.cookbook.Fields.URL).toString();
        String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY).toString();
        collector.emit(new Values(ip, url, clientKey));
    }
}
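The spout expects each queued entry to be a small JSON document carrying the IP, URL, and client key. A minimal sketch of building such a message on the producing (web server) side is shown below; the field names assume Fields.IP = "ip", Fields.URL = "url", and Fields.CLIENT_KEY = "clientKey", so adjust them to match your Fields constants:

```java
/** Sketch of the JSON click event pushed onto the Redis "count" list.
 *  Field names ("ip", "url", "clientKey") are assumptions here; they must
 *  match the constants defined in your storm.cookbook.Fields class. */
public class ClickMessage {
    public static String toJson(String ip, String url, String clientKey) {
        return String.format(
                "{\"ip\":\"%s\",\"url\":\"%s\",\"clientKey\":\"%s\"}",
                ip, url, clientKey);
    }
}
```

In a real deployment the web server would LPUSH this string onto the "count" list, where nextTuple picks it up.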
Next, we need to create the bolts that will enrich the basic data through database or remote API lookups. Let us start with the repeat-visit bolt. This bolt checks the client's key against previous visit records and emits the enriched tuple with a flag set for unique visits. Create the RepeatVisitBolt class, providing the open and Redis connection logic:

public class RepeatVisitBolt extends BaseRichBolt {

    private OutputCollector collector;
    private Jedis jedis;
    private String host;
    private int port;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext,
                        OutputCollector outputCollector) {
        this.collector = outputCollector;
        host = conf.get(Conf.REDIS_HOST_KEY).toString();
        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }
In the execute method, the tuple from the ClickSpout class is provided by the cluster. The bolt needs to look up the previous visit flags from Redis, based on the fields in the tuple, and emit the enriched tuple:

public void execute(Tuple tuple) {
    String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
    String clientKey = tuple.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
    String url = tuple.getStringByField(storm.cookbook.Fields.URL);
    String key = url + ":" + clientKey;
    String value = jedis.get(key);
    if (value == null) {
        jedis.set(key, "visited");
        collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
    } else {
        collector.emit(new Values(clientKey, url, Boolean.FALSE.toString()));
    }
}
Next, the geographic enrichment bolt must be created. This bolt emits an enriched tuple by looking up the country and city of the client's IP address through a remote API call. The GeographyBolt class delegates the actual call to an injected IP resolver in order to increase the testability of the class. In the storm.cookbook package, create the GeographyBolt class, extending the BaseRichBolt class, and implement the execute method:

public void execute(Tuple tuple) {
    String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
    JSONObject json = resolver.resolveIP(ip);
    String city = (String) json.get(storm.cookbook.Fields.CITY);
    String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);
    collector.emit(new Values(country, city));
}
Provide the resolver by implementing the HttpIPResolver class and injecting it into GeographyBolt at design time:

public class HttpIPResolver implements IPResolver, Serializable {

    static String url = "http://api.hostip.info/get_json.php";

    @Override
    public JSONObject resolveIP(String ip) {
        URL geoUrl = null;
        BufferedReader in = null;
        try {
            geoUrl = new URL(url + "?ip=" + ip);
            URLConnection connection = geoUrl.openConnection();
            in = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            JSONObject json = (JSONObject) JSONValue.parse(in);
            return json;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                }
            }
        }
        return null;
    }
}
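The payoff of injecting the resolver is that tests can substitute a canned lookup for the remote API call. A minimal stub sketch is below; it uses a plain Map in place of JSONObject so it is self-contained, and the class name and field keys ("country_name", "city") are illustrative assumptions based on the hostip.info response fields used above:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative stub resolver: returns canned geo data instead of calling
 *  the remote API. A Map stands in for JSONObject to keep this self-contained. */
public class StubIPResolver {
    private final Map<String, Map<String, String>> canned = new HashMap<>();

    /** Register a canned result for an IP. */
    public void add(String ip, String country, String city) {
        Map<String, String> geo = new HashMap<>();
        geo.put("country_name", country); // key assumed from the hostip.info payload
        geo.put("city", city);
        canned.put(ip, geo);
    }

    /** Same shape of call as the real resolver; no network involved. */
    public Map<String, String> resolveIP(String ip) {
        return canned.get(ip);
    }
}
```

A unit test for GeographyBolt can then assert on the emitted country and city without any network dependency, which is exactly the testability the injection buys.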
Next, we need to derive the geographic stats. The GeoStatsBolt class simply receives the enriched tuple from GeographyBolt and maintains an in-memory structure of the data. It also emits the updated counts to any interested party. The GeoStatsBolt class is designed such that the total population of countries can be split across many bolts; however, all cities within a given country must arrive at the same bolt. The topology therefore splits the stream into the bolt on this basis:

builder.setBolt("geoStats", new GeoStatsBolt(), 10)
       .fieldsGrouping("geographyBolt",
            new Fields(storm.cookbook.Fields.COUNTRY));
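A fields grouping achieves this by hashing the named field and mapping it to a task index, so every tuple for a given country always lands on the same GeoStatsBolt task. Storm's actual routing differs in detail, but the idea can be sketched roughly as follows (the class and method names are illustrative only):

```java
/** Rough sketch of fields-grouping routing: the same country always maps
 *  to the same task index, so per-country city stats stay on one bolt
 *  instance. Storm's real implementation differs in detail. */
public class FieldsGroupingSketch {
    public static int taskFor(String country, int numTasks) {
        // Math.floorMod keeps the index non-negative for any hashCode value
        return Math.floorMod(country.hashCode(), numTasks);
    }
}
```

Two tuples carrying the same country value always resolve to the same task, while different countries spread across the available tasks.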
When creating the GeoStatsBolt class, provide the implementation for the execute method:

public void execute(Tuple tuple) {
    String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
    String city = tuple.getStringByField(storm.cookbook.Fields.CITY);
    if (!stats.containsKey(country)) {
        stats.put(country, new CountryStats(country));
    }
    stats.get(country).cityFound(city);
    collector.emit(new Values(country,
            stats.get(country).getCountryTotal(),
            city,
            stats.get(country).getCityTotal(city)));
}
The bulk of the logic is contained in the inner model class that maintains an in-memory model of the cities and country:

private class CountryStats {

    private int countryTotal = 0;
    private static final int COUNT_INDEX = 0;
    private static final int PERCENTAGE_INDEX = 1;
    private String countryName;
    private Map<String, List<Integer>> cityStats =
            new HashMap<String, List<Integer>>();

    public CountryStats(String countryName) {
        this.countryName = countryName;
    }

    public void cityFound(String cityName) {
        countryTotal++;
        if (cityStats.containsKey(cityName)) {
            cityStats.get(cityName).set(COUNT_INDEX,
                    cityStats.get(cityName).get(COUNT_INDEX).intValue() + 1);
        } else {
            List<Integer> list = new LinkedList<Integer>();
            // initialize the count and percentage entries
            list.add(1);
            list.add(0);
            cityStats.put(cityName, list);
        }
        double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)
                / (double) countryTotal;
        // scale to a whole percentage before truncating; casting the raw
        // fraction to an int would always yield zero
        cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) (percent * 100));
    }

    public int getCountryTotal() {
        return countryTotal;
    }

    public int getCityTotal(String cityName) {
        return cityStats.get(cityName).get(COUNT_INDEX).intValue();
    }
}
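The percentage arithmetic deserves a moment of care: the city count must be divided as a double and scaled to a whole percentage before truncating to an int, or the stored value collapses to zero. The intended computation, isolated as a small sketch (class name illustrative):

```java
/** Isolated sketch of the per-city percentage computation. Dividing as a
 *  double and scaling by 100 before the int truncation is essential;
 *  truncating the raw fraction would always produce zero. */
public class CityPercentage {
    public static int percent(int cityCount, int countryTotal) {
        return (int) (100.0 * cityCount / countryTotal);
    }
}
```

So one visit out of four for a city yields 25, and three out of four yields 75.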
Finally, the VisitStatsBolt class provides the final counting functionality for visitors and unique visits, based on the enriched stream from the RepeatVisitBolt class. This bolt needs to receive all the count information in order to maintain a single in-memory count, which is reflected in the topology definition:

builder.setBolt("totalStats", new VisitStatsBolt(), 1)
       .globalGrouping("repeatsBolt");
In order to implement the VisitStatsBolt class, create the class and define two member-level integers, total and uniqueCount; then implement the execute method:

public void execute(Tuple tuple) {
    boolean unique = Boolean.parseBoolean(
            tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
    total++;
    if (unique) uniqueCount++;
    collector.emit(new Values(total, uniqueCount));
}
The following diagram illustrates the click topology:
The spout emits the click events from the web server into the topology through a shuffle grouping to both the geography and repeat-visit bolts. This ensures that the load is evenly distributed around the cluster, which is especially important for these slow, high-latency processes.
Tip
It is important to understand the commutative versus associative nature of your data model, together with any other properties inherent in your streams and models, before designing your topology.
It is important to understand the parallelism of Storm while setting up the topology structure. There is an excellent summary of this on the wiki (https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology). The key points to take into account are:
The number of worker processes for the topology (TOPOLOGY_WORKERS).
The number of executors (threads) per component of the topology. This is set using the parallelism hint. Note that this sets only the initial number of threads; it can be increased at runtime using topology rebalancing (through the UI or CLI). You can limit the number of executors using the Config#setMaxTaskParallelism() method.
The number of tasks, which defaults to 1 per executor. You can adjust this value when you declare a component, using the ComponentConfigurationDeclarer#setNumTasks() method.
These are the key elements to consider when sizing your cluster. The cluster distributes work to worker processes, each containing many executors, and each executor may be executing one or more tasks. The number of executors per worker is therefore the total number of executors divided by the number of workers. A good example of this can be seen in the previously mentioned wiki page.
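For this topology, the arithmetic works out as follows. The parallelism hints above are 10 (spout), 10 (repeats), 10 (geography), 1 (total stats), and 10 (geo stats), and runCluster configures 20 workers. A small sketch of the sizing calculation (the class and method names are illustrative):

```java
/** Rough cluster-sizing arithmetic: total executors across all components,
 *  spread over the configured workers. The hint values in the test match
 *  this topology; ceiling division accounts for uneven spread. */
public class ClusterSizing {
    public static int totalExecutors(int... hints) {
        int total = 0;
        for (int h : hints) {
            total += h;
        }
        return total;
    }

    public static int executorsPerWorker(int totalExecutors, int workers) {
        // ceiling division: some workers carry one extra executor
        return (totalExecutors + workers - 1) / workers;
    }
}
```

With 41 executors over 20 workers, each worker runs at most 3 executor threads, so a node with a few cores per worker comfortably gives one core per thread.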
Using these numbers, you can size your cluster in terms of nodes and cores per node, where ideally you should have one core per thread (executor) in the cluster.