Friday, July 31, 2009

Nutch: A Semantic Indexer with Hadoop

Introduction

I've been playing with Nutch the past couple of weeks (see here and here). One of the reasons I started looking at Nutch was the hope that its code would help me understand how to build real-life programs with Hadoop. Here is a toy example I came up with - a rather primitive semantic indexer that uses my blog pages as input, and my set of blog tags (manually enhanced, see below) as its dictionary.

Rather than insert additional logic into specific points in the Nutch lifecycle as I have done with my plugins in previous weeks, this code creates an entire pipeline of 3 Map-Reduce jobs to count and score occurrences of tags in my blog text. It piggybacks on the artifacts created by Nutch during its crawl, so I did not have to build everything from scratch and I learned a bit more about Nutch and Hadoop in the process. Experienced users of Nutch and/or Hadoop would probably find this code horribly amateurish - so if you have suggestions for improvement, I would greatly appreciate your comments.

The problem

I find that the hardest thing about Map-Reduce is defining the problem in terms of the framework, possibly because of my relative unfamiliarity with it. So it probably helps to define the problem first. Here goes.

When you write a blog post on Blogger (and I guess on pretty much any other blogging software/service), you are allowed to tag the post with some keywords that describe its content. Since I don't have a computer for a mind, I generally end up tagging with whatever seems appropriate at the time, and as you can see from the Tag Cloud on the sidebar, this approach has proved to be less than optimal.

What I really want is a program that will "know" the tags I have set in the past, scan my new content, and suggest a subset of these tags for it. If, in addition, it could "roll up" the tags into higher-level terms, that would be icing on the cake. For example, if one of my tags is "cx_oracle", it could roll up to tags such as "databases", "python", "scripting" and "oracle", because it is very likely (for me at least) that when I talk about "cx_oracle", I am actually talking about all of the others as well.

The Dictionary

I first collected the tags I have created so far by downloading the blog posts using the GData Java API, parsing out the labels into a flat file, and then using some simple Unix commands to get the unique sorted set of blog tags. The code is quite simple (it's basically adapted from the examples on the GData site) and it's peripheral to this post anyway, so I don't show it here.

I then manually enhanced it with "roll-up" tags as I discussed above. Here are some examples from my dictionary.txt file. The idea is that the roll-up tags will be counted each time the base tag is counted, and because a roll-up tag can be associated with multiple base tags, over time, they will end up larger in the Tag Cloud, reflecting the nature of the blog more accurately than a bunch of super-specific tags.

...
classification:ir,algorithms
clustering:ir,algorithms
cx_oracle:databases,python,scripting,oracle
...
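To make the format concrete, here is a minimal sketch in plain Java, with no Nutch or Hadoop dependencies. It parses lines of the form base:rollup1,rollup2 into a map. It then credits each roll-up tag with the counts of every base tag that maps to it. The TagDictionary class and its method names are hypothetical and exist only for illustration. This is not code from the indexer shown later. Blank lines are skipped, and a line with no colon is treated as a base tag with no roll-ups.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the dictionary.txt format:
// base_tag[:rollup1,rollup2,...], one entry per line.
final class TagDictionary {

  // Parses lines into base tag -> list of roll-up tags (possibly empty).
  static Map<String, List<String>> parse(List<String> lines) {
    Map<String, List<String>> dict = new LinkedHashMap<>();
    for (String line : lines) {
      String trimmed = line.trim();
      if (trimmed.isEmpty()) {
        continue; // skip blank lines
      }
      int colon = trimmed.indexOf(':');
      String base = (colon < 0) ? trimmed : trimmed.substring(0, colon);
      List<String> rollups = new ArrayList<>();
      if (colon >= 0) {
        for (String r : trimmed.substring(colon + 1).split(",")) {
          if (!r.trim().isEmpty()) {
            rollups.add(r.trim());
          }
        }
      }
      dict.put(base, rollups);
    }
    return dict;
  }

  // Adds each base tag's count to every roll-up tag it maps to, so roll-ups
  // shared by several base tags accumulate larger totals.
  static Map<String, Long> rollUp(Map<String, Long> baseCounts,
                                  Map<String, List<String>> dict) {
    Map<String, Long> totals = new HashMap<>(baseCounts);
    for (Map.Entry<String, Long> e : baseCounts.entrySet()) {
      List<String> rollups =
          dict.getOrDefault(e.getKey(), Collections.<String>emptyList());
      for (String rollup : rollups) {
        totals.merge(rollup, e.getValue(), Long::sum);
      }
    }
    return totals;
  }
}
```

Suppose "classification" is counted twice and "clustering" three times. The shared roll-ups "ir" and "algorithms" then each total 5, which is larger than either base tag. That is the Tag Cloud effect described above.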

Algorithm

As mentioned before, this is a pipeline of 3 Map-Reduce jobs, where the output of each job feeds the input of the next one in the chain. The Mappers and Reducers are described in terms of their inputs and outputs in the table below:

Job  Phase                   Input                                   Output
1    Map                     <url,ParseData> or <url,ParseText>      List(<(url,term),count>)
1    Reduce (LongSumReducer) <(url,term),List(count)>                List(<(url,term),count>)
2    Map                     <(url,term),count>                      List(<url,(term:count)>)
2    Reduce                  <url,List(term:count)>                  List(<url,CSV(term:count)>)
3    Map                     <url,CSV(term:count)>                   List(<digest(url,term),NutchDocument>)
3    Reduce                  <digest(url,term),List(NutchDocument)>  (used to create Lucene index)

The first stage pulls the page text and page title from the <url,ParseText> and <url,ParseData> maps that Nutch stores during its crawl, in the segments/../parse_text and segments/../parse_data directories respectively. Doug Cutting's slides on Hadoop usage in Nutch were immensely helpful in figuring out what to look for. The Mapper analyzes the page text or title for matches against the dictionary. For each term on the LHS of a dictionary entry that occurs in the text, it writes a <(url,term),count> entry. Here count is the number of occurrences, multiplied by a boost factor when the text is the title. If the matched term also has RHS terms, each RHS term is scored in the same manner. The output of this Map phase is passed to a LongSumReducer, which aggregates the term counts for each (url,term) pair.

Before starting, the Mapper at this stage sets up an internal map of <LHS term,List<RHS term>> from the dictionary file, the path to which is passed into the job from the caller.
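The Stage 1 map and reduce steps can be sketched in memory without the Hadoop plumbing, as below. This is an illustration under simplifying assumptions, not the Mapper1 code. The hypothetical Stage1Sketch class matches only single-word terms, using a regex tokenizer; the real Mapper uses the InvertedIndex class, so multi-word tags also match. Dictionary keys are assumed to be lower-case. A TreeMap updated with Map.merge stands in for the LongSumReducer's per-key summing, and the boost argument plays the role of the title boost.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical in-memory stand-in for Stage 1 (Mapper1 + LongSumReducer).
// Simplification: only single-word dictionary terms are matched.
final class Stage1Sketch {

  // "Map" side: score each dictionary term found in the text, plus its
  // roll-up terms, under a "url,term" key. "Reduce" side: Map.merge sums
  // the scores per key, as LongSumReducer does.
  static Map<String, Long> analyze(String url, String text, long boost,
                                   Map<String, List<String>> dict,
                                   Map<String, Long> sums) {
    String[] tokens =
        text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}_]+");
    for (Map.Entry<String, List<String>> entry : dict.entrySet()) {
      long occurrences = 0;
      for (String token : tokens) {
        if (token.equals(entry.getKey())) {
          occurrences++;
        }
      }
      if (occurrences == 0) {
        continue; // term not in this text: emit nothing
      }
      long score = occurrences * boost;
      sums.merge(url + "," + entry.getKey(), score, Long::sum);
      for (String rollup : entry.getValue()) {
        sums.merge(url + "," + rollup, score, Long::sum);
      }
    }
    return sums;
  }
}
```

Call it once for the title with a boost of 5 and once for the body with a boost of 1. Both calls accumulate into the same "url,term" keys, much as the LongSumReducer would combine the two Mapper inputs.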

The second stage flattens the entries so each URL has one entry with all the aggregated term counts. The Mapper changes the entry from <(url,term),count> to <url,(term:count)>. Along the way it drops terms whose counts do not exceed a specified cutoff (LABEL_CUTOFF_SCORE in the code). The Reducer takes the collection of (term:count) values for a given URL and strings them together into a single entry. That entry maps the URL to a comma-separated string of term:count pairs.
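Here is an in-memory sketch of the same flattening, again using a hypothetical class name. Like Mapper2, it keeps only counts above the cutoff. It splits each "url,term" key on its last comma rather than on every comma. That choice is my own assumption, meant to guard against URLs that themselves contain commas.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for Stage 2 (Mapper2 + Reducer2).
final class Stage2Sketch {

  // Input: "url,term" -> aggregated count. Output: url -> "term:count,...".
  static Map<String, String> flatten(Map<String, Long> stage1, long cutoff) {
    Map<String, List<String>> grouped = new TreeMap<>();
    for (Map.Entry<String, Long> e : stage1.entrySet()) {
      long count = e.getValue();
      if (count <= cutoff) {
        continue; // same test as Mapper2: keep only counts above the cutoff
      }
      // split on the LAST comma, since the url part may contain commas
      String key = e.getKey();
      int sep = key.lastIndexOf(',');
      String url = key.substring(0, sep);
      String term = key.substring(sep + 1);
      grouped.computeIfAbsent(url, k -> new ArrayList<>())
             .add(term + ":" + count);
    }
    // "Reduce": join each URL's term:count pairs into one CSV string
    Map<String, String> flattened = new TreeMap<>();
    for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
      flattened.put(e.getKey(), String.join(",", e.getValue()));
    }
    return flattened;
  }
}
```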

The third stage merges information from the Lucene index that Nutch already created in the index directory with the <url,CSV(term:count)> structure, and creates a new Lucene index from the result. The Mapper looks up each URL's record in the Nutch Lucene index in two steps: URL to docId via an internal map, then docId to Document via the IndexReader. It merges that record with each (term:count) pair, producing one NutchDocument per (url,term) combination. The Reducer takes each of these records and writes it to a new Lucene index in the index2 directory.

The internal data structure used by the Mapper in this stage is a Map of <url,docId> built from the Nutch Lucene index. Because Nutch tokenizes the url field, it was not possible to use the url to look up the record directly. So I read through the Nutch Lucene index and built an in-memory mapping between url and docId. This approach may not be feasible for large indexes.
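The record expansion in Stage 3 can also be sketched without Lucene. In the hypothetical Stage3Sketch class, a List of stored url values indexed by position stands in for the documents returned by IndexReader.document(i). A null entry marks a deleted or url-less document. The JDK's MessageDigest replaces commons-codec's DigestUtils.md5Hex, producing a lower-case hex MD5 key for each (url,label) pair. For ASCII URLs this should be the same digest.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Mapper3: map url -> docId, then emit one
// (md5(url,label), fields) record per term:count pair.
final class Stage3Sketch {

  // Mirrors the maxDoc() loop; null entries (deleted docs) are skipped.
  static Map<String, Integer> urlToDocId(List<String> storedUrls) {
    Map<String, Integer> map = new HashMap<>();
    for (int docId = 0; docId < storedUrls.size(); docId++) {
      String url = storedUrls.get(docId);
      if (url != null) {
        map.put(url, docId);
      }
    }
    return map;
  }

  // Lower-case hex MD5 of the UTF-8 bytes of s.
  static String md5Hex(String s) {
    try {
      byte[] digest = MessageDigest.getInstance("MD5")
          .digest(s.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e); // every JVM must support MD5
    }
  }

  // Expands "term:count,..." into records {key, url, label, label_count}.
  static List<String[]> expand(String url, String csv) {
    List<String[]> records = new ArrayList<>();
    for (String pair : csv.split(",")) {
      int colon = pair.lastIndexOf(':');
      String label = pair.substring(0, colon);
      String count = pair.substring(colon + 1);
      records.add(new String[] {md5Hex(url + "," + label), url, label, count});
    }
    return records;
  }
}
```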

Code

Build and Setup Info

I decided to build the indexer within the Nutch codebase, rather than do this externally and have Nutch and Hadoop as dependencies. This was mainly for convenience - I did not want to have to figure out Nutch's build system as well right away. So I created a sub-package called "custom" under Nutch's org.apache.nutch.indexer package and put all my classes in there.

This way I could use Nutch's build.xml to build a new nutch-1.0.jar file that included my code with "ant compile jar".

InvertedIndex.java

The code for the InvertedIndex data structure is shown below. I could also have used Lucene's term API to let Lucene do much of this logic, but this approach means less wrapping/unwrapping.

// Source: src/java/org/apache/nutch/indexer/custom/InvertedIndex.java
package org.apache.nutch.indexer.custom;

import java.text.BreakIterator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.StringUtils;

/**
 * A home grown inverted index to count occurrences of multi-word terms
 * within a body of text. The underlying data structure is a Map keyed
 * by individual words. The value mapped by the key is a Set of positions
 * corresponding to the positions of the word (0-based) in the text. For
 * multi-word terms, consecutive words are looked up and their positions
 * used to find the number of times a multi-word term appears in the text.
 */
public class InvertedIndex {

  private Map<String,Set<Integer>> termPositions;
  
  public InvertedIndex(String text) {
    termPositions = new HashMap<String,Set<Integer>>();
    BreakIterator wordBreakIterator = BreakIterator.getWordInstance(
      Locale.getDefault());
    wordBreakIterator.setText(text);
    int current = 0;
    int wordPosition = 0;
    for (;;) {
      int end = wordBreakIterator.next();
      if (end == BreakIterator.DONE) {
        break;
      }
      String nextWord = text.substring(current, end);
      current = end;
      if (StringUtils.isBlank(nextWord) || 
          nextWord.matches("\\p{Punct}")) {
        continue;
      }
      String[] words = getMultiWords(nextWord);
      for (String word : words) {
        wordPosition = addPosition(word, wordPosition);
      }
    }
  }

  public boolean exists(String term) {
    return countOccurrencesOf(term) > 0;
  }

  public int countOccurrencesOf(String term) {
    String[] multiwords = 
      getMultiWords(StringUtils.replace(term, " ", "-"));
    Set<Integer> newPrevPositions = new HashSet<Integer>();
    Set<Integer> prevPositions = new HashSet<Integer>();
    int termId = 0;
    for (String word : multiwords) {
      termId++;
      if (termPositions.containsKey(word)) {
        if (termId == 1) {
          prevPositions.addAll(termPositions.get(word));
          // if this is the only word, we've found it
          if (multiwords.length == 1) {
            newPrevPositions.addAll(prevPositions);
          } else {
            continue;
          }
        } else {
          Set<Integer> currentPositions = termPositions.get(word);
          for (Integer currentPosition : currentPositions) {
            // check for the occurrence of (currentPosition - 1) in
            // the prevPositions, if so, copy to the newPrevPositions
            if (prevPositions.contains(currentPosition - 1)) {
              newPrevPositions.add(currentPosition);
            }
          }
          prevPositions.clear();
          prevPositions.addAll(newPrevPositions);
          newPrevPositions.clear();
        }
      } else {
        // the current term is not found in our index, invalidating
        // the results so far, we should exit at this point
        prevPositions.clear();
        break;
      }
    }
    return prevPositions.size();
  }

  private String[] getMultiWords(String term) {
    term = StringUtils.lowerCase(term);
    if (term.indexOf('-') > -1) {
      return StringUtils.split(term, "-");
    } else {
      return new String[] {term};
    }
  }

  private int addPosition(String word, int position) {
    Set<Integer> positions = (termPositions.containsKey(word) ?
      termPositions.get(word) : new HashSet<Integer>());
    positions.add(position);
    termPositions.put(word, positions);
    position++;
    return position;
  }
}
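The heart of countOccurrencesOf() is positional intersection. A later word of a multi-word term only counts if it sits exactly one position after a surviving occurrence of the previous word. Here is a compact, JDK-only sketch of that idea; the PhraseCounter name is hypothetical. Its regex tokenizer is a simplification standing in for BreakIterator plus the punctuation filter above: it lower-cases the text and splits on anything that is not a letter, digit or underscore.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical, JDK-only illustration of the position-intersection idea
// behind InvertedIndex.countOccurrencesOf(); not the original class.
final class PhraseCounter {

  // word -> 0-based positions of that word in the text
  private final Map<String, Set<Integer>> positions = new HashMap<>();

  PhraseCounter(String text) {
    int pos = 0;
    // simplified tokenizer: lower-case, and treat anything except
    // letters, digits and '_' as a separator
    for (String word : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}_]+")) {
      if (word.isEmpty()) {
        continue; // a leading separator produces one empty token
      }
      positions.computeIfAbsent(word, k -> new HashSet<>()).add(pos++);
    }
  }

  // Counts occurrences of a phrase; words may be separated by spaces or '-'.
  int count(String phrase) {
    Set<Integer> ends = null; // positions where the phrase-so-far ends
    for (String word : phrase.toLowerCase(Locale.ROOT).split("[\\s-]+")) {
      Set<Integer> current =
          positions.getOrDefault(word, Collections.<Integer>emptySet());
      if (ends == null) {
        ends = new HashSet<>(current); // first word: every position counts
      } else {
        Set<Integer> next = new HashSet<>();
        for (int p : current) {
          if (ends.contains(p - 1)) {
            next.add(p); // word directly follows the phrase-so-far
          }
        }
        ends = next;
      }
      if (ends.isEmpty()) {
        return 0; // no surviving candidates, like the original's early break
      }
    }
    return ends == null ? 0 : ends.size();
  }
}
```

With this tokenizer, "map, then reduce" is not counted as "map reduce", because "then" occupies the position in between. "Map-Reduce" is counted, because the hyphen acts as a separator in both the text and the term.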

Indexer2.java

Here is the code for the indexer. All the Map and Reduce classes are built as private static classes. The main() method calls ToolRunner.run(), which calls index(), which in turn calls the three Map-Reduce jobs in sequence.

// Source: src/java/org/apache/nutch/indexer/custom/Indexer2.java
package org.apache.nutch.indexer.custom;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.parse.ParseText;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;

/**
 * Component to build a primitive "semantic" index, using a dictionary 
 * based on blog tags. Occurrences of a tag are counted and stored in
 * the index, along with their counts. Blog articles can then be looked
 * up by the tag and ranked using the tag counts.
 * 
 * The input to this index is the parse_data and parse_text information
 * in the crawled segments, the dictionary, and the index generated by 
 * the default Nutch index mechanism.
 * 
 * The format of the dictionary is a flat file, with tags (single or multi
 * word) as the key, mapped to zero or more "semantic" meanings of the 
 * term. For example, when we notice a tag "cx_oracle", it is probably
 * not only about cx_oracle, but also about databases, the Oracle database,
 * the Python scripting language, and scripting in general. Hence, the 
 * entry for cx_oracle will look like this:
 * 
 * <pre>
 * cx_oracle:databases,oracle,scripting,python
 * ...
 * </pre>
 * 
 * Tags appearing in the title are given a 5x boost (currently configured
 * by TITLE_BOOST in the code), i.e., each occurrence of a tag in a title
 * is counted 5 times, and each occurrence of a tag in the body is counted
 * once.
 */
public class Indexer2 extends Configured implements Tool {

  private static final Logger LOGGER = Logger.getLogger(Indexer2.class);

  private static final int TITLE_BOOST = 5;
  private static final int LABEL_CUTOFF_SCORE = 2;
  
  // ==========================
  // **** STAGE 1: Analyze ****
  // ==========================

  /**
   * Input: <url,ParseData> or <url,ParseText>
   * Output: List(<(url,term),count>)
   * Processing: extract text from ParseText and title from ParseData and
   * generate a list of terms in the lexicon that are found in these texts.
   * Send a List(<(url,term),count>) to the reducer. The reducer is a
   * (built-in) LongSumReducer, which aggregates counts for each
   * <url,term> key.
   * Configuration: the location of the dictionary is passed in, which is
   * used to build up the lookup table that is used for the processing.
   */
  private static class Mapper1 extends MapReduceBase
      implements Mapper<WritableComparable<Text>,
                 Writable,Text,LongWritable> {

    private static Map<String,List<String>> DICTIONARY = null;
    private static Integer TITLE_BOOST = null;
    
    @Override
    public void configure(JobConf job) {
      if (DICTIONARY == null || TITLE_BOOST == null) {
        TITLE_BOOST = Integer.valueOf(job.get("title.boost"));
        DICTIONARY = new HashMap<String,List<String>>();
        String dictFile = job.get("index2.dictfile");
        BufferedReader reader = null;
        try {
          FileSystem localFileSystem = 
            FileSystem.getLocal(new Configuration());
          Path dictPath = new Path(dictFile);
          FSDataInputStream istream = localFileSystem.open(dictPath);
          reader = new BufferedReader(new InputStreamReader(istream));
          String line;
          while ((line = reader.readLine()) != null) {
            if (StringUtils.isBlank(line)) {
              continue; // tolerate blank lines in the dictionary file
            }
            String[] nvp = StringUtils.split(line, ":");
            List<String> values = new ArrayList<String>();
            if (nvp.length > 1) {
              String[] vals = StringUtils.split(nvp[1], ",");
              for (String val : vals) {
                values.add(val);
              }
            }
            DICTIONARY.put(nvp[0], values);
          }
        } catch (IOException e) {
          LOGGER.error("Could not get DICTIONARY file: " + dictFile, e);
          throw new RuntimeException(e);
        } finally {
          // always release the dictionary file handle
          if (reader != null) {
            try {
              reader.close();
            } catch (IOException e) {
              LOGGER.warn("Could not close DICTIONARY file: " + dictFile, e);
            }
          }
        }
      }
    }
    
    @Override
    public void map(WritableComparable<Text> key, Writable value,
        OutputCollector<Text,LongWritable> output, Reporter reporter)
        throws IOException {
      String url = key.toString();
      String text = null;
      long occurrenceBoost = 1;
      if (value instanceof ParseText) {
        ParseText parseText = (ParseText) value;
        text = parseText.getText();
      } else if (value instanceof ParseData) {
        ParseData parseData = (ParseData) value;
        text = parseData.getTitle();
        occurrenceBoost *= TITLE_BOOST;
      }
      if (text != null && (! StringUtils.trim(text).equals("null"))) {
        InvertedIndex invertedIndex = new InvertedIndex(text);
        for (String term : DICTIONARY.keySet()) {
          int occurrences = invertedIndex.countOccurrencesOf(term);
          if (occurrences > 0) {
            String newKey = StringUtils.join(new String[] {url, term}, ",");
            LongWritable score = 
              new LongWritable(occurrences * occurrenceBoost);
            output.collect(new Text(newKey), score); 
            List<String> synonyms = DICTIONARY.get(term);
            if (synonyms != null && synonyms.size() > 0) {
              for (String synonym : synonyms) {
                newKey = StringUtils.join(new String[] {url, synonym}, ",");
                output.collect(new Text(newKey), score);
              }
            }
          }
        }
      }
    }
  }

  // ==========================
  // **** STAGE 2: Flatten ****
  // ==========================
  
  /**
   * Input: <(url,term),count>
   * Output: List(<url,(term:count)>)
   * Processing: each record is re-parsed to be keyed by URL and passed to
   * the OutputCollector. Only terms with counts above a preconfigured cutoff
   * are collected. This is an attempt to remove label counts which have been
   * mentioned "in passing".
   */
  private static class Mapper2 extends MapReduceBase
      implements Mapper<WritableComparable<Text>,
                 LongWritable,Text,Text> {

    private static Float LABEL_CUTOFF_SCORE = null;
    
    @Override
    public void configure(JobConf job) {
      if (LABEL_CUTOFF_SCORE == null) {
        LABEL_CUTOFF_SCORE = new Float(job.get("label.cutoff.score"));
      }
    }
    
    @Override
    public void map(WritableComparable<Text> key, LongWritable value,
        OutputCollector<Text,Text> output,
        Reporter reporter) throws IOException {
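      // split on the last comma only, since the url may itself contain commas
      String urlTerm = key.toString();
      int sep = urlTerm.lastIndexOf(',');
      String url = urlTerm.substring(0, sep);
      String term = urlTerm.substring(sep + 1);
      long count = value.get();
      if (count > LABEL_CUTOFF_SCORE) {
        output.collect(new Text(url), 
          new Text(StringUtils.join(new String[] {
          term, String.valueOf(count)}, ":")));
      }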
    }
  }
  
  /**
   * Input: <url,List(term:count)>
   * Output: List(<url,CSV(term:count)>)
   * Processing: flattens multiple terms and their associated aggregate
   * counts back to the same URL key.
   */
  private static class Reducer2 extends MapReduceBase
      implements Reducer<WritableComparable<Text>,Text,
                 WritableComparable<Text>,Text> {

    @Override
    public void reduce(WritableComparable<Text> key,
        Iterator<Text> values,
        OutputCollector<WritableComparable<Text>,Text> output,
        Reporter reporter) throws IOException {
      StringBuilder termCounts = new StringBuilder();
      int i = 0;
      while (values.hasNext()) {
        Text value = values.next();
        if (i > 0) {
          termCounts.append(",");
        }
        termCounts.append(value);
        i++;
      }
      output.collect(key, new Text(termCounts.toString()));
    }
  }

  // ==========================
  // ****  STAGE 3: Merge  ****
  // ==========================

  /**
   * Input: <url,CSV(term:count)>
   * Output: List(<digest(url,term),NutchDocument>)
   * Processing: the url is used to lookup the record in the index built by
   * Nutch as part of its normal cycle. A NutchDocument is created for each
   * (url,term) combination with this information, and the corresponding term
   * and term count. A new unique key is generated for this output record
   * using an MD5 hash of the url and term.
   */
  private static class Mapper3 extends MapReduceBase
      implements Mapper<WritableComparable<Text>,Text,
                 Text,NutchDocument> { 

    private static Map<String,Integer> URL_DOCID_MAP = null;
    private static IndexReader NUTCH_INDEX_READER = null;
    
    @Override
    public void configure(JobConf job) {
      try {
        if (URL_DOCID_MAP == null) {
          URL_DOCID_MAP = new HashMap<String,Integer>();
          NUTCH_INDEX_READER = 
            IndexReader.open(job.get("nutch.index.dir"));
          int maxDoc = NUTCH_INDEX_READER.maxDoc();
          for (int i = 0; i < maxDoc; i++) {
            // document(i) throws for deleted documents, so skip those
            if (NUTCH_INDEX_READER.isDeleted(i)) {
              continue;
            }
            Document doc = NUTCH_INDEX_READER.document(i);
            String url = doc.get("url");
            URL_DOCID_MAP.put(url, i);
          }
        }
      } catch (Exception e) {
        LOGGER.error("Cannot open index reader on nutch index for lookup", e);
        throw new RuntimeException(e);
      }
    }
    
    @Override
    public void map(WritableComparable<Text> key, Text value,
        OutputCollector<Text,NutchDocument> output, Reporter reporter)
        throws IOException {
      Integer docId = URL_DOCID_MAP.get(key.toString());
      if (docId != null) {
        Document doc = NUTCH_INDEX_READER.document(docId);
        if (doc != null) {
          String termCounts = value.toString();
          String[] termCountPairs = StringUtils.split(termCounts, ",");
          for (String termCountPair : termCountPairs) {
            String[] components = StringUtils.split(termCountPair, ":");
            NutchDocument nutchDoc = new NutchDocument();
            String label = components[0];
            String url = doc.get("url");
            nutchDoc.add("label", label);
            nutchDoc.add("label_count", components[1]);
            nutchDoc.add("url", url);
            nutchDoc.add("title", doc.get("title"));
            // generate a new unique key based on url and label
            String newKey = DigestUtils.md5Hex(
              StringUtils.join(new String[] {url, label}, ","));
            output.collect(new Text(newKey), nutchDoc);
          }
        }
      }
    }

    @Override
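    public void close() {
      if (NUTCH_INDEX_READER != null) {
        try {
          NUTCH_INDEX_READER.close();
        } catch (Exception e) {
          LOGGER.warn("Could not close nutch index reader", e);
        }
        // reset the statics so that a later task in a reused JVM
        // reopens the reader in configure() instead of using a closed one
        NUTCH_INDEX_READER = null;
        URL_DOCID_MAP = null;
      }
    }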
  }

  /**
   * Input: <digest(url,term),List(NutchDocument)>
   * Output: none
   * Processing: A new Lucene index is created (path to the index passed in).
   * For each NutchDocument, a corresponding record is written to the Lucene 
   * index.
   */
  private static class Reducer3 extends MapReduceBase
      implements Reducer<WritableComparable<Text>,NutchDocument,
      WritableComparable<Text>,NutchDocument> {

    private static IndexWriter INDEX2_WRITER = null;
    
    @Override
    public void configure(JobConf job) {
      if (INDEX2_WRITER == null) {
        String indexOutputDir = job.get("index2.output.dir");
        try {
        INDEX2_WRITER = new IndexWriter(indexOutputDir, 
          new StandardAnalyzer(), MaxFieldLength.UNLIMITED);
        } catch (Exception e) {
          LOGGER.error("Could not open index [" + indexOutputDir + 
            "] for writing");
          throw new RuntimeException(e);
        }
      }
    }
    
    @Override
    public void reduce(WritableComparable<Text> key,
        Iterator<NutchDocument> values,
        OutputCollector<WritableComparable<Text>, 
        NutchDocument> output,
        Reporter reporter) throws IOException {
      while (values.hasNext()) {
        NutchDocument nutchDoc = values.next();
        Document doc = new Document();
        doc.add(new Field("url", nutchDoc.getFieldValue("url"), 
          Store.YES, Index.NOT_ANALYZED));
        doc.add(new Field("label", nutchDoc.getFieldValue("label"), 
          Store.YES, Index.NOT_ANALYZED));
        doc.add(new Field("label_count", 
          nutchDoc.getFieldValue("label_count"), 
          Store.YES, Index.NOT_ANALYZED));
        doc.add(new Field("title", nutchDoc.getFieldValue("title"), 
          Store.YES, Index.ANALYZED));
        INDEX2_WRITER.addDocument(doc);
      }
    }
    
    @Override
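    public void close() {
      if (INDEX2_WRITER != null) {
        try {
          INDEX2_WRITER.optimize();
          INDEX2_WRITER.close();
        } catch (Exception e) {
          LOGGER.warn("Could not optimize/close index2 writer", e);
        }
        // allow a later task in a reused JVM to open a fresh writer
        INDEX2_WRITER = null;
      }
    }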
  }

  // ==========================
  // ****   Calling code   ****
  // ==========================
  
  private void analyze(Path indexDir, String dictFile, 
      List<Path> segments, int titleBoost) throws IOException {
    LOGGER.info("Stage 1 (analyze)");
    final JobConf job1 = new NutchJob(getConf());
    job1.set("index2.dictfile", dictFile);
    job1.set("title.boost", String.valueOf(titleBoost));
    job1.setJobName("index2-analyze " + segments);
    // inputs for this mapper (parse_data contains the title which we
    // want to analyze, and parse_text contains the body of the document,
    // so we add paths to both in our mapper
    for (Path segment : segments) {
      FileInputFormat.addInputPath(
        job1, new Path(segment, ParseData.DIR_NAME));
      FileInputFormat.addInputPath(
        job1, new Path(segment, ParseText.DIR_NAME));
    }
    FileOutputFormat.setOutputPath(job1, new Path(indexDir, "stage1"));
    job1.setMapperClass(Mapper1.class);
    job1.setReducerClass(LongSumReducer.class);
    job1.setInputFormat(SequenceFileInputFormat.class);
    job1.setOutputFormat(SequenceFileOutputFormat.class);
    job1.setMapOutputKeyClass(Text.class);
    job1.setMapOutputValueClass(LongWritable.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(LongWritable.class);
    JobClient.runJob(job1);
  }

  private void flatten(Path indexDir, int labelCutoffScore) 
      throws IOException {
    LOGGER.info("Stage 2 (flatten)");
    final JobConf job2 = new NutchJob(getConf());
    job2.set("label.cutoff.score", String.valueOf(labelCutoffScore));
    job2.setJobName("index2-normalize");
    FileInputFormat.addInputPath(job2, new Path(indexDir, "stage1"));
    FileOutputFormat.setOutputPath(job2, new Path(indexDir, "stage2"));
    job2.setMapperClass(Mapper2.class);
    job2.setReducerClass(Reducer2.class);
    job2.setInputFormat(SequenceFileInputFormat.class);
    job2.setOutputFormat(SequenceFileOutputFormat.class);
    job2.setMapOutputKeyClass(Text.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(Text.class);
    JobClient.runJob(job2);
  }

  private void merge(Path indexDir, String nutchIndexDir) 
      throws IOException {
    LOGGER.info("Stage 3 (merge)");
    final JobConf job3 = new NutchJob(getConf());
    job3.set("nutch.index.dir", nutchIndexDir);
    job3.set("index2.output.dir", 
      new Path(indexDir.getParent(), "index2").toString());
    job3.setJobName("index2-merge");
    FileInputFormat.addInputPath(job3, new Path(indexDir, "stage2"));
    FileOutputFormat.setOutputPath(job3, new Path(indexDir, "stage3"));
    job3.setMapperClass(Mapper3.class);
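    job3.setReducerClass(Reducer3.class);
    // Reducer3 writes a single Lucene index, which only one IndexWriter
    // may hold open at a time, so force a single reduce task
    job3.setNumReduceTasks(1);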
    job3.setInputFormat(SequenceFileInputFormat.class);
    job3.setOutputFormat(SequenceFileOutputFormat.class);
    job3.setMapOutputKeyClass(Text.class);
    job3.setMapOutputValueClass(NutchDocument.class);
    job3.setOutputKeyClass(Text.class);
    job3.setOutputValueClass(NutchDocument.class);
    JobClient.runJob(job3);
  }
  
  private void index(Path indexDir, String dictFile, 
      String nutchIndexDir, List<Path> segments) throws IOException {
    LOGGER.info("Starting index2");
    analyze(indexDir, dictFile, segments, TITLE_BOOST);
    flatten(indexDir, LABEL_CUTOFF_SCORE);
    merge(indexDir, nutchIndexDir);
    LOGGER.info("Indexer2: done");
  }

  public int run(String[] args) throws Exception {
    if (args.length < 4) {
      System.err.println(
        "Usage: Indexer2 <index> <dictfile> " + 
        "<nutch_index_dir> <segment> ...");
      return -1;
    }
    Path indexDir = new Path(args[0]);
    String dictFile = args[1];
    String nutchIndexDir = args[2];
    final List<Path> segments = new ArrayList<Path>();
    for (int i = 3; i < args.length; i++) {
      segments.add(new Path(args[i]));
    }
    try {
      index(indexDir, dictFile, nutchIndexDir, segments);
    } catch (Exception e) {
      LOGGER.fatal("Indexer2 failed", e);
      return -1;
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int result = ToolRunner.run(NutchConfiguration.create(), 
      new Indexer2(), args);
    System.exit(result);
  }
}

Compiling and Running

As I have mentioned in previous posts, I keep my development and runtime Nutch installations separate. The former lives under my home directory, and the latter lives in /opt/nutch-1.0. So I compile my code with "ant compile jar", and copy the resulting build/nutch-1.0.jar file over to /opt/nutch-1.0/lib.

One thing I haven't figured out: the default Nutch installation contains nutch-1.0.jar in the $NUTCH_HOME directory, i.e., /opt/nutch-1.0. When I overwrote it with my custom nutch-1.0.jar file, bin/nutch could not find my new Indexer2 class. The CLASSPATH defined in bin/nutch does not include $NUTCH_HOME/nutch-1.0.jar, but it does include $NUTCH_HOME/lib/*.jar. So I didn't think about it too much and just copied my custom JAR file into $NUTCH_HOME/lib, which worked fine.

My custom indexer is meant to run after Nutch has finished creating its default index. Once that was done, I ran my custom indexer with the following command:

sujit@sirocco:/opt/nutch-1.0$ export CRAWL_DIR=/home/sujit/tmp
sujit@sirocco:/opt/nutch-1.0$ bin/nutch \
  org.apache.nutch.indexer.custom.Indexer2 \
  $CRAWL_DIR/data/indexes2 \
  $CRAWL_DIR/dictionary.txt \
  $CRAWL_DIR/data/index \
  $CRAWL_DIR/data/segments/*

The logging output is written to $NUTCH_HOME/logs/hadoop.log and looks like this. No surprises here: it simply reports each of the three stages and then completes.

2009-07-28 17:22:30,806 INFO  custom.Indexer2 - Starting index2
2009-07-28 17:22:30,843 INFO  custom.Indexer2 - Stage 1 (analyze)
2009-07-28 17:22:49,658 INFO  custom.Indexer2 - Stage 2 (flatten)
2009-07-28 17:22:57,996 INFO  custom.Indexer2 - Stage 3 (merge)
2009-07-28 17:23:04,921 INFO  custom.Indexer2 - Indexer2: done
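The timestamps in these lines are enough to get rough per-stage timings. The sketch below is a hypothetical helper (StageTimings is not part of Nutch or Hadoop) that parses the "yyyy-MM-dd HH:mm:ss,SSS" prefix and prints the gap between consecutive Indexer2 lines. It assumes each "Stage N" message is logged as that stage begins, so the gap after it approximates the stage's duration; the log statements inside analyze(), flatten() and merge() are not shown in this section.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: elapsed time between Indexer2 log lines.
public class StageTimings {
  // Timestamp prefix used by the hadoop.log lines in the post.
  private static final DateTimeFormatter TS =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

  // The Indexer2 lines from the hadoop.log excerpt above.
  static final List<String> SAMPLE = Arrays.asList(
      "2009-07-28 17:22:30,806 INFO  custom.Indexer2 - Starting index2",
      "2009-07-28 17:22:30,843 INFO  custom.Indexer2 - Stage 1 (analyze)",
      "2009-07-28 17:22:49,658 INFO  custom.Indexer2 - Stage 2 (flatten)",
      "2009-07-28 17:22:57,996 INFO  custom.Indexer2 - Stage 3 (merge)",
      "2009-07-28 17:23:04,921 INFO  custom.Indexer2 - Indexer2: done");

  /** Parses the 23-character timestamp prefix of a log line. */
  static LocalDateTime timestamp(String line) {
    return LocalDateTime.parse(line.substring(0, 23), TS);
  }

  /** Milliseconds between each pair of consecutive lines. */
  static List<Long> gapsMillis(List<String> lines) {
    List<Long> gaps = new ArrayList<>();
    for (int i = 1; i < lines.size(); i++) {
      gaps.add(Duration.between(timestamp(lines.get(i - 1)),
          timestamp(lines.get(i))).toMillis());
    }
    return gaps;
  }

  /** Milliseconds from the first line to the last. */
  static long totalMillis(List<String> lines) {
    return Duration.between(timestamp(lines.get(0)),
        timestamp(lines.get(lines.size() - 1))).toMillis();
  }

  public static void main(String[] args) throws IOException {
    List<String> lines;
    if (args.length > 0) {
      // Keep only the Indexer2 lines from a real hadoop.log.
      lines = Files.readAllLines(Paths.get(args[0])).stream()
          .filter(l -> l.contains("custom.Indexer2"))
          .collect(Collectors.toList());
    } else {
      lines = SAMPLE;
    }
    if (lines.size() < 2) {
      System.out.println("Need at least two Indexer2 log lines");
      return;
    }
    List<Long> gaps = gapsMillis(lines);
    for (int i = 0; i < gaps.size(); i++) {
      String line = lines.get(i);
      String msg = line.substring(line.indexOf(" - ") + 3);
      System.out.printf("%-20s %7d ms%n", msg, gaps.get(i));
    }
    System.out.printf("%-20s %7d ms%n", "total", totalMillis(lines));
  }
}
```

For the excerpt above, the gaps come out as 37, 18815, 8338 and 6925 ms (34115 ms overall), so under that assumption the analyze stage takes roughly half of the run.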

Using the new Index

Unfortunately, testing the index is not just a matter of hitting it with Luke, since I want my results ordered by count. So I wrote this small test program in Jython to search the index with an additional Sort parameter.

#!/usr/bin/env /opt/jython-2.2.1/jython
# Source: src/main/scripts/python/index2_searcher.py
import string
import sys
import traceback
from org.apache.lucene.search import IndexSearcher, Sort, SortField, TermQuery
from org.apache.lucene.index import Term

def usage():
  print "Usage: %s path_to_index" % (sys.argv[0])

def runQuery(index, label):
  try:
    query = TermQuery(Term("label", label))
    sort = Sort(SortField("label_count", SortField.INT, True))
    hits = index.search(query, sort)
    numResults = hits.length()
    print "%s search results for label:[%s]" % (str(numResults), label)
    for i in range(0, numResults):
      doc = hits.doc(i)
      title = doc.get("title")
      count = doc.get("label_count")
      print "%s (%s)" % (title, count)
  except Exception:
    traceback.print_exc()

def main():
  if (len(sys.argv) != 2):
    usage()
    sys.exit(-1)
  index = IndexSearcher(sys.argv[1])
  try:
    while (True):
      label = raw_input("Enter label: ")
      if (len(string.rstrip(label)) == 0):
        break
      runQuery(index, label)
  finally:
    index.close()

if __name__ == "__main__":
  main()

Running it produces results like these. The number in parentheses at the end of each title is the label_count value.

Enter label: ror
4 search results for label:[ror]
Salmon Run: Book Reviews with Ruby On Rails (23)
Salmon Run: RoR style URLs with Spring MVC (16)
Salmon Run: First Steps with Pylons (7)
Salmon Run: Django : First Impressions (3)

Enter label: webservice
3 search results for label:[webservice]
Salmon Run: HTTP GET your Web Services here (14)
Salmon Run: SOAP Client for Amazon ECS with XFire (10)
Salmon Run: Spring Remoting Strategies compared (5)
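The results come back in descending count order because the script sorts with SortField.INT and reverse=true, which parses each label_count value as an integer. Assuming label_count is stored as a plain, non-padded decimal string (the indexing code in this section doesn't show its exact form), sorting it as text, for example with SortField.STRING, would give a different order. This JDK-only sketch (LabelCountSort is a made-up name) reproduces both orderings on the counts from the "ror" query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration: text order vs numeric order for label_count values.
public class LabelCountSort {

  /** Descending text order: what a string sort on label_count would give. */
  static List<String> lexicalDesc(List<String> counts) {
    List<String> sorted = new ArrayList<>(counts);
    sorted.sort(Comparator.reverseOrder());
    return sorted;
  }

  /** Descending numeric order: each value parsed as an int, like SortField.INT with reverse=true. */
  static List<String> numericDesc(List<String> counts) {
    Comparator<String> byInt = Comparator.comparingInt(Integer::parseInt);
    List<String> sorted = new ArrayList<>(counts);
    sorted.sort(byInt.reversed());
    return sorted;
  }

  public static void main(String[] args) {
    // label_count values from the "ror" results, in no particular order.
    List<String> counts = Arrays.asList("3", "23", "7", "16");
    System.out.println("as text:    " + lexicalDesc(counts));  // [7, 3, 23, 16]
    System.out.println("as numbers: " + numericDesc(counts));  // [23, 16, 7, 3]
  }
}
```

Only the numeric order matches the ranking shown above, which is why the sort field type matters here.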

Closing Thoughts

The code ran fine on my laptop, which is a non-distributed environment. I haven't run it in a distributed environment yet, so I can't say for sure, but I would probably need to make the following changes there.

  1. Wrap the dictionary lookup logic inside a server, and pass the URL of the service to the first Mapper. That way the slave nodes would no longer depend on having a copy of dictionary.txt on their local file systems.
  2. Replace the Reducer in the third stage with an IdentityReducer, then use the output of the third Map-Reduce job to build the index on the master node, i.e. the index writing itself can be outside the Map-Reduce framework.
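Item 1 could look roughly like the sketch below: a small HTTP lookup service built on the JDK's com.sun.net.httpserver package, plus a client method a Mapper could call. Everything here is an assumption for illustration: the DictionaryService class, the /lookup path, the term parameter and the comma-separated response are made up, and the entry reuses the cx_oracle roll-up example from earlier in the post rather than the real dictionary.txt format. With the old mapred API, the driver could pass baseUrl() to the Mapper through the job configuration (JobConf.set() in the driver, JobConf.get() in the Mapper's configure() method).

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URI;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical dictionary lookup service; not part of Nutch or Hadoop.
public class DictionaryService {
  private final HttpServer server;

  /** Serves GET /lookup?term=... from a copy of dict; port 0 picks a free port. */
  public DictionaryService(Map<String, String> dict, int port) throws IOException {
    Map<String, String> entries = new HashMap<>(dict);
    // Binds to localhost for the demo; a cluster would need an address the slaves can reach.
    server = HttpServer.create(new InetSocketAddress("localhost", port), 0);
    server.createContext("/lookup", exchange -> {
      String query = exchange.getRequestURI().getRawQuery();
      String term = (query != null && query.startsWith("term="))
          ? URLDecoder.decode(query.substring("term=".length()), StandardCharsets.UTF_8)
          : "";
      String value = entries.get(term);
      if (value == null) {
        exchange.sendResponseHeaders(404, -1); // unknown term, no body
        exchange.close();
        return;
      }
      byte[] body = value.getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(200, body.length);
      try (OutputStream out = exchange.getResponseBody()) {
        out.write(body);
      }
    });
    server.start();
  }

  /** Base URL the driver would hand to its Mappers. */
  public String baseUrl() {
    return "http://localhost:" + server.getAddress().getPort();
  }

  public void stop() {
    server.stop(0);
  }

  /** Client side: the mapped value for term, or null if the service doesn't know it. */
  public static String lookup(String baseUrl, String term) throws IOException {
    URI uri = URI.create(baseUrl + "/lookup?term="
        + URLEncoder.encode(term, StandardCharsets.UTF_8));
    HttpURLConnection conn =
        (HttpURLConnection) uri.toURL().openConnection(Proxy.NO_PROXY);
    try {
      if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
        return null;
      }
      try (InputStream in = conn.getInputStream()) {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
      }
    } finally {
      conn.disconnect();
    }
  }

  public static void main(String[] args) throws IOException {
    Map<String, String> dict = new HashMap<>();
    dict.put("cx_oracle", "databases,python,scripting,oracle"); // hypothetical entry
    DictionaryService service = new DictionaryService(dict, 0);
    try {
      System.out.println(lookup(service.baseUrl(), "cx_oracle"));
    } finally {
      service.stop();
    }
  }
}
```

A per-term HTTP call from every Mapper could get chatty on a large crawl, so batching lookups or caching results inside each Mapper would probably be worth considering.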

I should also take another look at the contents of dictionary.txt, since it determines the quality of the results. I did the "manual enhancement" one weekend afternoon, mainly to have something to feed to my code, so it probably needs more work before I can think of using this data as the basis for my Tag Cloud.