Saturday, March 17, 2018

Accessing ML models in Spark from various NLP toolkits


NLTK users know that a lot of its functionality, even seemingly basic operations like sentence and word tokenization, depends on machine learning models pre-trained on default corpora. Because of their size, these models are available as a separate download. Making them available to your code is simple: just a single one-time nltk.download() command, as described on this page.

The situation is slightly more complicated in a distributed environment such as Apache Spark. The general idea there is that you partition your data processing across multiple nodes in a cluster and then bring back the processed datasets. We use the web-based Databricks analytics platform on top of Spark, which gives us, among other things, a notebook-based development environment that hides some of the boilerplate associated with straight Spark code. Databricks notebooks support Python, but NLTK does not come pre-installed.

In order to use NLTK to process text within Databricks, you need to install NLTK on your cluster. That's not too hard as long as you have the necessary permissions; the process is described on this Databricks documentation page. The PyPI package name I used was "nltk==3.2.5". This makes NLTK available to the master node and all the worker nodes in your cluster.

Generally, the very first step in analyzing text data is to tokenize it into sentences and words, and as I mentioned earlier, this needs the appropriate ML model to be available on the workers. Back when I first started working on this, a colleague mentioned that he just added the nltk.download() command to the map call, so it was called for each record on the worker. He accompanied this hint with a brilliant piece of insight: the nltk.download() call checks whether the download has already happened, so every call after the first one is just a pass-through.

I thought about this a bit and realized that I could make the process even more efficient by calling nltk.download() once per partition using mapPartitions, instead of once per record using map. So that's what I did: the code below downloads the NLTK models once per partition and tokenizes the text into sentences, then words. Also, once downloaded, these models are available to subsequent calls made within the same partition, as shown by the POS tagging done in a subsequent map call.

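def download_and_tokenize(lines):
  import nltk

  def tokenize(line):
    # each input line is "image_id<TAB>caption_text"
    image_id, caption_text = line.split("\t", 1)
    tokens = []
    for sent in nltk.sent_tokenize(caption_text):
      for word in nltk.word_tokenize(sent):
        tokens.append(word)
    return (image_id, tokens)

  # once per partition: a no-op if the data is already on this worker.
  # "all" fetches every NLTK package; "punkt" and "averaged_perceptron_tagger"
  # would cover the tokenizers and tagger used here.
  nltk.download("all")

  # once per record within the partition
  for line in lines:
    yield tokenize(line)


def postag(rec):
  import nltk   # nltk must be imported on the worker for this function too
  image_id, tokens = rec
  tokentags = nltk.pos_tag(tokens)
  return (image_id, tokentags)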


captions_rdd = (sc.textFile("/path/to/input/text/file")
  .mapPartitions(download_and_tokenize)
  .map(postag)
)
captions_rdd.take(10)
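To see why mapPartitions helps, here is a minimal pure-Python simulation (no Spark involved) that counts how often a setup step runs when it is invoked per record versus per partition. Partitions are modeled as plain lists, and `expensive_setup` is a hypothetical stand-in for `nltk.download()`.

```python
# Plain lists stand in for RDD partitions; expensive_setup is a hypothetical
# stand-in for nltk.download().
setup_calls = 0

def expensive_setup():
    global setup_calls
    setup_calls += 1

def tokenize_record(line):
    expensive_setup()              # map: setup runs for every record
    return line.split()

def tokenize_partition(lines):
    expensive_setup()              # mapPartitions: setup runs once per partition
    for line in lines:
        yield line.split()

partitions = [["a b", "c d", "e f"], ["g h", "i j"]]

setup_calls = 0
per_record = [tokenize_record(line) for part in partitions for line in part]
map_setups = setup_calls

setup_calls = 0
per_partition = [toks for part in partitions for toks in tokenize_partition(part)]
partition_setups = setup_calls

print(map_setups, partition_setups)   # 5 2
```

Both approaches produce the same tokens. The difference is only in how often the setup (or, after the first download, the existence check) is paid for.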

Of late, the SpaCy NLP library has become more popular, and I think for good reason. It is faster, has more functionality, and is being actively developed based on user feedback. Like NLTK, SpaCy does not come pre-installed on Databricks either; you can install it with the PyPI loader using the package name "spacy==2.0.9". The main problem with trying to use SpaCy the same way as NLTK was that I did not know of a Python analog to nltk.download(). SpaCy uses a pair of commands: a "python -m spacy download en" call on the command line, followed by a spacy.load("en") Python call, as described on the SpaCy Models and Languages page. While this works very nicely in a single-user environment, the only way I could think of to do this in Spark was to log in separately to each of the workers and download the model on each. That is obviously not the most desirable approach in an automated notebook environment.

I had some spare cycles, so I went digging in the code, and found that the "python -m spacy ..." call corresponds to identically named functions in the spacy.cli package. This allowed me to use SpaCy in Databricks with the code shown below. The idea is the same as for NLTK: we download the model and load it into SpaCy once per partition. Just like nltk.download(), the spacy.cli.download() call checks for the existence of the model and its dependencies, using the pip installer. In addition, my code checks for the model's existence itself, so it bypasses the download() call altogether after the first time on each worker. Unlike NLTK, SpaCy batches up basic operations in a single call for performance, as shown in the SpaCy lightning tour code example, so there is no separate POS tagging step here. But, as in the NLTK case, the model should be accessible to subsequent map calls as well.

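def download_tokenize_and_postag(lines):
  import os
  import spacy
  import spacy.cli

  def tokenize_and_postag(line):
    # each input line is "image_id<TAB>caption_text"
    image_id, caption_text = line.split("\t", 1)
    doc = nlp(caption_text)
    token_tags = []
    for token in doc:
      token_tags.append((token.text, token.pos_))
    return (image_id, token_tags)

  # once per partition: download the "en" model only if it is not already
  # present in SpaCy's data directory on this worker, then load it
  model_dir = spacy.util.get_data_path()
  if not os.path.exists(os.path.join(model_dir.as_posix(), "en")):
    spacy.cli.download("en")
  # the parser is not needed for POS tags; SpaCy 2.x turns it off via disable=[...]
  nlp = spacy.load("en", disable=["parser"])

  # once per record within the partition
  for line in lines:
    yield tokenize_and_postag(line)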


captions_rdd = (sc.textFile("/path/to/input/text/file")
  .mapPartitions(download_tokenize_and_postag)
)
captions_rdd.take(10)
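The check-before-download logic in the function above does not depend on SpaCy, and can be sketched with the standard library alone. This is an illustration, not SpaCy's API: `ensure_model` and `fake_download` are hypothetical names, and a directory inside a temporary folder stands in for the model directory returned by `spacy.util.get_data_path()`.

```python
import os
import tempfile

download_count = 0

def fake_download(target_dir):
    """Hypothetical stand-in for spacy.cli.download(): creates the model directory."""
    global download_count
    download_count += 1
    os.makedirs(target_dir, exist_ok=True)

def ensure_model(data_dir, name):
    """Download the model only if its directory is not already present on this worker."""
    model_path = os.path.join(data_dir, name)
    if not os.path.exists(model_path):
        fake_download(model_path)
    return model_path

data_dir = tempfile.mkdtemp()
for _ in range(3):                 # e.g. three partitions processed on the same worker
    ensure_model(data_dir, "en")
print(download_count)              # 1
```

Note that a check like this is not atomic: if several tasks start at the same moment on one worker, more than one of them may see the model as missing and trigger a download.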

Java-based toolkits are easier to work with, at least with respect to model files, since they often embed their models into their JARs and access them as a Resource instead of a File. The models are therefore distributed to the workers along with the code when the JAR file is attached to the cluster, and users of these toolkits do not have to do anything special. As an example, here is some code to do tokenization and POS tagging using the Spark-NLP library from John Snow Labs. The code is based on Spark-NLP 2.11-1.2.3 (version 1.2.3, built for Scala 2.11), and is part of the pipeline described on their quickstart page.

import com.johnsnowlabs.nlp._
import com.johnsnowlabs.nlp.annotators._
import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel
import com.johnsnowlabs.nlp.annotators.pos.perceptron.PerceptronApproach

import org.apache.spark.sql.functions._
import org.apache.spark.ml.Pipeline
import spark.implicits._    // needed for .toDF (already imported in Databricks notebooks)

case class Caption(id: String, text: String)

// each input line is "id<TAB>caption text"
val captionDF = sc.textFile("/path/to/input/text/file")
  .map(line => {
    val Array(id, text) = line.split("\t", 2)
    Caption(id, text)
  })
  .toDF

// wrap the raw text column into Spark-NLP document annotations
val assembleDoc = new DocumentAssembler()
  .setInputCol("text")
  .setOutputCol("document")

// sentence boundaries (pre-trained, rule-based detector)
val sentTokenize = new SentenceDetectorModel()
  .setInputCols(Array("document"))
  .setOutputCol("sentence")

val wordTokenize = new RegexTokenizer()
  .setInputCols(Array("sentence"))
  .setOutputCol("token")
  .setPattern("[^ \\(\\)\\/%]+")     // default pattern \S+ is too loose

val posTagger = new PerceptronApproach()
  .setInputCols(Array("sentence", "token"))
  .setOutputCol("pos")
  .setCorpusPath("/anc-pos-corpus/1400.txt")   // see spark-nlp issue 41

// convert annotations back to plain columns, keeping the annotation columns
val finishDoc = new Finisher()
  .setInputCols("token")
  .setCleanAnnotations(false)

val pipeline = new Pipeline()
  .setStages(Array(
    assembleDoc,
    sentTokenize,
    wordTokenize,
    posTagger,
    finishDoc
  ))
val transformedDF = pipeline.fit(captionDF)
  .transform(captionDF)

Notice that the sentence tokenizer and POS tagger use models, but no mention is made of loading them up front. The POS tagger has to explicitly set the corpus path. If you look at that file in the repository, you will see it's just a POS-tagged dataset, so presumably the tagger trains itself inline on startup. The SentenceDetectorModel, on the other hand, seems to use a pre-trained model, which is also loaded once on startup. This would happen once per worker JVM as part of the object's initialization, so this mechanism is even more efficient than using mapPartitions().
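Python has no direct counterpart to a model embedded in a JAR, but a similar once-per-process effect can be approximated with a lazily initialized, cached loader. In this sketch, `get_model` is a hypothetical function whose body stands in for something like `spacy.load`, and `functools.lru_cache` makes it run at most once per Python process. Whether that cache survives across Spark tasks depends on PySpark reusing its Python worker processes and on the function living in a module installed on the workers, so treat this as an illustration of the pattern, not a guarantee.

```python
import functools

load_count = 0

@functools.lru_cache(maxsize=None)
def get_model(name):
    """Load a model at most once per process; later calls return the cached object.
    The body is a hypothetical placeholder for something like spacy.load(name)."""
    global load_count
    load_count += 1
    return {"name": name}          # stand-in for a real model object

def tag_partition(lines):
    model = get_model("en")        # only the first call in this process does real work
    for line in lines:
        yield (model["name"], line)

for part in [["x", "y"], ["z"]]:   # two "partitions" handled by the same process
    list(tag_partition(part))
print(load_count)                  # 1
```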

Lastly, I wanted to mention yet another approach to doing NLP tasks on Spark that we use internally. The approach is similar to the one SpaCy uses -- we have a set of annotators, each of which does a set of tasks. For example, our GeniaAnnotator uses models trained against the GENIA corpus, and outputs sentence, phrase and word boundaries, POS tags and lemmas. An example of annotations output by the annotator is shown below.


These annotations convert unstructured text data into structured annotations, and can be consumed by downstream applications in a language-agnostic manner. The annotation building framework has too many hooks into internal systems to be effectively open-sourced, but we do plan on providing exemplar output from our annotators for OA-STM-Corpus, our hand-annotated mini-corpus of scientific open access articles. Our team has also open-sourced AnnotationQuery, a framework that allows you to compose interesting queries on these annotations, either locally or on Spark.

Saturday, March 03, 2018

An implementation of the Silhouette Score metric on Spark


While clustering some data on Spark recently, I needed a quantitative metric to evaluate the quality of the clustering. I couldn't find anything built-in, so (predictably) I went looking on Google, where I found this Stack Overflow page discussing this very thing. However, as you can see from the accepted answer, the Silhouette score by definition is not very scalable, since it requires measuring the average distance from each point to every other point in its cluster, as well as the average distance from each point to every other point in all the other clusters. This is clearly an $O(N^2)$ operation and not likely to scale for large N. What caught my eye, however, was Geoffrey Anderson's answer, which suggested using the Simplified Silhouette score. This requires only the distance from every point to its own centroid, and is thus an $O(N)$ operation and therefore quite scalable.
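For comparison, the original Silhouette score fits in a few lines of pure Python. This is a minimal sketch that assumes small in-memory data (points as tuples, integer labels, at least two clusters) and Python 3.8+ for `math.dist`. Singleton clusters score 0, which is the convention scikit-learn uses. The nested loops over all points are exactly the $O(N^2)$ cost that makes this version unattractive on Spark.

```python
import math
from collections import defaultdict

def full_silhouette(points, labels):
    """Mean Silhouette score using all pairwise distances: O(N^2).
    Assumes at least two clusters; points in singleton clusters score 0."""
    clusters = defaultdict(list)
    for p, c in zip(points, labels):
        clusters[c].append(p)
    scores = []
    for p, c in zip(points, labels):
        own = clusters[c]
        if len(own) == 1:
            scores.append(0.0)
            continue
        # a: mean distance to the other members of p's cluster (p's distance to itself is 0)
        a = sum(math.dist(p, q) for q in own) / (len(own) - 1)
        # b: lowest mean distance to the members of any other cluster
        b = min(sum(math.dist(p, q) for q in members) / len(members)
                for other, members in clusters.items() if other != c)
        denom = max(a, b)
        scores.append((b - a) / denom if denom > 0 else 0.0)
    return sum(scores) / len(scores)

points = [(0.0, 0.0), (0.0, 1.0), (10.0, 0.0), (10.0, 1.0)]
labels = [0, 0, 1, 1]
print(round(full_silhouette(points, labels), 4))   # 0.9002
```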

However, while this is indeed the algorithm suggested in the cited paper (Multiple K Means++ Clustering of Satellite Image Using Hadoop MapReduce and Spark by Sharma, Shokeen and Mathur), this simplified metric only reports on how tight each cluster is. The intent of the original Silhouette score is to report both on the tightness of individual clusters and on the separation between clusters. It turns out that a simplified Silhouette metric that captures both does exist, and it is defined in detail in the paper An Analysis of the Application of Simplified Silhouette to the Evaluation of k-means Clustering Validity (PDF) by Wang, et al. Interestingly, its per-point formula has the same form as the one used in the Silhouette score implementation in Scikit-Learn, although Scikit-Learn computes the two distances as averages over points rather than as distances to centroids.

The latter formulation of the Simplified Silhouette Index (SSI) is shown below. For each point $i$, call the distance to its own cluster centroid $a_i$, and call the distance to the nearest neighboring centroid $b_i$. The Silhouette score for the $i$-th point is then given by $SSI_i$:

$$SSI_i = \frac{b_i - a_i}{\max(a_i, b_i)}$$

Here $a_i$ is the indicator of cluster tightness and $b_i$ is the indicator of cluster separation. One thing to notice is that in order to compute $b_i$ you need the distance from point $i$ to all the other centroids except its own, so the complexity of the algorithm is $O(Nk)$, where $k$ is the number of clusters. That is still linear in $N$, but can be high for large $k$.


Values of SSI lie in the range [-1, 1]. Values near 0 indicate overlapping clusters, and negative values generally indicate that the point has been wrongly clustered, since it is closer to a different cluster than to its own. The best values of SSI are close to 1. The average SSI across all the points in the corpus gives us an indication of how good the clustering is.
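To make the definition and its sign behavior concrete, here is the per-point computation in plain Python (3.8+ for `math.dist`). It is a sketch that mirrors the Scala code in this post, assuming the centroids are held in a dict from cluster ID to vector (like the broadcast lookup table) and that there are at least two clusters; the sample centroids are made up. The second call shows a point assigned to cluster 0 that actually lies nearer to cluster 1, which yields a negative score.

```python
import math

def simplified_silhouette(point, cid, centroids):
    """SSI_i = (b_i - a_i) / max(a_i, b_i), using only centroid distances.
    centroids maps cluster ID -> centroid; needs at least two clusters."""
    a = math.dist(point, centroids[cid])                 # tightness: own centroid
    b = min(math.dist(point, cvec)                       # separation: nearest other centroid
            for other, cvec in centroids.items() if other != cid)
    if a == 0.0 and b == 0.0:
        return 0.0
    return (b - a) / max(a, b)

centroids = {0: (0.0, 0.0), 1: (10.0, 0.0)}
print(simplified_silhouette((1.0, 0.0), 0, centroids))   # 8/9, well clustered
print(simplified_silhouette((6.0, 0.0), 0, centroids))   # -1/3, closer to cluster 1
```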

In addition, the distribution of SSI values in each cluster can be plotted as a histogram, as shown in this KMeans clustering and Silhouette analysis example on Scikit-Learn. Given similar values of average SSI, a clustering where the points are distributed approximately equally across clusters tends to be better.
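Building those per-cluster distributions only requires grouping the (cluster ID, SSI) pairs by key. The sketch below does this in memory with plain Python on illustrative values. `summarize_by_cluster` and its `bins` parameter are hypothetical names; on Spark, the grouping step would be done with something like reduceByKey or aggregateByKey over the (cid, ssi) RDD before collecting the small per-cluster summaries.

```python
from collections import defaultdict
from statistics import mean

def summarize_by_cluster(cid_ssi_pairs, bins=4):
    """Group (cluster ID, SSI) pairs into (size, mean SSI, histogram over [-1, 1])."""
    by_cluster = defaultdict(list)
    for cid, ssi in cid_ssi_pairs:
        by_cluster[cid].append(ssi)
    width = 2.0 / bins
    summary = {}
    for cid, values in by_cluster.items():
        hist = [0] * bins
        for v in values:
            idx = min(int((v + 1.0) / width), bins - 1)   # SSI = 1.0 falls in the last bin
            hist[idx] += 1
        summary[cid] = (len(values), mean(values), hist)
    return summary

pairs = [(0, 0.9), (0, 0.7), (0, -0.2), (1, 0.5)]   # illustrative values only
print(summarize_by_cluster(pairs))
```

The cluster sizes in the first position of each tuple are also what you need to plot the distribution of points per cluster.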

A flow diagram for my implementation is shown below. Three inputs are needed: an RDD of point vectors keyed by a sequential record ID (rid), an RDD of predictions consisting of (record ID, cluster ID) pairs, and an RDD of centroids consisting of (cluster ID, centroid vector) pairs. The RDD of points is just the input vectors with an additional sequential record ID, which you can easily provide with a zipWithIndex call on the original input RDD of vectors to cluster. The prediction RDD is the output of model.predict on the clustering model. The centroids RDD is the output of model.clusterCenters, with an additional zipWithIndex to get the cluster IDs. The point and prediction RDDs are joined on the record ID, and the centroids RDD is converted to a lookup dictionary of cluster ID to centroid vector and broadcast to the workers. The joined RDD and the broadcast lookup table are used to compute SSI for each point. We retain the cluster ID in case we want to compute the histograms by cluster. I didn't do this because my data is too large for this information to be visually meaningful; I ended up plotting the distribution of the number of points in each cluster instead.


All the input RDDs are available to the algorithm as structured text files from the clustering process. Here is the Scala code I used to implement this Simplified Silhouette index. We use Databricks Notebooks for most of our Spark analytics work, so the code below doesn't contain the boilerplate that you need to start Spark jobs directly on EMR or a standard Spark cluster. But it should be relatively simple to add that stuff in if needed.

import breeze.linalg.{DenseVector, norm}

val OUTPUT_POINTS = "/path/to/points"
val OUTPUT_PREDICTIONS = "/path/to/predictions"
val OUTPUT_CENTROIDS = "/path/to/centroids"

// read centroids file (cid<TAB>comma-separated vector), convert it to a lookup
// table of cluster ID -> centroid, then broadcast it to the workers
val centroidsRDD = sc.textFile(OUTPUT_CENTROIDS)
  .map(line => {
    val Array(cid, cstr) = line.split('\t')
    val cvec = DenseVector(cstr.split(',').map(x => x.toDouble))
    (cid.toLong, cvec)
  })

val cid2centroid = centroidsRDD.collect.toMap
val b_cid2centroid = sc.broadcast(cid2centroid)

// read points file (rid<TAB>comma-separated vector)
val pointsRDD = sc.textFile(OUTPUT_POINTS)
  .map(line => {
    val Array(rid, pstr) = line.split('\t')
    val pvec = DenseVector(pstr.split(',').map(x => x.toDouble))
    (rid.toLong, pvec)
  })
pointsRDD.persist()

// read predictions file (rid<TAB>cid)
val predictionsRDD = sc.textFile(OUTPUT_PREDICTIONS)
  .map(line => {
    val Array(rid, cid) = line.split('\t')
    (rid.toLong, cid.toLong)
  })
predictionsRDD.persist()

def euclideanDist(v1: DenseVector[Double], v2: DenseVector[Double]): Double = norm(v1 - v2, 2.0)

// join pointsRDD and predictionsRDD, look up centroid vectors from the broadcast,
// and compute a, b and SSI for every point, keeping the cluster ID for later grouping
val predictedPointsRDD = pointsRDD.join(predictionsRDD)    // (rid, (pvec, cid))
  .map { case (rid, (pvec, cid)) =>
    val centroids = b_cid2centroid.value
    // a: distance from the point to its own centroid (cluster tightness)
    val aDist = euclideanDist(pvec, centroids(cid))
    // b: distance to the nearest other centroid (cluster separation); needs k >= 2
    val bDist = centroids
      .filter { case (otherCid, _) => otherCid != cid }
      .values
      .map(otherCvec => euclideanDist(pvec, otherCvec))
      .min
    val ssi = if (aDist == 0.0 && bDist == 0.0) 0.0
              else (bDist - aDist) / math.max(aDist, bDist)
    (cid, ssi)
  }
predictedPointsRDD.persist()

// compute mean SSI across all points
val meanSSI = predictedPointsRDD.values.mean()
print("mean SSI: %.5f\n".format(meanSSI))
predictedPointsRDD.unpersist()
pointsRDD.unpersist()
predictionsRDD.unpersist()
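If you want to sanity-check the plumbing locally before running on a cluster, the join-and-lookup steps in the code above can be mimicked in plain Python. In this sketch, `enumerate` stands in for `zipWithIndex`, the hypothetical helper `join_by_key` mimics `RDD.join`, and an ordinary dict plays the role of the broadcast lookup table. The input vectors, centers and assignments are illustrative values, not real clustering output.

```python
# Local stand-ins for the RDD plumbing (illustrative values only).
vectors = [(0.0, 0.0), (0.0, 1.0), (9.0, 0.0)]
centers = [(0.0, 0.5), (9.0, 0.0)]   # stand-in for model.clusterCenters
assigned = [0, 0, 1]                 # stand-in for model.predict output, in input order

points = list(enumerate(vectors))         # (rid, pvec), like zipWithIndex
predictions = list(enumerate(assigned))   # (rid, cid)
cid2centroid = dict(enumerate(centers))   # cid -> centroid, the "broadcast" lookup table

def join_by_key(left, right):
    """Inner join of two lists of (key, value) pairs, like RDD.join."""
    right_map = {}
    for k, v in right:
        right_map.setdefault(k, []).append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_map.get(k, [])]

joined = join_by_key(points, predictions)   # (rid, (pvec, cid))
# what each task sees per record: the point, its cluster ID and its own centroid
records = [(rid, pvec, cid, cid2centroid[cid]) for rid, (pvec, cid) in joined]
for rec in records:
    print(rec)
```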

I have used this code to evaluate two clustering algorithms (KMeans and Bisecting KMeans) on my data, using two different approaches to vectorizing the data and various values of K (number of clusters). The mean SSI metric let me reduce an entire run to a single number that I could compare across runs. This was very helpful, and it allowed me to decide which outputs to keep and which to discard without having to inspect each output manually. I hope this code is useful to others who need to evaluate their clustering algorithms on Spark.