Monday, March 19, 2012

Writing Lucene Records to SequenceFiles on HDFS

I've been looking at using algorithms from the Apache Mahout project, with a view to applying them to the data in my Cassandra database created using Nutch/GORA, and I have come to the conclusion that while being able to (write and) run Map-Reduce jobs directly against Cassandra or Lucene is cool, for maximum flexibility it's preferable to use files as intermediate storage.

There are a couple of reasons for this. First, most "boxed" algorithms such as those Mahout provides require a specific input format, and it's much easier to convert the data to that file format than to worry about how to interface directly with the datastore in question. Second, being able to pull the data out and experiment with it "offline" is easier because there are fewer dependencies to worry about.

One such flat file format popular in the Hadoop world is the SequenceFile. I've been meaning to check it out for a while now, and recently an opportunity presented itself in the form of a very large (~400 million records) Lucene index for which I needed to build a language model.

To build the model, I needed to pull all the text for titles, authors and content out of the Lucene index into a set of SequenceFiles. The Lucene index is on a regular (i.e., non-HDFS) filesystem, and I wanted to read the index and write the text out to SequenceFiles in HDFS. This post describes the code I built to do this.

Here is the code to generate the sequence file(s). The code is heavily adapted from the examples provided here and here. Because of the size of the index, and because I had access to a fairly large multi-CPU box, I decided to partition the job using a simple hashmod partitioning scheme and run the partitions using GNU Parallel.

package com.mycompany.myapp.train;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.MapFieldSelector;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CachingWrapperFilter;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryWrapperFilter;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.FSDirectory;

public class LuceneToSequenceFileGenerator {

  private static final int MAX_JOBS = 10;
  private static final int TITLE_WEIGHT = 8;
  private static final int AUTHOR_WEIGHT = 8;
  
  private String indexDir;
  private String seqfilesDir;
  private String hadoopDir;
  private int id;
  
  private void setIndexDir(String indexDir) {
    this.indexDir = indexDir;
  }

  private void setSequenceFilesDir(String seqfilesDir) {
    this.seqfilesDir = seqfilesDir;
  }

  private void setIndex(int id) {
    this.id = id;
  }
  
  private void setHadoopDir(String hadoopDir) {
    this.hadoopDir = hadoopDir;
  }
  
  private void generate() {
    IndexSearcher searcher = null;
    SequenceFile.Writer writer = null;
    try {
      Configuration conf = new Configuration();
      conf.addResource(new Path(FilenameUtils.concat(hadoopDir, 
        "conf/core-site.xml")));
      conf.addResource(new Path(FilenameUtils.concat(hadoopDir, 
        "conf/hdfs-site.xml")));
      FileSystem hdfs = FileSystem.get(conf);
      // check if path exists
      Path seqfilesPath = new Path(seqfilesDir);
      if (! hdfs.exists(seqfilesPath)) {
        usage("HDFS Directory " + seqfilesDir + " does not exist!");
        return;
      }
      // create writer based on the id passed in
      Path filename = new Path(FilenameUtils.concat(
        seqfilesDir, "indexpart-" + 
        StringUtils.leftPad(String.valueOf(id), 6, "0")));
      LongWritable key = new LongWritable();
      Text value = new Text();
      writer = SequenceFile.createWriter(
        hdfs, conf, filename, key.getClass(), value.getClass());
      // get the docids to work on from Lucene
      searcher = new IndexSearcher(FSDirectory.open(
        new File(indexDir)), true);
      FieldSelector selector = new MapFieldSelector(Arrays.asList(
        "title", "author", "body"));
      Query q = new MatchAllDocsQuery();
      Filter f = new CachingWrapperFilter(new QueryWrapperFilter(
        new TermQuery(new Term("filtername", "filtervalue"))));
      ScoreDoc[] hits = searcher.search(q, f, searcher.maxDoc()).scoreDocs;
      for (int i = 0; i < hits.length; i++) {
        int partition = i % MAX_JOBS;
        if (id != partition) {
          continue;
        }
        Document doc = searcher.doc(hits[i].doc, selector);
        String title = doc.get("title");
        String author = doc.get("author");
        String body = doc.get("body");
        key.set(Long.valueOf(i));
        value.set(constructValue(title, author, body));
        writer.append(key, value);
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      IOUtils.closeStream(writer);
      if (searcher != null) {
        try { searcher.close(); } 
        catch (IOException e) { e.printStackTrace(); }
      }
    }
  }

  private String constructValue(String title, String author, String body) {
    StringBuilder buf = new StringBuilder();
    if (StringUtils.isNotEmpty(title)) {
      for (int i = 0; i < TITLE_WEIGHT; i++) {
        buf.append(title).append(" ");
      }
    }
    if (StringUtils.isNotEmpty(author)) {
      for (int i = 0; i < AUTHOR_WEIGHT; i++) {
        buf.append(author).append(" ");
      }
    }
    if (StringUtils.isNotEmpty(body)) {
      buf.append(body);
    }
    return buf.toString();
  }

  private static void usage(String error) {
    if (StringUtils.isNotEmpty(error)) {
      System.out.println("Error: " + error);
    }
    System.out.println("Usage: LuceneToSequenceFileConverter " +
      "index_dir seq_dir hadoop_dir id");
    System.out.println("where:");
    System.out.println("index_dir: non-HDFS path to Lucene index directory");
    System.out.println("seq_dir: HDFS path to sequence files directory");
    System.out.println("hadoop_dir: Base directory of hadoop installation");
    System.out.println("id: the integer id for this job");
  }
  
  public static void main(String[] args) {
    if (args.length != 4) {
      usage("Invalid number of arguments");
      return;
    }
    LuceneToSequenceFileGenerator generator = 
      new LuceneToSequenceFileGenerator();
    generator.setIndexDir(args[0]);
    generator.setSequenceFilesDir(args[1]);
    generator.setHadoopDir(args[2]);
    generator.setIndex(Integer.valueOf(args[3]));
    generator.generate();
  }
}

I then packaged the code into a JAR (it's part of an existing application) and built a shell script that sets the CLASSPATH (everything the existing application needs as specified in the build.xml, plus hadoop-core-1.0.1.jar and all the JARs in HADOOP_HOME/lib). To run it, I first created an empty directory in HDFS for this process:

hduser@bigmac:myapp$ /opt/hadoop-1.0.1/bin/hadoop fs -mkdir \
  /data/hadoop/myapp

Then I created a file called ids.txt containing the numbers 0 through 9, one per line (one for each of the MAX_JOBS partitions). Each number becomes the fourth argument to the shell script wrapper (lucene2seq.sh), passed to it by GNU Parallel. The argument serves as a way to determine a unique output filename, as well as to decide which instance will process a given Lucene document. Here is the shell script call.

hduser@bigmac:myapp$ cat ids.txt | parallel ./lucene2seq.sh \
  /path/to/my/index /data/hadoop/myapp /opt/hadoop-1.0.1
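
Once the partitioned jobs complete, it is easy to sanity-check the output by reading a few records back from one of the generated files. Below is a minimal, hypothetical SequenceFilePeek class (not part of the generator) that prints the first few records of the id-0 output file; the configuration paths and key/value types mirror the ones used by the generator above.

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFilePeek {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.addResource(new Path("/opt/hadoop-1.0.1/conf/core-site.xml"));
    conf.addResource(new Path("/opt/hadoop-1.0.1/conf/hdfs-site.xml"));
    FileSystem hdfs = FileSystem.get(conf);
    // output file written by the partition with id 0
    Path path = new Path("/data/hadoop/myapp/indexpart-000000");
    SequenceFile.Reader reader = new SequenceFile.Reader(hdfs, path, conf);
    LongWritable key = new LongWritable();
    Text value = new Text();
    int shown = 0;
    // print the first five records, truncating the value for readability
    while (reader.next(key, value) && shown++ < 5) {
      System.out.println(key.get() + "\t" +
        StringUtils.abbreviate(value.toString(), 80));
    }
    reader.close();
  }
}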

The next step is to use Hadoop and Map-Reduce to build the language model for this. Progress has been a bit slow on my personal experimentation front lately (What? Two weeks to come up with this? :-)), and is likely to remain so for the next couple of months. This is because I am taking the Stanford Online NLP course, and that's taking up a lot of my time. But on the bright side, it's been very interesting so far, and I am learning quite a bit of stuff I didn't know (or didn't think to inquire about) before, so hopefully this will show up in the quality of my solutions going forward.
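
As a trivial stand-in for that future job, here is a minimal sketch of a mapper that consumes the SequenceFile records written above (LongWritable keys, Text values) and emits unigram counts; the class name is hypothetical, and the real language model job will obviously be more involved. The driver would just set SequenceFileInputFormat as the input format and point it at the /data/hadoop/myapp directory.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UnigramCountMapper
    extends Mapper<LongWritable,Text,Text,IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context ctx)
      throws IOException, InterruptedException {
    // value holds "title (x8), author (x8), body" as constructed above
    for (String token : value.toString().toLowerCase().split("\\s+")) {
      if (token.length() == 0) {
        continue;
      }
      word.set(token);
      ctx.write(word, ONE);
    }
  }
}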

Saturday, March 03, 2012

Distributed Solr: Indexing and Searching

This post is not about SolrCloud. SolrCloud is going to be available in the upcoming Solr 4.x release, and will render a lot of the work described in this blog post obsolete. However, I am working with the latest released Solr version (3.5), and I needed a way to have Nutch index its contents onto a bank of Solr server shards, which I could then run distributed queries against.

Indexing

Distributed indexing can be achieved quite simply with Nutch by making some fairly minor changes to the SolrWriter and SolrIndexerReducer (in the NutchGora branch; I haven't looked at the trunk, so I can't comment on it).

From the user-interface point of view, you specify a comma-separated list of Solr server URLs instead of a single one in the solrindexer job. Under the covers, the job creates a client for each Solr server in the list, each with its own document queue. A partitioner decides which server a document will go to based on its key. Each time a queue grows past a specified size (the commit interval), the queued documents are flushed to the appropriate Solr server. Once all the input is consumed, a commit is called on all the Solr servers in the list.
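
To make that description concrete, below is a stripped-down, hypothetical illustration of the queue-per-server scheme using plain SolrJ (no Nutch classes); the actual change lives in SolrWriter and SolrIndexerReducer, and the sub-page indexer reducer shown further down follows the same pattern in context.

import java.util.ArrayList;
import java.util.List;

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class ShardedDocWriter {

  private final SolrServer[] servers;
  private final List<List<SolrInputDocument>> queues;
  private final int commitSize;

  public ShardedDocWriter(String[] urls, int commitSize) throws Exception {
    this.servers = new SolrServer[urls.length];
    this.queues = new ArrayList<List<SolrInputDocument>>();
    this.commitSize = commitSize;
    for (int i = 0; i < urls.length; i++) {
      servers[i] = new CommonsHttpSolrServer(urls[i]);
      queues.add(new ArrayList<SolrInputDocument>());
    }
  }

  /** Route a document to a shard by key hash, flushing queues that are full. */
  public void write(String key, SolrInputDocument doc) throws Exception {
    int shard = (key.hashCode() & Integer.MAX_VALUE) % servers.length;
    queues.get(shard).add(doc);
    if (queues.get(shard).size() >= commitSize) {
      servers[shard].add(queues.get(shard));
      queues.get(shard).clear();
    }
  }

  /** Flush any remaining documents and commit on every shard. */
  public void close() throws Exception {
    for (int i = 0; i < servers.length; i++) {
      if (!queues.get(i).isEmpty()) {
        servers[i].add(queues.get(i));
        queues.get(i).clear();
      }
      servers[i].commit();
    }
  }
}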

You can find my patch (for NutchGora branch only) in NUTCH-945. The discussion that led to this change can be found here.

I also made the same change to my custom sub-page indexer (originally described here). The changes are only in the reducer, so I have removed the mapper code for brevity; you can get the mapper code from the previous post linked earlier in this paragraph.

// Source: src/java/com/mycompany/nutch/subpageindexer/SolrSubpageIndexerJob.java
package com.mycompany.nutch.subpageindexer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import org.apache.avro.util.Utf8;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.indexer.IndexerJob;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexer.solr.NonPartitioningPartitioner;
import org.apache.nutch.indexer.solr.SolrConstants;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.Bytes;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.ToolUtil;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.DateUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

public class SolrSubpageIndexerJob extends IndexerJob {

  private static Log LOG = LogFactory.getLog(SolrSubpageIndexerJob.class);
  
  private static final Collection<WebPage.Field> FIELDS = 
    new HashSet<WebPage.Field>();
  
  static {
    FIELDS.addAll(Arrays.asList(WebPage.Field.values()));
  }
  
  public static class SolrSubpageIndexerJobMapper 
      extends GoraMapper<String,WebPage,Text,NutchDocument> {
    // ... no changes here ...
  }
  
  public static class SolrSubpageIndexerJobReducer
      extends Reducer<Text,NutchDocument,Text,NutchDocument> {
   
    private int commitSize;
    private SolrServer[] servers;
    private Partitioner<String,NutchDocument> partitioner;
    private List<SolrInputDocument>[] sdocs = null; 
    
    @SuppressWarnings("unchecked")
    @Override
    public void setup(Context ctx) throws IOException {
      Configuration conf = ctx.getConfiguration();
      String[] urls = conf.getStrings(SolrConstants.SERVER_URL);
      if (urls == null || urls.length == 0) {
        throw new IOException(SolrConstants.SERVER_URL + " not configured");
      }
      this.servers = new SolrServer[urls.length];
      this.sdocs = (ArrayList<SolrInputDocument>[]) 
        new ArrayList[urls.length];
      for (int i = 0; i < urls.length; i++) {
        servers[i] = new CommonsHttpSolrServer(urls[i]);
        sdocs[i] = new ArrayList<SolrInputDocument>();
      }
      commitSize = conf.getInt(SolrConstants.COMMIT_SIZE, 1000);
      if (urls.length == 1) {
        partitioner = new NonPartitioningPartitioner();
      } else {
        try {
          String partitionerClass = conf.get(SolrConstants.PARTITIONER_CLASS);
          partitioner = (Partitioner<String,NutchDocument>) 
            Class.forName(partitionerClass).newInstance();
          LOG.info("Partitioning using: " + partitionerClass);
        } catch (Exception e) {
          partitioner = new HashPartitioner<String, NutchDocument>();
          LOG.info("Partitioning using default HashMod partitioner");
        }
      }
    }
    
    @Override
    public void reduce(Text key, Iterable<NutchDocument> values,
        Context ctx) throws IOException, InterruptedException {
      for (NutchDocument doc : values) {
        SolrInputDocument sdoc = new SolrInputDocument();
        for (String fieldname : doc.getFieldNames()) {
          sdoc.addField(fieldname, doc.getFieldValue(fieldname));
        }
        int partition = partitioner.getPartition(
          key.toString(), doc, sdocs.length);
        sdocs[partition].add(sdoc);
        if (sdocs[partition].size() >= commitSize) {
          try {
            servers[partition].add(sdocs[partition]);
          } catch (SolrServerException e) {
            throw new IOException(e);
          }
          sdocs[partition].clear();
        }
      }
    }
    
    @Override
    public void cleanup(Context ctx) throws IOException {
      for (int i = 0; i < sdocs.length; i++) {
        try {
          if (sdocs[i].size() > 0) {
            servers[i].add(sdocs[i]);
          }
          sdocs[i].clear();
          servers[i].commit();
        } catch (SolrServerException e) {
          throw new IOException(e);
        }
      }
    }
  }
  
  @Override
  public Map<String,Object> run(Map<String,Object> args) throws Exception {
    String solrUrl = (String) args.get(SolrConstants.SERVER_URL);
    if (StringUtils.isNotEmpty(solrUrl)) {
      getConf().set(SolrConstants.SERVER_URL, solrUrl);
    }
    String batchId = (String) args.get(Nutch.ARG_BATCH);
    if (StringUtils.isNotEmpty(batchId)) {
      getConf().set(Nutch.ARG_BATCH, batchId);
    }
    currentJob = new NutchJob(getConf(), "solr-subpage-index");
    StorageUtils.initMapperJob(currentJob, FIELDS, Text.class, 
      NutchDocument.class, SolrSubpageIndexerJobMapper.class);
    currentJob.setMapOutputKeyClass(Text.class);
    currentJob.setMapOutputValueClass(NutchDocument.class);
    currentJob.setReducerClass(SolrSubpageIndexerJobReducer.class);
    currentJob.setNumReduceTasks(5);
    currentJob.waitForCompletion(true);
    ToolUtil.recordJobStatus(null, currentJob, results);
    return results;
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: SolrSubpageIndexerJob <solr url> (<batch_id> | -all)");
      return -1;
    }
    LOG.info("SolrSubpageIndexerJob: starting");
    run(ToolUtil.toArgMap(
      SolrConstants.SERVER_URL, args[0],
      Nutch.ARG_BATCH, args[1]));
    LOG.info("SolrSubpageIndexerJob: success");
    return 0;
  }

  public static void main(String[] args) throws Exception {
    final int res = ToolRunner.run(NutchConfiguration.create(), 
      new SolrSubpageIndexerJob(), args);
    System.exit(res);
  }
}

If you want to specify your own custom partitioner, then you will need to define it in your nutch-site.xml file. Here is an example from mine:

<property>
  <name>solr.partitioner.class</name>
  <value>com.mycompany.nutch.indexer.solr.MurmurHashPartitioner</value>
  <description>Custom partitioner for distributed Solr index</description>
</property>
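
I haven't shown the partitioner itself, but since it only needs to be a Hadoop Partitioner<String,NutchDocument> with a no-arg constructor (the reducer's setup instantiates it via Class.forName(...).newInstance()), a minimal sketch would look something like the following. The hash here is a plain String.hashCode() standing in for a real MurmurHash implementation, so treat it as illustrative only.

package com.mycompany.nutch.indexer.solr;

import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.nutch.indexer.NutchDocument;

public class MurmurHashPartitioner
    extends Partitioner<String,NutchDocument> {

  @Override
  public int getPartition(String key, NutchDocument doc, int numPartitions) {
    // mask off the sign bit so negative hash codes don't yield negative
    // partitions (same idiom as Hadoop's HashPartitioner); a real
    // implementation would substitute a MurmurHash of the key here
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}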

I set up clones of Solr by copying my non-distributed Solr server bin distribution directory (with the Nutch version of schema.xml, updated as described in previous posts), deleting the contents of the data directory, and running each clone on its own port, like so:

sujit@cyclone:NutchGora$ # On one terminal
sujit@cyclone:NutchGora$ cd solr1/example
sujit@cyclone:example$ java -Djetty.port=8984 -jar start.jar
sujit@cyclone:example$ 
sujit@cyclone:NutchGora$ # On another terminal
sujit@cyclone:NutchGora$ cd solr2/example
sujit@cyclone:example$ java -Djetty.port=8985 -jar start.jar

Once you apply the patch and build the runtime, you can run the solrindexer and SolrSubpageIndexerJob jobs from the command line like so:

sujit@cyclone:local$ # indexing single-solr mode to port 8983
sujit@cyclone:local$ bin/nutch solrindex http://localhost:8983/solr -all
sujit@cyclone:local$ 
sujit@cyclone:local$ # indexing distributed-solr mode to ports 8984/8985
sujit@cyclone:local$ bin/nutch solrindex \
      http://localhost:8984/solr,http://localhost:8985/solr -all
sujit@cyclone:local$ 
sujit@cyclone:local$ # indexing subpages single-solr mode to port 8983
sujit@cyclone:local$ bin/nutch \
      com.mycompany.nutch.subpageindexer.SolrSubpageIndexerJob \
      http://localhost:8983/solr -all
sujit@cyclone:local$ 
sujit@cyclone:local$ # indexing subpages distrib-solr mode to ports 8984/8985
sujit@cyclone:local$ bin/nutch \
      com.mycompany.nutch.subpageindexer.SolrSubpageIndexerJob \
      http://localhost:8984/solr,http://localhost:8985/solr -all

I have intentionally shown the commands for the single-solr indexing version to illustrate that the change is fully backward compatible, and also because I wanted to compare the search results between the non-distributed (port 8983) and distributed (ports 8984 and 8985) environments.

Search

Solr 3.5, which I am using, supports distributed search via sharding out of the box; the Solr Distributed Search wiki page has more information about it. Enabling distributed search on a query (provided its handler is not using any of the unsupported components) is as easy as adding a shards parameter, containing a comma-separated list of Solr servers, to your URL. In my setup, the shards parameter would look like shards=localhost:8984/solr,localhost:8985/solr.
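
Before wiring this into a client, the sharded query can be tried out with a few lines of SolrJ (the same CommonsHttpSolrServer class used in the indexer above); this is just a minimal sketch, and the query string is a placeholder. Pointing it at either shard works, since the server that receives the request fans the query out to everything listed in the shards parameter.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;

public class ShardedQueryExample {
  public static void main(String[] args) throws Exception {
    // point at either of the shard servers
    CommonsHttpSolrServer server =
      new CommonsHttpSolrServer("http://localhost:8984/solr");
    SolrQuery query = new SolrQuery("title:solr");
    // same comma-separated shards list that the Python client below uses
    query.set("shards", "localhost:8984/solr,localhost:8985/solr");
    QueryResponse rsp = server.query(query);
    System.out.println("Found " + rsp.getResults().getNumFound() + " docs");
  }
}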

To support sharding in my Python client (described here), all I needed to do was declare my SOLR_SHARDS value and add the shards to my solrparams tuple list (around line 141), so it is passed on to Solr. Also, since I am now pointing to ports 8984 and 8985, the query has to hit one of these servers instead of 8983 (hardcoded in SOLR_SERVER), so that needs to change too.

    SOLR_SERVER = "http://localhost:8984/solr/select"
    SOLR_SHARDS = ["localhost:8984/solr", "localhost:8985/solr"]
    ...
    # finally, add the shards parameters
    solrparams.append(tuple(["shards", ",".join(SOLR_SHARDS)]))

I set up two copies of my CherryPy-based client application, one running against the single-Solr instance on port 8983 and listening on port 8081, and the other running against the distributed Solr instances on ports 8984 and 8985 and listening on port 8082, and compared results from sending the same query to both applications. As expected, the results are identical.

From what I can see in the logs, the response handler invoked with the sharded query (/select on port 8984 in our case) intercepts the shards parameter and forwards the query to each shard, with the shards parameter replaced by isShard=true. Once it gets all the responses back, it merges them and presents the combined result to the caller.