Wednesday, August 28, 2013

Sentence Genre Classification using Scikit-Learn Linear SVC


To satisfy the (optional) real-world project requirement for my Introduction to Data Science class on Coursera, I built a classifier that could differentiate between a sentence from the medical versus the legal domain. It was based on interpolated trigram language models built out of training sets for both genres, and an unseen sentence was classified based on its probability of being part of one language model or the other. You can find the full report and the associated code on my github page here.

The data consisted of 950,887 medical sentences and 837,393 legal sentences. 2,000 sentences (1,000 each from medical and legal) were used to test the classifier. The overall accuracy was 92.7%, which was good enough for our (real-world business) purposes. However, it got me wondering whether I could get comparable results by using a simpler, more mainstream approach. After all, we could just treat this as a simple text classification problem, with each sentence being an instance and each word in the sentence being a feature. So that's what I did - this post describes that effort.

Our training data comes from selected volumes of the Gale Encyclopedia of Medicine for the medical content, and the UCI Machine Learning Repository Legal Case Reports Dataset for the legal content. Both are in XML format, so our first task is to parse these files and convert them to a flat file of sentences, one sentence per line. Here is some code to do that.

# -*- coding: utf-8 -*-
# Source: preprocess.py
# Code to convert from XML format to a file of sentences for
# each genre, one sentence per line.
from __future__ import division
import glob
import nltk
import re
import unicodedata
from xml.dom.minidom import Node
from xml.dom.minidom import parseString

def medical_plaintext(fn):
  print "processing", fn
  if not (fn.startswith("data/medical/eph_") or
      fn.startswith("data/medical/gemd_") or
      fn.startswith("data/medical/gesu_") or
      fn.startswith("data/medical/gea2_") or
      fn.startswith("data/medical/gem_") or
      fn.startswith("data/medical/gech_") or
      fn.startswith("data/medical/geca_") or
      fn.startswith("data/medical/gecd_") or
      fn.startswith("data/medical/gegd_") or
      fn.startswith("data/medical/gend_") or
      fn.startswith("data/medical/gec_") or
      fn.startswith("data/medical/genh_") or
      fn.startswith("data/medical/nwaz_")):
    return ""
  file = open(fn, 'rb')
  data = file.read()
  file.close()
  # remove gale: namespace from attributes
  data = re.sub("gale:", "", data)
  dom = parseString(data)
  text = ""
  paragraphs = dom.getElementsByTagName("p")
  for paragraph in paragraphs:
    xml = paragraph.toxml()
    xml = re.sub("\n", " ", xml)
    xml = re.sub("<.*?>", "", xml)
    text = text + " " + xml
  text = re.sub("\\s+", " ", text)
  text = text.strip()
  text = text.encode("ascii", "ignore")
  return text

def legal_plaintext(fn):
  print "processing", fn
  file = open(fn, 'rb')
  data = file.read()
  data = re.sub("&eacute;", "e", data)
  data = re.sub("&aacute;", "a", data)
  data = re.sub("&yacute;", "y", data)
  data = re.sub("&nbsp;", " ", data)
  data = re.sub("&tm;", "(TM)", data)
  data = re.sub("&reg;", "(R)", data)
  data = re.sub("&agrave;", "a", data)
  data = re.sub("&egrave;", "e", data)
  data = re.sub("&igrave;", "i", data)
  data = re.sub("&ecirc;", "e", data)
  data = re.sub("&ocirc;", "o", data)
  data = re.sub("&icirc;", "i", data)
  data = re.sub("&ccedil;", "c", data)
  data = re.sub("&amp;", "and", data)
  data = re.sub("&auml;", "a", data)
  data = re.sub("&szlig;", "ss", data)
  data = re.sub("&aelig;", "e", data)
  data = re.sub("&iuml;", "i", data)
  data = re.sub("&euml;", "e", data)
  data = re.sub("&ouml;", "o", data)
  data = re.sub("&uuml;", "u", data)
  data = re.sub("&acirc;", "a", data)
  data = re.sub("&oslash;", "o", data)
  data = re.sub("&ntilde;", "n", data)
  data = re.sub("&Eacute;", "E", data)
  data = re.sub("&Aring;", "A", data)
  data = re.sub("&Ouml;", "O", data)
  data = unicodedata.normalize("NFKD",
    unicode(data, 'iso-8859-1')).encode("ascii", "ignore")
  # fix "id=xxx" pattern, causes XML parsing to fail
  data = re.sub("\"id=", "id=\"", data)
  file.close()
  text = ""
  dom = parseString(data)
  sentencesEl = dom.getElementsByTagName("sentences")[0]
  for sentenceEl in sentencesEl.childNodes:
    if sentenceEl.nodeType == Node.ELEMENT_NODE:
      stext = sentenceEl.firstChild.data
      if len(stext.strip()) == 0:
        continue
      text = text + " " + re.sub("\n", " ", stext)
  text = re.sub("\\s+", " ", text)
  text = text.strip()
  text = text.encode("ascii", "ignore")
  return text

def parse_to_plaintext(dirs, labels, funcs, sent_file, label_file):
  fsent = open(sent_file, 'wb')
  flabs = open(label_file, 'wb')
  idx = 0
  for dir in dirs:
    files = glob.glob("/".join([dir, "*.xml"]))
    for file in files:
      text = funcs[idx](file)
      if len(text.strip()) > 0:
        for sentence in nltk.sent_tokenize(text):
          fsent.write("%s\n" % sentence)
          flabs.write("%d\n" % labels[idx])
    idx += 1
  fsent.close()
  flabs.close()

def main():
  parse_to_plaintext(["data/medical", "data/legal"],
    [1, 0], [medical_plaintext, legal_plaintext],
    "data/sentences.txt", "data/labels.txt")

if __name__ == "__main__":
  main()

The code just reads the two directories full of medical and legal XML files, and writes out the sentences one per line into a file called sentences.txt. In parallel, it writes a 1 or 0 to another file, labels.txt, depending on whether the input file being read is from the medical or the legal corpus. The code is largely similar to that for my previous classifier, except that I write out a single file of sentences. This is so I can more easily use Scikit-learn's text API to vectorize the sentences, as described below.
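
As a small illustration of the parallel-file layout (a sketch, not part of preprocess.py), the snippet below writes sentences and labels to two in-memory files and pairs them up again by line position. It assumes Python 3, and the helper names write_parallel and read_parallel and the sample sentences are made up for this example.

```python
import io

def write_parallel(docs_by_label, sent_out, label_out):
    """Write one sentence per line and its label on the matching line."""
    for label, sentences in docs_by_label:
        for sentence in sentences:
            sent_out.write("%s\n" % sentence)
            label_out.write("%d\n" % label)

def read_parallel(sent_in, label_in):
    """Pair each sentence with its label purely by line position."""
    return [(s.rstrip("\n"), int(l)) for s, l in zip(sent_in, label_in)]

# in-memory stand-ins for data/sentences.txt and data/labels.txt
sents, labels = io.StringIO(), io.StringIO()
write_parallel([(1, ["Aspirin reduces fever.", "Rest is advised."]),
                (0, ["The appeal was dismissed."])], sents, labels)
sents.seek(0)
labels.seek(0)
rows = read_parallel(sents, labels)
```

The pairing only holds as long as no sentence contains an embedded newline, which is why the preprocessing collapses whitespace before the sentences are written out.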

I construct a pipeline of a CountVectorizer to count words, eliminating English stopwords and lowercasing the input. This count vector is then passed to the TfidfTransformer which converts the count vector to a TF-IDF vector, which is the X (feature) vector for our classification algorithm. I use L2 normalization to scale the vector. The outcome vector is read off the labels.txt file with np.loadtxt().
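
To make the count, TF-IDF and L2 steps concrete, here is a minimal pure-Python sketch of the same idea on three toy sentences. It is not how scikit-learn implements it internally; it assumes Python 3, skips stopword removal, and uses the smoothed IDF form ln((1 + n) / (1 + df)) + 1 that the current scikit-learn documentation describes as its default. The function name tfidf_l2 is made up for this example.

```python
import math
from collections import Counter

def tfidf_l2(docs):
    """Return one {term: weight} dict per document, scaled to unit L2 norm."""
    # raw term counts per document (lowercased, whitespace tokenized)
    counts = [Counter(doc.lower().split()) for doc in docs]
    n_docs = len(docs)
    # document frequency: in how many documents each term occurs
    df = Counter(term for c in counts for term in c)
    vectors = []
    for c in counts:
        # assumed smoothed IDF variant: ln((1 + n) / (1 + df)) + 1
        vec = {t: tf * (math.log((1 + n_docs) / (1 + df[t])) + 1)
               for t, tf in c.items()}
        norm = math.sqrt(sum(w * w for w in vec.values()))
        vectors.append({t: w / norm for t, w in vec.items()} if norm else vec)
    return vectors

docs = ["the patient was given aspirin",
        "the court dismissed the appeal",
        "the patient filed an appeal"]
vectors = tfidf_l2(docs)
```

In the first sentence, "aspirin" (which occurs in only one document) ends up with a larger weight than "the" (which occurs in all three), which is the effect the TF-IDF step is meant to have.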

The X and y vectors are then fed into Scikit-Learn's Linear Support Vector Classifier (SVC) algorithm. LinearSVC is a popular classifier for text, since the number of features tends to be quite large in text classification problems. Although it is generally advisable to use the L1 loss function, I got very good results (97% accuracy) with L2 during my 10-fold cross validation phase. This was with simply using individual words as features. I did try to use bigrams and trigrams along with single word features, capping the maximum number of features to the 10,000 most frequent, but the program took a long time and I eventually killed it.
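
For reference, the n-gram experiment amounts to something like the sketch below: generate all word n-grams up to a maximum length and keep only the most frequent ones as features. This is a plain Python 3 illustration with made-up helper names (ngrams, build_vocabulary). In scikit-learn the same thing is controlled by the ngram_range and max_features parameters of CountVectorizer.

```python
from collections import Counter

def ngrams(tokens, n_max=3):
    """Yield all word n-grams of length 1..n_max as space-joined strings."""
    for n in range(1, n_max + 1):
        for i in range(len(tokens) - n + 1):
            yield " ".join(tokens[i:i + n])

def build_vocabulary(sentences, n_max=3, max_features=10000):
    """Map the max_features most frequent n-grams to column indices."""
    freq = Counter()
    for sentence in sentences:
        freq.update(ngrams(sentence.lower().split(), n_max))
    top = freq.most_common(max_features)
    return {term: idx for idx, (term, _) in enumerate(top)}

sentences = ["the court dismissed the appeal",
             "the court allowed the appeal"]
vocab = build_vocabulary(sentences, n_max=2, max_features=5)
```

Even on this toy input the number of candidate features grows quickly with n_max, since every n-gram has to be counted before the cap is applied. That may be part of why the run with bigrams and trigrams was so slow.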

Here is the code that wraps the classifier. Cross validation is triggered by passing in the argument "xval". Passing in the argument "run" splits the input data (our list of sentences and labels) 90%/10% into training and test sets. A model is then trained on the training set, persisted, and evaluated against that same training set. We then run the model against the test set and evaluate the results.

# Source: classify.py
from __future__ import division

import sys

import cPickle as pickle
import datetime
import numpy as np
from sklearn.cross_validation import KFold
from sklearn.cross_validation import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.svm import LinearSVC

# total number of sentences (combined)
NTOTAL = 1788280

def generate_xy(texts, labels):
  ftext = open(texts, 'rb')
  pipeline = Pipeline([
    ("count", CountVectorizer(stop_words='english', min_df=0.0,
              binary=False)),
    ("tfidf", TfidfTransformer(norm="l2"))
  ])
  X = pipeline.fit_transform(ftext)
  ftext.close()
  flabel = open(labels, 'rb')
  y = np.loadtxt(flabel)
  flabel.close()
  return X, y

def crossvalidate_model(X, y, nfolds):
  kfold = KFold(X.shape[0], n_folds=nfolds)
  avg_accuracy = 0
  for train, test in kfold:
    Xtrain, Xtest, ytrain, ytest = X[train], X[test], y[train], y[test]
    clf = LinearSVC()
    clf.fit(Xtrain, ytrain)
    ypred = clf.predict(Xtest)
    accuracy = accuracy_score(ytest, ypred)
    print "...accuracy = ", accuracy
    avg_accuracy += accuracy
  print "Average Accuracy: ", (avg_accuracy / nfolds)

def train_model(X, y, binmodel):
  model = LinearSVC()
  model.fit(X, y)
  # reports
  ypred = model.predict(X)
  print "Confusion Matrix (Train):"
  print confusion_matrix(y, ypred)
  print "Classification Report (Train)"
  print classification_report(y, ypred)
  pickle.dump(model, open(binmodel, 'wb'))

def test_model(X, y, binmodel):
  model = pickle.load(open(binmodel, 'rb'))
  if y is not None:
    # reports
    ypred = model.predict(X)
    print "Confusion Matrix (Test)"
    print confusion_matrix(y, ypred)
    print "Classification Report (Test)"
    print classification_report(y, ypred)

def print_timestamp(message):
  print message, datetime.datetime.now()

def usage():
  print "Usage: python classify.py [xval|run]"
  sys.exit(-1)
  
def main():
  if len(sys.argv) != 2:
    usage()
  print_timestamp("started:")
  X, y = generate_xy("data/sentences.txt", "data/labels.txt")
  if sys.argv[1] == "xval":
    crossvalidate_model(X, y, 10)
  elif sys.argv[1] == "run":
    Xtrain, Xtest, ytrain, ytest = train_test_split(X, y,
      test_size=0.1, random_state=42)
    train_model(Xtrain, ytrain, "data/model.bin")
    test_model(Xtest, ytest, "data/model.bin")
  else:
    usage()
  print_timestamp("finished:")
  
if __name__ == "__main__":
  main()
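
The two evaluation modes boil down to two ways of choosing row indices. The sketch below shows both in plain Python 3. It is only an illustration with made-up function names (kfold_indices, holdout_split) and does not reproduce scikit-learn's own splitting code; in particular the shuffle here will not give the same rows as train_test_split with random_state=42.

```python
import random

def kfold_indices(n_samples, n_folds):
    """Yield (train, test) index lists for contiguous, unshuffled folds."""
    # spread any remainder over the first few folds
    fold_sizes = [n_samples // n_folds + (1 if i < n_samples % n_folds else 0)
                  for i in range(n_folds)]
    start = 0
    for size in fold_sizes:
        test = list(range(start, start + size))
        train = list(range(0, start)) + list(range(start + size, n_samples))
        yield train, test
        start += size

def holdout_split(n_samples, test_size=0.1, seed=42):
    """Shuffle the indices and hold out a test_size fraction for testing."""
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)
    n_test = int(round(n_samples * test_size))
    return indices[n_test:], indices[:n_test]

folds = list(kfold_indices(10, 3))
train_idx, test_idx = holdout_split(20)
```

One thing to keep in mind: sentences.txt lists all the medical sentences before the legal ones, so contiguous folds like these hold out blocks that are mostly a single genre. Shuffling the rows before cross validation may be a reasonable variation to try.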

The output of our cross validation looks like this:

sujit@cyclone:medorleg2$ python classify.py xval
started: 2013-08-28 20:37:35.097280
...accuracy =  0.938426868276
...accuracy =  0.974534189277
...accuracy =  0.989134811103
...accuracy =  0.98005345919
...accuracy =  0.970250743731
...accuracy =  0.972509897779
...accuracy =  0.971810902096
...accuracy =  0.972672064777
...accuracy =  0.96800836558
...accuracy =  0.976105531572
Average Accuracy:  0.971350683338
finished: 2013-08-28 20:41:35.281316
sujit@cyclone:medorleg2$ 

And the output of the run (train then test) looks like this (the data from the confusion matrix has been prettified a bit).

sujit@cyclone:medorleg2$ python classify.py run
started: 2013-08-28 21:25:20.398061

Confusion Matrix (Train):
                     0      1
          0     745509   7931
          1       7989 848023

Classification Report (Train)
             precision    recall  f1-score   support

          0       0.99      0.99      0.99    753440
          1       0.99      0.99      0.99    856012

avg / total       0.99      0.99      0.99   1609452

Confusion Matrix (Test)
                     0      1
          0      82686   1267
          1       1482  93393

Classification Report (Test)
             precision    recall  f1-score   support

          0       0.98      0.98      0.98     83953
          1       0.99      0.98      0.99     94875

avg / total       0.98      0.98      0.98    178828

finished: 2013-08-28 21:28:02.311399

As you can see, the accuracy of the classifier with the unseen test set is 0.98, which is better than the language model based classifier. The solution is also simpler and needs less explanation since it depends on well-known algorithms which have been developed and implemented by machine learning experts.
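
The figures in the classification report can be recomputed from the test confusion matrix by hand. The short Python 3 sketch below does this, assuming the usual scikit-learn convention that rows are true labels and columns are predicted labels. The function name report is made up for this example.

```python
# Test-set confusion matrix from the run above: rows are true labels,
# columns are predicted labels (0 = legal, 1 = medical).
confusion = [[82686, 1267],
             [1482, 93393]]

def report(cm):
    """Return overall accuracy and per-class (precision, recall, support)."""
    total = sum(sum(row) for row in cm)
    accuracy = sum(cm[i][i] for i in range(len(cm))) / total
    per_class = {}
    for k in range(len(cm)):
        predicted_k = sum(cm[i][k] for i in range(len(cm)))  # column sum
        actual_k = sum(cm[k])                                # row sum
        per_class[k] = (cm[k][k] / predicted_k, cm[k][k] / actual_k, actual_k)
    return accuracy, per_class

accuracy, per_class = report(confusion)
print("accuracy = %.4f" % accuracy)
for label, (p, r, support) in sorted(per_class.items()):
    print("class %d: precision=%.2f recall=%.2f support=%d"
          % (label, p, r, support))
```

The accuracy comes out at roughly 0.985, and the rounded precision and recall values agree with the report printed by classification_report.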

As before, I cannot provide the medical data since it is a non-free dataset, but the code for the two Python programs described in this post can be found on github here.

Thursday, August 22, 2013

Degrees of Separation from Kevin Bacon using Cascading


Motivation


This post came about as a result of two events. First, I finished reading Paco Nathan's "Enterprise Data Workflows with Cascading" book (see my review on Amazon), and second, I started learning about the Enterprise Control Language (ECL) on the Lexis-Nexis High Performance Computing Cluster (HPCC). ECL is a bit like Pig which is a bit like Cascading, and one of the examples in the ECL tutorial was the Kevin Bacon Six Degrees of Separation problem. So I decided to try to build the example with Cascading, both as a way to get some experience with the Cascading API and as a comparison with the ECL solution.

Data Loading


The input to the problem is a set of (actor, movie) tuples from the IMDB database. This document (PDF) on the HPCC site contains links to FTP sites from which you can download the actors and actresses files, from which you can derive the (actor, movie) tuples using the following Java code.

// Source: src/main/java/com/mycompany/kevinbacon/load/Imdb2Csv.java
package com.mycompany.kevinbacon.load;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.util.StringUtils;

public class Imdb2Csv {
  
  public static void main(String[] args) {
    
    try {
      Pattern actorPattern = Pattern.compile(
        "(.*?)\\s\\(.*?\\)"); 
      Pattern moviePattern = 
        Pattern.compile("(.*?)\\s\\(\\d{4}\\).*$");
      String[] inputs = new String[] {
        "data/landing/actors.list",
        "data/landing/actresses.list"
      };
      PrintWriter output = new PrintWriter(
        new FileWriter("data/input/actor-movie.csv"), true);
      for (String input : inputs) {
        boolean header = true;
        boolean data = false;
        boolean footer = false;
        String actor = null;
        String movie = null;
        BufferedReader reader = new BufferedReader(
          new FileReader(new File(input)));
        String line = null;
        while ((line = reader.readLine()) != null) {
          // loop through lines until we hit this pattern
          // Name\tTitles
          // ----\t-------
          if (line.startsWith("----\t")) header = false;
          // skip the footer, it occurs after a long 40 dash
          // or so standalone line (pattern below works if
          // you are already in the data area).
          if (data && line.startsWith("--------------------")) 
            footer = true;
          if (! header && ! footer) {
            data = true;
            if (line.trim().length() > 0 && ! line.startsWith("----\t")) {
              String[] cols = line.replaceAll("\t+", "\t").split("\t");
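              // reset so a non-matching title cannot reuse the previous movie
              movie = null;
              if (! line.startsWith("\t")) {
                // new actor block: use the raw name when there is no
                // parenthesised suffix to strip
                Matcher ma = actorPattern.matcher(cols[0]);
                actor = ma.matches() ? ma.group(1) : cols[0];
              }
              // the title is in the second column for both actor lines
              // and tab-indented continuation lines
              if (cols.length > 1) {
                Matcher mm = moviePattern.matcher(cols[1]);
                if (mm.matches()) movie = mm.group(1);
              }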
              // if line contains non-ascii chars, skip this line
              // the reasoning is that this is perhaps non-English
              // movie which we don't care about.
              if (isNonAscii(actor) || isNonAscii(movie)) continue;
              if (actor != null && movie != null)
                output.println(dequote(actor) + "\t" + dequote(movie));
            }
          }
        }
        reader.close();
      }
      output.flush();
      output.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  
  private static boolean isNonAscii(String s) {
    if (s == null) return true;
    char[] cs = s.toCharArray();
    for (int i = 0; i < cs.length; i++) {
      if (cs[i] > 127) return true;
    }
    return false;
  }

  private static String dequote(String s) {
    String st = s.trim();
    if (st.startsWith("\"") && st.endsWith("\"")) 
      return st.substring(1, st.length()-1);
    else return st;
  }
}
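
The core of the loader is the handling of actor lines versus tab-indented continuation lines. Here is that logic as a short Python 3 sketch. The sample lines are made up and only approximate the layout of the IMDB list files, the function name parse_pairs is hypothetical, and the header/footer skipping and the non-ASCII filter are left out. The sketch also uses the raw name when an actor has no parenthesised suffix, which is a choice made for this example.

```python
import re

# Patterns mirror the ones in the Java loader.
ACTOR_RE = re.compile(r"(.*?)\s\(.*?\)")
MOVIE_RE = re.compile(r"(.*?)\s\(\d{4}\).*")

def parse_pairs(lines):
    """Yield (actor, movie) tuples from actor and continuation lines."""
    actor = None
    for line in lines:
        if not line.strip():
            continue
        # collapse runs of tabs so the title is always the second column
        cols = re.sub(r"\t+", "\t", line).split("\t")
        if not line.startswith("\t"):
            # new actor block: drop a trailing "(I)"-style suffix if present
            m = ACTOR_RE.fullmatch(cols[0])
            actor = m.group(1) if m else cols[0]
        if len(cols) < 2:
            continue
        m = MOVIE_RE.fullmatch(cols[1])
        if actor and m:
            yield actor, m.group(1).strip('"')

# hypothetical sample input, not copied from the real files
sample = [
    "Bacon, Kevin (I)\tFootloose (1984)  [Ren]",
    "\t\t\tApollo 13 (1995)  [Jack Swigert]",
    "",
    "Doe, Jane\t\"Some Show\" (2001) {Pilot}",
]
pairs = list(parse_pairs(sample))
```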

For this project, I decided to use Gradle, a relatively new (at least to me) build system that uses Groovy as the underlying language (as opposed to XML for Maven and Ant, or Scala for SBT). The decision was in part driven by the fact that Cascading devs use Gradle, and consequently their solutions for build level problems are also usually Gradle based, so I figured this would make my learning curve easier. In reality, I ended up spending quite some time wrestling with Gradle, but I think I now know enough about Gradle to get by. In any case, to run the above code using Gradle, you would need to put your IMDB files under data/landing and run the following command to get the (actor, movie) tuples file in data/input/actor-movie.csv.

sujit@cyclone:cascading-kevinbacon$ gradle run \
    -DmainClass=com.mycompany.kevinbacon.load.Imdb2Csv

Gradle Build File


Here is my Gradle build file, adapted from examples in the Cascading book, the Cascading for the Impatient series, and a lot of Googling. I can use this to build my Eclipse .classpath file, compile, run JUnit tests, and build a fat JAR that can be used both locally as well as on Amazon's Elastic Map Reduce (EMR) platform. It is included here as an example of a fully functioning build file (at least for my purposes).

// Source: build.gradle
apply plugin: "java"
apply plugin: "idea"
apply plugin: "eclipse"
apply plugin: "application"

mainClassName = System.getProperty("mainClass")

archivesBaseName = "kevinbacon"

repositories {
  mavenLocal()
  mavenCentral()
  mavenRepo name: "conjars", url: "http://conjars.org/repo/"
}

configurations {
  provided
  compile.extendsFrom provided
}

ext.cascadingVer = "2.1.6"

dependencies {
  compile(group: "cascading", name: "cascading-core", version: cascadingVer)
  compile(group: "cascading", name: "cascading-local", version: cascadingVer)
  compile(group: "cascading", name: "cascading-hadoop", version: cascadingVer)
  provided(group: "org.apache.hadoop", name: "hadoop-core", version: "1.0.3")
  testCompile(group: "cascading", name: "cascading-platform", 
              version: cascadingVer)
  testCompile("org.apache.hadoop:hadoop-test:1.0.3")
  testCompile("cascading:cascading-test:2.0.8")
  testCompile("junit:junit:4.8.+")
  testCompile("org.slf4j:slf4j-api:1.7.2")
  testCompile("commons-io:commons-io:2.1")
  testRuntime("org.slf4j:slf4j-log4j12:1.7.2")
  testRuntime("log4j:log4j:1.2.16")
}

test {
  testLogging.showStandardStreams = true
  beforeTest {
    descriptor -> logger.lifecycle("Running test: " + descriptor)
  }
  onOutput {
    descriptor, event ->
      logger.lifecycle("Test " + descriptor + " produced error: " + 
        event.message)
  }
}

jar {
  description = "Assembles a JAR file"
  from {
    (configurations.runtime - configurations.provided).collect {
      it.isDirectory() ? it : zipTree(it)
    }
  }
  {
    exclude "META-INF/*.SF"
    exclude "META-INF/*.DSA"
    exclude "META-INF/*.RSA"
  }
  manifest {
    attributes("Main-Class": "com.mycompany.kevinbacon.flow.Main")
  }
}

The Cascading Flow



The DOT file generated by the Cascading flow planner is shown on the left. Essentially, the job consists of 7 iterations - at each iteration, the input is the (actor, movie) tuple collection and the Kevin Bacon costars from the previous degree of separation. For example, at the first iteration, we are looking for direct costars of Kevin Bacon (0 degrees of separation), so our starting set of actors is the single tuple containing Kevin Bacon. We join it against the (actor, movie) tuples to find all movies that Kevin Bacon worked in, then join the (actor, movie) tuple set against this (movie) tuple set to find the list of Kevin Bacon costars. We also annotate each costar with the current degree of separation. In the next iteration, this set of costar tuples is used to find the costars of the costars, and so on. Finally, the costar tuples from all 7 iterations are merged and grouped so the minimum degree of separation is stored against each costar. We then group these tuples on the "Bacon number" to find a count of costars at each degree of separation.

The code below represents the Cascading flow depicted in the diagrams above. It translates to 18 Hadoop MapReduce jobs.

// Source: src/main/java/com/mycompany/kevinbacon/flow/Main.java
package com.mycompany.kevinbacon.flow;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.Filter;
import cascading.operation.Identity;
import cascading.operation.aggregator.Min;
import cascading.operation.expression.ExpressionFilter;
import cascading.operation.filter.Not;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.CountBy;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.Unique;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Main {
  
  public static void main(String[] args) {

    String input = args[0];
    String detailOutput = args[1];
    String summaryOutput = args[2];
    
    Tap<?,?,?> tin = new Hfs(new TextDelimited(
      Constants.inputFields, false, false, "\t"), input);
    Tap<?,?,?> toutDetail = new Hfs(new TextDelimited(
      Constants.detailFields, false, false, "\t"), detailOutput);
    Tap<?,?,?> toutSummary = new Hfs(new TextDelimited(
      Constants.summaryFields, false, false, "\t"), summaryOutput);
    
    Pipe allPairs = new Pipe("allPairs");
    
    // create a pipe with only Kevin Bacon
    Pipe kevinBacon = new Pipe("kevinBacon", allPairs);
    Filter<?> kevinBaconFilter = new ExpressionFilter(
      "! actor.equals(\"Bacon, Kevin\")", String.class);
    kevinBacon = new Each(kevinBacon, kevinBaconFilter);
    kevinBacon = new Retain(kevinBacon, Constants.actorField);
    kevinBacon = new Unique(kevinBacon, Constants.actorField);
    
    // At each degree of separation, find the costars of 
    // actors in the actor pipe (second arg to FindCostars)
    // by joining on actor to find movies, then joining on
    // movie to find costars.
    Pipe kevinBaconCostars0 = new FindCostars(
      allPairs, kevinBacon, 0);
      
    Pipe kevinBaconCostars1 = new FindCostars(
      allPairs, kevinBaconCostars0, 1);
      
    Pipe kevinBaconCostars2 = new FindCostars(
      allPairs, kevinBaconCostars1, 2);
      
    Pipe kevinBaconCostars3 = new FindCostars(
      allPairs, kevinBaconCostars2, 3);
      
    Pipe kevinBaconCostars4 = new FindCostars(
      allPairs, kevinBaconCostars3, 4);
      
    Pipe kevinBaconCostars5 = new FindCostars(
      allPairs, kevinBaconCostars4, 5);
      
    Pipe kevinBaconCostars6 = new FindCostars(
      allPairs, kevinBaconCostars5, 6);

    // merge the pipes from all seven degrees together, then
    // filter out Kevin Bacon, group by actors and choose the
    // minimum Bacon number for each actor, and finally rename
    // the min column to the Bacon number field.
    Pipe merged = new Merge("merged", Pipe.pipes(
      kevinBaconCostars0, kevinBaconCostars1, kevinBaconCostars2,
      kevinBaconCostars3, kevinBaconCostars4, kevinBaconCostars5,
      kevinBaconCostars6));
    merged = new Each(merged, new Not(kevinBaconFilter));
    merged = new GroupBy(merged, Constants.actorField);
    merged = new Every(merged, Constants.kbnumField, new Min());
    merged = new Rename(merged, new Fields("min"), Constants.kbnumField);
    
    // split the merged pipe into detail and summary pipes.
    // This is needed to avoid "duplicate pipe" errors from
    // Cascading when trying to set two tail sinks. The 
    // merged pipe already contains the information needed
    // for the detail pipe, and the summary pipe needs a bit
    // more processing, detailed in the comment block below.
    Pipe details = new Pipe("details", merged);
    details = new Each(details, new Identity());
    
    // generate summary stats - retain only the bacon number
    // column and group by it and count the number of costars
    // in each bacon number group.
    Pipe summary = new Pipe("summary", merged);
    summary = new Retain(summary, Constants.kbnumField);
    summary = new CountBy(summary, 
      Constants.kbnumField, Constants.countField);
    
    FlowDef fd = FlowDef.flowDef().
      addSource(allPairs, tin).
      addTailSink(details, toutDetail).
      addTailSink(summary, toutSummary);

    Properties props = new Properties();
    AppProps.setApplicationJarClass(props, Main.class);
    FlowConnector fc = new HadoopFlowConnector(props);
    
    Flow<?> flow = fc.connect(fd);
    flow.writeDOT("data/kevinbacon.dot");
    flow.writeStepsDOT("data/kevinbacon-steps.dot");
    flow.complete();
  }
}

We have broken off the work to find costars at each degree of separation into a sub-assembly FindCostars, which is also shown below:

// Source: src/main/java/com/mycompany/kevinbacon/flow/FindCostars.java
package com.mycompany.kevinbacon.flow;

import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.Unique;
import cascading.pipe.joiner.InnerJoin;
import cascading.tuple.Fields;

/**
 * This subassembly returns a pipe containing costars at 
 * the next degree of separation. Following functionality
 * is implemented.
 * 
 * (1) Join with original pipe of (actor, movie) tuples and
 *     the pipe containing actors found in previous step
 *     to find all movies acted in by the actors.
 * (2) Dedup the movies pipe.
 * (3) Join with original pipe of (actor, movie) tuples and
 *     the movies pipe to find all costars of the actors.
 * (4) Dedup the actors pipe.
 * (5) Add a new column with the current Kevin Bacon number
 *     (degree of separation).
 */
public class FindCostars extends SubAssembly {

  private static final long serialVersionUID = 3450219986636439710L;

  private Fields movieResultFields = 
    new Fields("actor", "movie", "actor1");
  private Fields actorResultFields = 
    new Fields("actor", "movie", "movie1");

  public FindCostars(Pipe allPairs, Pipe actors, int kbNumber) {
    // join with original pipe on actor to produce pipe of
    // all movies acted on by the actors in pipe actor
    actors = new Retain(actors, Constants.actorField);
    Pipe movies = new HashJoin(
      allPairs, Constants.actorField, 
      actors, Constants.actorField,
      movieResultFields, new InnerJoin());
    movies = new Retain(movies, Constants.movieField);
    movies = new Unique(movies, Constants.movieField);
    // now find all the actors for these movies, these
    // will be the costars for the incoming actors in 
    // actorPipe. Finally insert the Bacon number for
    // costars at this degree of separation.
    Pipe costars = new HashJoin(
      allPairs, Constants.movieField, 
      movies, Constants.movieField, 
      actorResultFields, new InnerJoin());
    costars = new Retain(costars, Constants.actorField);
    costars = new Unique(costars, Constants.actorField);
    Insert insfun = new Insert(Constants.kbnumField, kbNumber);
    costars = new Each(costars, insfun, Constants.detailFields);
    setTails(costars);
  }
}
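
To see what the flow computes without the Cascading machinery, here is the same join-based expansion as a small in-memory Python 3 sketch. It follows the convention used in this post, where direct costars get a Bacon number of 0. The function names and the toy (actor, movie) pairs are made up for illustration, and nothing here is meant to scale beyond data that fits in memory.

```python
from collections import Counter

def find_costars(all_pairs, actors):
    """Actors who share at least one movie with anyone in `actors`."""
    movies = {m for a, m in all_pairs if a in actors}   # join on actor
    return {a for a, m in all_pairs if m in movies}     # join on movie

def bacon_numbers(all_pairs, source="Bacon, Kevin", max_degree=6):
    """Map each costar to the smallest degree at which it appears."""
    best = {}
    frontier = {source}
    for degree in range(max_degree + 1):
        frontier = find_costars(all_pairs, frontier)
        for actor in frontier:
            if actor != source:
                # keep the minimum, like the GroupBy + Min step in the flow
                best[actor] = min(best.get(actor, degree), degree)
    return best

# toy data: D never shares a movie with anyone reachable from Kevin Bacon
pairs = [("Bacon, Kevin", "M1"), ("A", "M1"), ("A", "M2"), ("B", "M2"),
         ("B", "M3"), ("C", "M3"), ("D", "M4")]
numbers = bacon_numbers(pairs)
summary = Counter(numbers.values())
```

Here A shares a movie with Kevin Bacon directly, B is reached through A, and C through B, so each of the first three degrees has exactly one costar and D does not appear at all.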

Unit Testing


The Cascading user guide recommends that all sub-assemblies, flows, operations, etc. should be unit tested. Unit tests should extend the PlatformTestCase class. Tuples participating in test cases are populated via test files. Here is the JUnit test for the FindCostars subassembly. In addition, since I was unfamiliar with some operations in the Cascading API, I built another test case to help me experiment with various strategies (not included here, it's on the GitHub site).

// Source: src/test/java/com/mycompany/kevinbacon/flow/FindCostarsTest.java
package com.mycompany.kevinbacon.flow;

import java.io.File;

import org.apache.commons.io.FileUtils;
import org.junit.Before;
import org.junit.Test;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.test.LocalPlatform;
import cascading.test.PlatformRunner.Platform;

@Platform(LocalPlatform.class)
public class FindCostarsTest extends PlatformTestCase {
  
  private static final long serialVersionUID = 8950872097793074273L;

  @Before
  public void setup() throws Exception {
    File output = new File("src/test/data/output");
    FileUtils.deleteDirectory(output);
  }
  
  @Test
  public void testFindCostars() throws Exception {
    String allPairsFilename = "src/test/data/find_costars_allpairs.csv";
    String actorsFilename = "src/test/data/find_costars_actors.csv";
    String costarFilename = "src/test/data/output/costars.csv";
    
    getPlatform().copyFromLocal(allPairsFilename);
    getPlatform().copyFromLocal(actorsFilename);
    
    Tap<?,?,?> tapAllPairs = getPlatform().getDelimitedFile(
      Constants.inputFields, "\t", allPairsFilename, SinkMode.KEEP);
    Tap<?,?,?> tapActors = getPlatform().getDelimitedFile(
      Constants.detailFields, "\t", actorsFilename, SinkMode.KEEP);
    Tap<?,?,?> tapCostars = getPlatform().getDelimitedFile(
      Constants.detailFields, "\t", costarFilename, SinkMode.REPLACE);
    
    Pipe allPairs = new Pipe("allPairs");
    Pipe actors = new Pipe("actors");
    Pipe costars = new FindCostars(allPairs, actors, 2);
    
    FlowDef flowDef = FlowDef.flowDef().
      addSource(allPairs, tapAllPairs).
      addSource(actors, tapActors).
      addTailSink(costars, tapCostars);
    
    Flow<?> flow = getPlatform().getFlowConnector().connect(flowDef);
    flow.complete();
    validateLength(flow, 7);
  }
}

To run it, we can use the gradle test task, like below:

sujit@cyclone:cascading-kevinbacon$ gradle test \
    -Dtest.single=path.to.test.class

Running on Amazon EMR


Before running on Amazon EMR, I made sure that the job ran correctly (with a tiny input file I hand-built) against my local Hadoop instance.

sujit@cyclone:cascading-kevinbacon$ gradle clean jar
sujit@cyclone:cascading-kevinbacon$ rm -rf data/output 
sujit@cyclone:cascading-kevinbacon$ /opt/hadoop-1.2.1/bin/hadoop \
    jar build/libs/kevinbacon.jar \
    src/test/data/test.csv \
    data/output/detail \
    data/output/summary

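I then used the EMR browser interface to start the job. The input file and fat JAR first needed to be uploaded into S3, then a new job flow had to be created in EMR - the prompts are pretty easy to understand. My input file had 9,438,301 (actor, movie) tuples. Note that the summary has no row for a Bacon number of 2; one likely cause is that the kevinBaconCostars2 pipe was not among the pipes merged in this run. It took 58 minutes on a 1+4 node m1.medium cluster to produce the following summary (Bacon number, then number of costars):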

0    43405
1    208829
3    635

Conclusion


Building this project was a lot of fun. I ended up learning enough about the Cascading API to be reasonably confident about being able to implement real-world applications in future. Also, the Cascading code clearly came out ahead in terms of readability compared to the ECL example code - although that may be at least partly because I am more proficient in Java than ECL.

In any case, hope you found this interesting as well. If you would like to replicate this or improve upon it, you can find the code on my GitHub page for this project.