Sunday, April 28, 2013

scalingpipe - porting LingPipe tutorial examples to Scala


Recently, I was tasked with evaluating LingPipe for use in our NLP processing pipeline. I have looked at LingPipe before, but have generally kept away from it because of its licensing. While the license is quite friendly to individual developers such as myself (as long as I share the results of my work, I can use LingPipe without paying royalties), a lot of what I do is motivated by problems at work, and LingPipe-based solutions are only practical there when the company is open to the licensing costs involved.

So anyway, in an attempt to kill two birds with one stone, I decided to work through the LingPipe tutorial, but in Scala. I figured that would allow me to pick up the LingPipe API as well as give me some additional experience in Scala coding. I looked around to see if anybody had done something similar, and came upon the scalingpipe project on GitHub, where Alexy Khrabov had started by porting the Interesting Phrases tutorial example.

So I forked the project, and about a month later I ended up building almost all the LingPipe tutorial demos in Scala. There are now 54 examples across all 19 categories of the tutorial. Read the project's README.md and the code for specifics. I now have some insight into LingPipe's capabilities and a working knowledge of the API. Here is a link to my scalingpipe fork. I have sent Alexy a pull request (my first attempt at contributing to other people's projects on GitHub!).

Initially, LingPipe code appeared more complex than it needed to be - perhaps because of the heavy use of the Visitor pattern in the tagging/chunking code, where custom ObjectHandler.handle() methods are invoked through the framework. The Text Processing with LingPipe 4 book (aka the LingPipe Book) by Bob Carpenter and Breck Baldwin has a good explanation of this approach, as well as (a lot of) NLP theory and how it is implemented in LingPipe - if you end up using LingPipe for more than trivial stuff (or more than just copying/porting the examples), you should probably read the book.
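
To give a flavor of that callback style, here is a minimal sketch in plain Java (my own illustration, not taken from the tutorials): the framework produces result objects, such as a Chunking, and passes each one to an ObjectHandler that you supply; your logic lives in its handle() method.

// Hypothetical example: a handler that just prints every chunk it is given.
// The class name ChunkingPrinter is my own; ObjectHandler, Chunking and
// Chunk are LingPipe 4 types.
import com.aliasi.chunk.Chunk;
import com.aliasi.chunk.Chunking;
import com.aliasi.corpus.ObjectHandler;

public class ChunkingPrinter implements ObjectHandler<Chunking> {
  @Override
  public void handle(Chunking chunking) {
    // print each chunk's type and its character offsets
    for (Chunk chunk : chunking.chunkSet()) {
      System.out.println(chunk.type() + " [" + chunk.start() + "," + chunk.end() + ")");
    }
  }
}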

Now that I have a reasonable understanding of LingPipe, I plan on re-reading Building Search Applications: Lucene, LingPipe and Gate by Dr Manu Konchady, and porting some of the Lucene/LingPipe examples in there. I've had this book for a long time, and all three components described in it are now at least two major versions behind what's currently available, but I believe there may still be some powerful stuff in there.

Saturday, April 20, 2013

Language Model to detect Medical Sentences using NLTK


I've been thinking of ways to single out medical sentences in a body of mixed text for special processing, and one of the approaches I thought of was to train a trigram (backoff) language model on some medical text, then use the model to decide whether a sentence is medical or non-medical. The joint probability that the model assigns to the words of a sentence should be higher for medical sentences than for non-medical ones.

I initially looked at NLTK's NgramModel, but could not make it work because the Lidstone probability distribution I was passing to it as an estimator expected a minimum number of bins to be configured. Unfortunately, I could not reproduce the error with small amounts of data (which I would need in order to submit a bug report). I also found that the NgramModel can't be pickled (because of a probability distribution function object inside it), which made it even less interesting.

In any case, you can find this (non-working) code in my GitHub here. It crashes with a "ValueError: A Lidstone probability distribution must have at least one bin" message during the testing phase. Unfortunately, I can't share the data for licensing reasons, but if you feed the code a reasonably large set of XML files (I had about 3,500), it should fail at around the same place. [Update: I found a publicly available XML sample and have asked about this on the nltk-users mailing list - you can follow the discussion here, if you'd like.]

However, it turns out that a trigram language model is quite simple to build, especially using NLTK's building blocks. My language model first tries to report trigram probabilities, falls back to the corresponding bigram and unigram probabilities, and finally reports a Laplace-smoothed estimate if the unigram count is also 0. Probabilities at lower-order n-grams are discounted by a (heuristically chosen) value alpha, and the final result is normalized by the number of words in the sentence (to remove the effect of sentence length). Because this is a proof of concept to test the validity of the idea more than anything else, I decided to skip the calculation of alpha.
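
To make the scoring a little more explicit (this is just a restatement of the code below): with count(.) denoting frequencies collected from the training sentences, N the number of unigram tokens and V the vocabulary size, the model computes roughly

  P(ngram)     = count(ngram) / count(ngram without its first word)   if count(ngram) > 0
               = alpha * P(ngram without its first word)              otherwise
  P(unigram w) = (count(w) + 1) / (N + V)                             at the lowest level

and the score of a sentence is the sum of log P over its trigrams, divided by the number of words in the sentence.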

Here is the code for the home-grown language model described above (also available in my GitHub). The train() method reads a bunch of medical XML files and parses out their sentences. These sentences are then used to instantiate the LangModel class, which is then pickled. The test() method unpickles the model and uses it to compute the log probabilities of each test sentence's trigrams, finally normalizing the sum by the length of the sentence.

from __future__ import division

import math
import os.path

import cPickle
import glob
import nltk
from nltk.corpus.reader import XMLCorpusReader

class LangModel:
  def __init__(self, order, alpha, sentences):
    # order: n-gram size; alpha: discount applied when backing off to the
    # next lower order model
    self.order = order
    self.alpha = alpha
    if order > 1:
      # recursively build the lower-order (backoff) model from the same sentences
      self.backoff = LangModel(order - 1, alpha, sentences)
      self.lexicon = None
    else:
      self.backoff = None
      self.n = 0
    self.ngramFD = nltk.FreqDist()
    lexicon = set()
    for sentence in sentences:
      words = nltk.word_tokenize(sentence)
      wordNGrams = nltk.ngrams(words, order)
      for wordNGram in wordNGrams:
        self.ngramFD.inc(wordNGram)
        if order == 1:
          lexicon.add(wordNGram)
          self.n += 1
    # n (number of tokens) and v (vocabulary size) are only used for
    # Laplace smoothing at the unigram level
    self.v = len(lexicon)

  def logprob(self, ngram):
    return math.log(self.prob(ngram))
  
  def prob(self, ngram):
    if self.backoff != None:
      # relative frequency of the ngram w.r.t. its (n-1)-gram suffix,
      # backing off (discounted by alpha) if the ngram was never seen
      freq = self.ngramFD[ngram]
      backoffFreq = self.backoff.ngramFD[ngram[1:]]
      if freq == 0:
        return self.alpha * self.backoff.prob(ngram[1:])
      else:
        return freq / backoffFreq
    else:
      # laplace smoothing to handle unknown unigrams
      return ((self.ngramFD[ngram] + 1) / (self.n + self.v))

def train():
  if os.path.isfile("lm.bin"):
    return
  files = glob.glob("data/*.xml")
  sentences = []
  i = 0
  for file in files:
    if i > 0 and i % 500 == 0:
      print("%d/%d files loaded, #-sentences: %d" %
        (i, len(files), len(sentences)))
    dir, file = file.split("/")
    reader = XMLCorpusReader(dir, file)
    sentences.extend(nltk.sent_tokenize(" ".join(reader.words())))
    i += 1
  lm = LangModel(3, 0.4, sentences)
  cPickle.dump(lm, open("lm.bin", "wb"))

def test():
  lm1 = cPickle.load(open("lm.bin", 'rb'))
  testFile = open("sentences.test", 'rb')
  for line in testFile:
    sentence = line.strip()
    print "SENTENCE:", sentence,
    words = nltk.word_tokenize(sentence)
    wordTrigrams = nltk.trigrams(words)
    slogprob = 0
    for wordTrigram in wordTrigrams:
      logprob = lm1.logprob(wordTrigram)
      slogprob += logprob
    print "(", slogprob / len(words), ")"

def main():
  train()
  test()

if __name__ == "__main__":
  main()

And here are the language model's predictions for a set of test sentences I pulled off the Internet (mainly Wikipedia).

  1. In biology, immunity is the state of having sufficient biological defences to avoid infection, disease, or other unwanted biological invasion. (-6.53506411778)
  2. Naturally acquired immunity occurs through contact with a disease causing agent, when the contact was not deliberate, whereas artificially acquired immunity develops only through deliberate actions such as vaccination. (-7.90563670519)
  3. Immunity from prosecution occurs when a prosecutor grants immunity, usually to a witness in exchange for testimony or production of other evidence. (-8.40420096533)
  4. Transactional immunity (colloquially known as "blanket" or "total" immunity) completely protects the witness from future prosecution for crimes related to his or her testimony. (-8.60917860675)
  5. Hearing loss is being partly or totally unable to hear sound in one or both ears. (-1.61661138183)
  6. Conductive hearing loss (CHL) occurs because of a mechanical problem in the outer or middle ear. (-1.98718543565)
  7. Sensorineural hearing loss (SNHL) occurs when the tiny hair cells (nerve endings) that detect sound in the ear are injured, diseased, do not work correctly, or have died. (-2.5566194904)
  8. This type of hearing loss often cannot be reversed. (-2.72710898378)
  9. In law, a hearing is a proceeding before a court or other decision-making body or officer, such as a government agency. (-5.87112753897)
  10. Within some criminal justice systems, a preliminary hearing (evidentiary hearing) is a proceeding, after a criminal complaint has been filed by the prosecutor, to determine whether there is enough evidence to require a trial. (-7.44050739024)

As you can see, sentences that are obviously medical tend to have a higher normalized log probability (the value at the end of the sentence) than sentences that are not. Sentences #1 and #2 are right on the border, with normalized log probabilities comparable to the non-medical sentences. Depending on the results of more tests, this model may or may not be good enough. Alternatively, it may be more effective to reframe the problem as one where we classify a sentence as belonging to one of multiple genres, with a language model per genre.
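
To sketch what that variant might look like (nothing here implements it): train one LangModel per genre, score the incoming sentence against each, and assign the sentence to the genre whose model gives the highest normalized log probability, i.e.

  genre(s) = argmax over genres g of (1/|s|) * sum over trigrams t in s of log P_g(t)

where P_g is the probability assigned by genre g's language model.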

Anyway, that's all I have for today. Hope you found it interesting.


Wednesday, April 03, 2013

A Newspaper Clipping Service with Cascading


This post describes a possible implementation of an automated Newspaper Clipping Service. The end-user is a researcher (or team of researchers) in a particular discipline who registers an interest in a set of topics (or web pages). An assistant (or team of assistants) then scours information sources to find more documents of interest to the researcher based on the topics identified. In this particular case, the information sources were limited to a set of "approved" newspapers, hence the name "Newspaper Clipping Service". The goal is to replace the assistants with an automated system.

The solution I came up with was to analyze the original web pages and treat keywords extracted from these pages as topics, then, for each keyword, query a popular search engine and gather the top 10 results. The search engine can be customized so that the sites it looks at are restricted to the list of approved newspapers. Finally, the result URLs are aggregated, and only URLs that were returned by more than one keyword topic are given back to the user.

The entire flow can be thought of as a series of Hadoop Map-Reduce jobs, to first download, extract and count keywords from (web pages corresponding to) URLs, and then to extract and count search result URLs from the keywords. I've been wanting to play with Cascading for a while, and this seemed like a good candidate, so the solution is implemented with Cascading.

I have used Scalding in the past, but it seems to me that while Scalding's collection-like interface is easier to work with, it's harder to extend. So even though I think I could have done this in Scalding without any problems, the objective was to learn Cascading, so I used that instead. I initially started using Cascading with Scala (I write enough Java code at work :-)), but Cascading's use of generics (at least some of it) is too complicated for Scala's type inference system, so I fell back to using Java instead*.

One can write Cascading code in local mode, which uses in-memory data structures and the local filesystem, or in Hadoop mode, which uses Hadoop and HDFS. Since this was a learning exercise, I decided to use local mode. To move it to Hadoop, one would have to use Hadoop-specific FlowConnectors and Taps instead; a minimal sketch of that swap is shown below, followed by the code for the Main (callable) class. The entire Maven project is available on my GitHub page.
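
Here is a rough, untested sketch of what the Hadoop-mode wiring might look like, assuming Cascading 2.x's HadoopFlowConnector, the Hfs tap and the Hadoop TextLine scheme (the class name HadoopMain and the idea of passing HDFS paths as arguments are mine); the pipe assembly itself would be built exactly as in Main.java below.

// Hypothetical Hadoop-mode variant of Main.java: only the taps and the
// flow connector change, the assembly stays the same.
package com.mycompany.newsclip;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class HadoopMain {

  @SuppressWarnings("rawtypes")
  public static void main(String[] args) {
    // HDFS paths passed on the command line instead of local files
    Tap iTap = new Hfs(new TextLine(new Fields("num", "line")), args[0]);
    Tap oTap = new Hfs(new TextLine(new Fields("kword")), args[1]);

    Pipe pipe = new Pipe("keyword");
    // ... build the same Each/GroupBy/Every assembly as in Main.java ...

    FlowDef flowDef = FlowDef.flowDef().
      setName("newsclip").
      addSource(pipe, iTap).
      addTailSink(pipe, oTap);

    Properties props = new Properties();
    AppProps.setApplicationJarClass(props, HadoopMain.class);
    FlowConnector flowConnector = new HadoopFlowConnector(props);

    Flow flow = flowConnector.connect(flowDef);
    flow.complete();
  }
}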

// Source: src/main/java/com/mycompany/newsclip/Main.java
package com.mycompany.newsclip;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Aggregator;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

public class Main {

  @SuppressWarnings("rawtypes")
  public static void main(String[] args) {
    // handle input parameters
    String input = args[0];
    String output = args[1];

    Fields urlFields = new Fields("num", "line");
    Tap iTap = new FileTap(new TextLine(urlFields), input);
    
    Fields kFields = new Fields("kword");
    Tap oTap = new FileTap(new TextLine(kFields), output);

    Pipe pipe = new Pipe("keyword");
    
    // read urls, download, clean and extract keywords (1:n)
    Function kFun = new KeywordExtractFunction(kFields);
    pipe = new Each(pipe, urlFields,  kFun);
    
    // group by word and count it
    pipe = new GroupBy(pipe, kFields);
    Aggregator kCount = new Count(new Fields("count"));
    pipe = new Every(pipe, kCount);
    
    // filter out keywords with count <= 1, i.e. keep only those that occur more than once
    Filter kCountFilter = new ExpressionFilter("$1 <= 1", Integer.class);
    pipe = new Each(pipe, kCountFilter);
    
    // pass the keywords to our custom google search
    Fields kcFields = new Fields("kword", "count");
    Fields uFields = new Fields("url");
    Function uFun = new UrlExtractFunction(uFields);
    pipe = new Each(pipe, kcFields, uFun);
    
    // group by url and count it
    pipe = new GroupBy(pipe, uFields);
    Aggregator uCount = new Count(new Fields("count"));
    pipe = new Every(pipe, uCount);
    
    // filter out urls that occur once
    Filter uCountFilter = new ExpressionFilter("$1 <= 1", Integer.class);
    pipe = new Each(pipe, uCountFilter);
    
    // remove the count value
    pipe = new Each(pipe, Fields.ALL, new Identity(), Fields.FIRST);
    
    FlowDef flowDef = FlowDef.flowDef().
      setName("newsclip").
      addSource(pipe, iTap).
      addTailSink(pipe,  oTap);
    
    Properties props = new Properties();
    AppProps.setApplicationJarClass(props, Main.class);
    FlowConnector flowConnector = new LocalFlowConnector(props);

    Flow flow = flowConnector.connect(flowDef);
    flow.writeDOT("data/newsclip.dot");
    flow.complete();
  }
}




The corresponding Graphviz DOT file for the assembly is generated by the flow.writeDOT() call in the code above. I converted it to a web-displayable PNG file using the command "dot -Tpng newsclip.dot -o newsclip.png".

The code above uses built-in functions and filters where available, but the core operations are done by custom functions. The KeywordExtractFunction extracts a set of keywords from a web page given its URL. It uses Boilerpipe to extract the relevant plain text from the web page, and my implementation of the RAKE algorithm to extract keywords from the plain text.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Source: src/main/java/com/mycompany/newsclip/KeywordExtractFunction.java
package com.mycompany.newsclip;

import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import de.l3s.boilerpipe.BoilerpipeProcessingException;
import de.l3s.boilerpipe.extractors.DefaultExtractor;

@SuppressWarnings("rawtypes")
public class KeywordExtractFunction extends BaseOperation 
    implements Function {

  private static final long serialVersionUID = -7122434545764806604L;
  private static final Logger LOGGER = 
    LoggerFactory.getLogger(KeywordExtractFunction.class);

  public KeywordExtractFunction(Fields fields) {
    super(2, fields);
  }
  
  @Override
  public void operate(FlowProcess flowProcess, FunctionCall funCall) {
    TupleEntry args = funCall.getArguments();
    String url = args.getString(1);
    String rawText = download(url);
    String plainText = parse(rawText);
    List<String> keywords = extractKeywords(plainText);
    for (String keyword : keywords) {
      Tuple t = new Tuple();
      t.add(keyword);
      funCall.getOutputCollector().add(t);
    }
  }

  protected String download(String url) {
    try {
      URL u = new URL(url);
      u.openConnection();
      InputStream istream = u.openStream();
      StringBuilder buf = new StringBuilder();
      byte[] b = new byte[1024];
      int bytesRead = 0;
      while ((bytesRead = istream.read(b)) > 0) {
        buf.append(new String(b, 0, bytesRead));
        b = new byte[1024];
      }
      istream.close();
      return buf.toString();
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      return null;
    }
  }

  protected String parse(String rawText) {
    if (StringUtils.isEmpty(rawText)) return null;
    else {
      try {
        return DefaultExtractor.INSTANCE.getText(rawText);
      } catch (BoilerpipeProcessingException e) {
        LOGGER.error(e.getMessage(), e);
        return null;
      }
    }
  }

  protected List<String> extractKeywords(String plainText) {
    try {
      return RakeExtractor.INSTANCE.extract(plainText);
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      return Collections.emptyList();
    }
  }
}

The other custom function is the UrlExtractFunction, which takes each keyword, hands it off to Google's Custom Search API, and returns the URLs of the top 10 search results. The Custom Search instance you set up can be customized to only allow results from a list of websites (or the entire web). The KEY and CX values are parameters that identify your client to the Google Search API, and you will need to populate a file with these values in src/main/resources/google.lic (the one in GitHub has placeholders).
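
For reference, google.lic is just a two-line Java properties file; the values below are placeholders for your own credentials.

# src/main/resources/google.lic
key=YOUR_GOOGLE_API_KEY
cx=YOUR_CUSTOM_SEARCH_ENGINE_ID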

// Source: src/main/java/com/mycompany/newsclip/UrlExtractFunction.java
package com.mycompany.newsclip;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

/**
 * Function to take a keyword and use Google's custom search
 * service to retrieve the top 10 URLs.
 */
@SuppressWarnings("rawtypes")
public class UrlExtractFunction extends BaseOperation implements Function {

  private static final long serialVersionUID = 1622228905563317614L;
  private static final Logger LOGGER = 
    LoggerFactory.getLogger(UrlExtractFunction.class);
  
  private static final String CUSTOM_SEARCH_URL_TEMPLATE =
    "https://www.googleapis.com/customsearch/v1?key={KEY}&cx={CX}&q={Q}";
  private String key;
  private String cx;
  private ObjectMapper objectMapper;
  
  public UrlExtractFunction(Fields fields) {
    super(2, fields);
    Properties props = new Properties();
    try {
      props.load(new FileInputStream("src/main/resources/google.lic"));
    } catch (IOException e) {
      LOGGER.error(e.getMessage(), e);
    }
    key = props.getProperty("key");
    cx = props.getProperty("cx");
    objectMapper = new ObjectMapper();
  }
  
  @Override
  public void operate(FlowProcess flowProcess, FunctionCall funCall) {
    TupleEntry args = funCall.getArguments();
    String keyword = args.getString(0);
    List<String> urls = parseSearchResult(keyword);
    for (String url : urls) {
      Tuple t = new Tuple();
      t.add(url);
      funCall.getOutputCollector().add(t);
    }
  }

  protected List<String> parseSearchResult(String keyword) {
    try {
      // note: use replace() rather than replaceAll() here, since the
      // "{KEY}" style placeholders are not valid regular expressions
      String url = CUSTOM_SEARCH_URL_TEMPLATE.
        replace("{KEY}", key).
        replace("{CX}", cx).
        replace("{Q}", URLEncoder.encode(keyword, "UTF-8"));
      URL u = new URL(url);
      u.openConnection();
      InputStream istream = u.openStream();
      StringBuilder buf = new StringBuilder();
      byte[] b = new byte[1024];
      int bytesRead = 0;
      while ((bytesRead = istream.read(b)) > 0) {
        buf.append(new String(b, 0, bytesRead));
        b = new byte[1024];
      }
      istream.close();
      return parseJson(buf.toString());
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      return Collections.emptyList();
    }
  }

  protected List<String> parseJson(String json) throws Exception {
    List<String> urls = new ArrayList<String>();
    JsonParser parser = objectMapper.getJsonFactory().
      createJsonParser(json);
    JsonNode root = objectMapper.readTree(parser);
    ArrayNode items = (ArrayNode) root.get("items");
    for (JsonNode item : items) {
      urls.add(item.get("link").getTextValue());
    }
    return urls;
  }
}

And that's pretty much it. Put the list of your "interesting pages", one per line, into data/urls.txt, and run the Cascading job locally using the mvn exec:java command shown below. The output of the job is written to data/new_urls.txt. The new data can be used to feed URLs back into the original list (perhaps after some sort of manual vetting by the researcher).

sujit@cyclone:cascading-newsclip$ mvn exec:java \
  -Dexec.mainClass="com.mycompany.newsclip.Main" \
  -Dexec.args="data/urls.txt data/new_urls.txt"

As you can see from the diagram, the Cascading assembly corresponds to 11 Hadoop Map-Reduce jobs running in sequence, which would translate to a lot of hand-written Hadoop code. So Cascading, like Pig, is a huge time saver. Pig does allow Java UDFs, but I think Cascading's all-Java approach is easier to work with.

[*] Update 2013-04-16: I came across Tommy Chheng's post, where he shows how to write Cascading code in Scala on this GitHub page. So, great news: it appears that it may be possible to do this after all :-).