Let's look at the sample indexer code that creates indexes for Katta:
import java.io.File;
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.IndexWriter;

// IndexConfiguration is Katta's indexing helper class.

public class KattaIndexer implements MapRunnable<LongWritable, Text, Text, Text> {

  private JobConf _conf;

  public void configure(JobConf conf) {
    _conf = conf;
  }

  public void run(RecordReader<LongWritable, Text> reader,
      OutputCollector<Text, Text> output, final Reporter report)
      throws IOException {
    LongWritable key = reader.createKey();
    Text value = reader.createValue();
    // Build the shard in a uniquely named local directory.
    String tmp = _conf.get("hadoop.tmp.dir");
    long millis = System.currentTimeMillis();
    String shardName = "" + millis + "-" + new Random().nextInt();
    File file = new File(tmp, shardName);
    report.progress();
    Analyzer analyzer = IndexConfiguration.getAnalyzer(_conf);
    IndexWriter indexWriter = new IndexWriter(file, analyzer);
    indexWriter.setMergeFactor(100000);
    report.setStatus("Adding documents...");
    // Index each input record as a single-field Lucene document.
    while (reader.next(key, value)) {
      report.progress();
      Document doc = new Document();
      String text = "" + value.toString();
      Field contentField = new Field("content", text, Store.YES, Index.TOKENIZED);
      doc.add(contentField);
      indexWriter.addDocument(doc);
    }
    report.setStatus("Done adding documents.");
    // Background thread that keeps reporting progress so Hadoop does not
    // kill the task if the optimization takes longer than the task timeout.
    Thread t = new Thread() {
      public boolean stop = false;

      @Override
      public void run() {
        while (!stop) {
          report.progress();
          try {
            sleep(10000);
          } catch (InterruptedException e) {
            // Interrupted by the main thread once the work is done.
            stop = true;
          }
        }
      }
    };
    t.start();
    report.setStatus("Optimizing index...");
    indexWriter.optimize();
    report.setStatus("Done optimizing!");
    report.setStatus("Closing index...");
    indexWriter.close();
    report.setStatus("Closing done!");
    // Copy the finished shard from local disk to the final destination
    // (typically HDFS), then clean up the local files.
    FileSystem fileSystem = FileSystem.get(_conf);
    report.setStatus("Starting copy to final destination...");
    Path destination = new Path(_conf.get("finalDestination"));
    fileSystem.copyFromLocalFile(new Path(file.getAbsolutePath()), destination);
    report.setStatus("Copy to final destination done!");
    report.setStatus("Deleting tmp files...");
    FileUtil.fullyDelete(file);
    report.setStatus("Deleting tmp files done!");
    t.interrupt();
  }
}
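Two details of this indexer are worth noting. The unusually high merge factor (100000) effectively postpones all segment merging until the single optimize() call at the end, trading disk usage during indexing for faster document ingestion. And the helper thread exists only to keep emitting progress during that potentially long optimize(), so that Hadoop's task timeout does not kill an otherwise healthy task.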
Here is a sample driver that launches the Hadoop job to build the Katta index:
KattaIndexer kattaIndexer = new KattaIndexer();
String input = <input>;
String output = <output>;
int numOfShards = Integer.parseInt(args[2]);
kattaIndexer.startIndexer(input, output, numOfShards);
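The startIndexer helper is not part of the listing above. Here is a minimal sketch of what such a method on KattaIndexer might look like, assuming the classic org.apache.hadoop.mapred API; the method name comes from the calling code, while the one-shard-per-map-task mapping and the "finalDestination" property (read by the indexer above) are assumptions:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

// Hypothetical sketch of a startIndexer method on KattaIndexer.
public void startIndexer(String input, String output, int numOfShards)
    throws IOException {
  JobConf conf = new JobConf(KattaIndexer.class);
  conf.setJobName("katta-indexer");
  // The indexer runs as a MapRunnable; there is no reduce phase.
  conf.setMapRunnerClass(KattaIndexer.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setNumMapTasks(numOfShards); // one shard per map task (assumption)
  conf.setNumReduceTasks(0);
  // The map tasks read this property to know where to copy finished shards.
  conf.set("finalDestination", output);
  FileInputFormat.setInputPaths(conf, new Path(input));
  // The job itself emits nothing, but Hadoop still requires an output path.
  FileOutputFormat.setOutputPath(conf, new Path(output, "_job-output"));
  JobClient.runJob(conf);
}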
You can use the following search client to query the Katta instance:
// Parse the query string (args[2]) against the default field given in args[1].
Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_CURRENT);
Query query = new QueryParser(Version.LUCENE_CURRENT, args[1], analyzer).parse(args[2]);
// Connect to the Katta cluster via ZooKeeper and search the named index.
ZkConfiguration conf = new ZkConfiguration();
LuceneClient luceneClient = new LuceneClient(conf);
Hits hits = luceneClient.search(query, Arrays.asList(args[0]).toArray(new String[1]), 99);
// Fetch and print the stored fields of each hit.
int num = 0;
for (Hit hit : hits.getHits()) {
  MapWritable mw = luceneClient.getDetails(hit);
  for (Map.Entry<Writable, Writable> entry : mw.entrySet()) {
    System.out.println("[" + (num++) + "] key -> " + entry.getKey()
        + ", value -> " + entry.getValue());
  }
}
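In this snippet, args[0] is the name of the index to search, args[1] is the default field for the query parser, and args[2] is the query string; the final argument to search() caps the number of hits returned at 99. Katta fans the query out to the nodes serving the individual shards and merges their results, so the client code stays the same regardless of how many shards the index was split into.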