The Druid integration is the same one used in the previous chapter. As a brief recap, it comprises the StateFactory, StateUpdater, and State implementations. The State implementation communicates with a StormFirehoseFactory implementation and a StormFirehose implementation for Druid. At the heart of the integration is the StormFirehose implementation, which maps tuples into input rows for Druid. Its nextRow() method is listed as follows:
@Override
public InputRow nextRow() {
    final Map<String, Object> theMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
    try {
        TridentTuple tuple = null;
        tuple = BLOCKING_QUEUE.poll();
        if (tuple != null) {
            String phrase = (String) tuple.getValue(0);
            String word = (String) tuple.getValue(2);
            Long baseline = (Long) tuple.getValue(3);
            theMap.put("searchphrase", phrase);
            theMap.put("word", word);
            theMap.put("baseline", baseline);
        }
        if (BLOCKING_QUEUE...