Saturday, May 31, 2014

Outlier Detection on Medical Claims


I have been analysing some (anonymized) Medicare/Mediclaim data. In this post, I describe how I used this data to build a model that could be used to detect fraudulent/abnormal claims.

A claim consists of a set of medical codes. Each code can refer to a procedure performed, drug administered, diagnosis made, or item provided by the provider (your doctor or hospital) to the patient (you) during an encounter (a doctor's visit). Each code has a price that is pre-negotiated between the insurance company and the service provider. The claim represents the invoice the provider sends to your insurance company after deducting your co-pay. One way a service provider could defraud the insurance company is to "accidentally" drop in high-priced procedure codes that don't "belong".

The Medicare/Mediclaim data provides a large number of outpatient claim records, which can be used to build a model of a "normal" claim. The intuition is as follows - some codes naturally go together, such as that for a blood test and a disposable blood-drawing kit, or a diagnosis of a fracture and splints. While the codes used in each encounter can vary depending on age, gender or pre-existing condition even for the same service, computing this "similarity" between different codes can help us find codes that don't belong.

In our model, the similarity sim(i,j) between two codes i and j is calculated by counting how many times they co-occur across claims in the dataset, and the distance between them is d(i,j) = 1/sim(i,j). We then consider each claim as a cluster of codes, and compute the average diameter of the cluster as the square root of the sum of squared distances over all pairs of codes in the claim, normalized by the number of pairs, i.e. √(Σ_{i<j} d(i,j)²) / N, where N is the number of code pairs.
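As a made-up example of this computation, suppose a claim contains three codes A, B and C, with co-occurrence counts sim(A,B) = 50, sim(A,C) = 2 and sim(B,C) = 4 (hypothetical numbers, chosen only for illustration). The snippet below works out the diameter following the same formula:

import itertools
import math

# hypothetical co-occurrence counts for the three code pairs
sim = {("A", "B"): 50, ("A", "C"): 2, ("B", "C"): 4}
codes = ["A", "B", "C"]

sq_dists = []
for i, j in itertools.combinations(codes, 2):
    d_ij = 1.0 / sim[(i, j)]     # distance is the reciprocal of the count
    sq_dists.append(d_ij ** 2)

# diameter = sqrt(sum of squared pairwise distances) / number of pairs
diameter = math.sqrt(sum(sq_dists)) / len(sq_dists)
print(diameter)   # ~0.19, dominated by the rarely co-occurring pair (A, C)

A pair that rarely co-occurs contributes a large distance, so claims containing unusual code combinations end up with larger diameters.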

This is more of an exploration to see if such a model makes sense. I used Python and its data science libraries (numpy, scipy, matplotlib, scikit-learn, pandas, etc) to work on a subset (approximately 5%) of the data. I first read the data into a pandas DataFrame and extract only the codes from each claim, then use scikit-learn's CountVectorizer to create a binarized term-document matrix (or code-claim matrix in this case).

from __future__ import division
import itertools
import numpy as np
import os.path
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer

def join_codes(row):
    return " ".join([str(v) for i, v in row.iteritems() if pd.notnull(v)])
    
DATA_DIR = "/path/to/claims/data"
EPSILON = 0.1

# extract codes as bag of codes from input
opdf = pd.read_csv(
    os.path.join(DATA_DIR, "DE1_0_2008_to_2010_Outpatient_Claims_Sample_1.csv"),
    low_memory=False)
opdf.head()

colnames = [colname for colname in opdf.columns if "_CD_" in colname]
bcdf = opdf.ix[:, colnames].apply(join_codes, axis=1)

# build a code-document matrix out of the codes
vec = CountVectorizer(min_df=1, binary=True)
X = vec.fit_transform(bcdf)

This creates a 790790x17023 sparse matrix in which 0.04% of the elements are non-zero. Taking a cue from text mining, we can compute the similarity between codes as X^T * X. Further, since our matrix is binarized, the matrix product will contain the co-occurrence counts. While this approach seems a bit unintuitive (unless you have done this before), and even a bit wasteful since we will use only half of what we compute (the top or bottom triangle), it takes advantage of numpy's vector optimizations and is orders of magnitude faster than looping through the data manually to calculate the similarities.

sim = X.T * X
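If the X^T * X step feels unintuitive, here is a tiny made-up illustration (three claims, three codes) showing why the product of a binarized claim-code matrix with its transpose contains pairwise co-occurrence counts:

import numpy as np

# rows = claims, columns = codes; 1 means the code appears in the claim
X = np.array([[1, 1, 0],
              [1, 1, 1],
              [0, 1, 1]])

# entry (i, j) counts the claims in which codes i and j appear together;
# the diagonal holds the number of claims each code appears in
print(X.T.dot(X))
# [[2 2 1]
#  [2 3 2]
#  [1 2 2]]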

The similarity matrix is a 17023x17023 sparse square matrix (slightly denser than X, with 2.2% of elements non-zero). We use the similarities to compute the average diameter of the code cluster in each claim, thereby reducing each claim to a single number. This is done within a for-loop that does an all-pairs join over the codes in each record, with O(n²) complexity per claim, and takes a while (6 hours in my case) to run.

fout = open(os.path.join(DATA_DIR, "clusters.txt"), 'wb')
for row in range(0, X.shape[0]):
    # indices of the codes present in this claim
    codes = X[row, :].nonzero()[1]
    dists = []
    for i, j in itertools.product(codes, codes):
        if i < j:
            # co-occurrence count for the code pair; use EPSILON if they
            # never co-occur, to avoid division by zero
            sim_ij = sim[i, j]
            if sim_ij == 0:
                sim_ij = EPSILON
            # squared distance d_ij^2 = 1 / sim_ij^2
            dists.append(1 / (sim_ij ** 2))
    # average cluster diameter; claims with fewer than two codes produce nan here
    fout.write("%f\n" % (np.sqrt(sum(dists)) / len(dists)))
fout.close()

The file contains a series of numbers, each representing the code cluster diameter for a single claim. This is now a univariate distribution, and we can use techniques described in this paper on univariate outliers (PDF) to detect outliers. Essentially, for normal distributions, elements farther than 2 standard deviations from the mean (roughly the 95th percentile level) can be considered mild outliers, and those farther than 3 standard deviations from the mean (roughly the 99th percentile level) can be considered strong outliers. Mean-based measures are themselves sensitive to the presence of outliers, however, so one can also use a median-based measure: mild outliers are those outside the range (Q1 - 1.5*IQR, Q3 + 1.5*IQR) and strong outliers are those outside the range (Q1 - 3*IQR, Q3 + 3*IQR), where the interquartile range IQR = Q3 - Q1.
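As a quick illustration of both rules (a standalone numpy sketch on an arbitrary list of diameters, not part of the pipeline; the helper name is my own):

import numpy as np

def outlier_cutoffs(values):
    # returns (lower, upper) cutoff ranges for the mean-based and IQR-based rules
    values = np.asarray(values, dtype=float)
    mu, sigma = values.mean(), values.std()
    q1, q3 = np.percentile(values, [25, 75])
    iqr = q3 - q1
    return {
        "mean_mild":   (mu - 2 * sigma, mu + 2 * sigma),
        "mean_strong": (mu - 3 * sigma, mu + 3 * sigma),
        "iqr_mild":    (q1 - 1.5 * iqr, q3 + 1.5 * iqr),
        "iqr_strong":  (q1 - 3.0 * iqr, q3 + 3.0 * iqr),
    }

# anything outside a range counts as an outlier under that rule
print(outlier_cutoffs([0.10, 0.12, 0.09, 0.11, 0.10, 0.95]))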

Since the data (even at 5% of the total volume) is still quite large, I initially thought that I would have to compute the mean and quantiles using streaming algorithms - Welford's method for streaming standard deviation (and mean) and the P-square algorithm for streaming quantiles. The Python library livestats implements both of these, so I just used that.
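For the curious, Welford's update is simple enough to sketch directly. This is just the textbook recurrence (my own sketch, not how livestats implements it internally):

class StreamingMeanVar(object):
    # Welford's method: single-pass mean and variance without storing the data
    def __init__(self):
        self.n, self.mean, self.m2 = 0, 0.0, 0.0

    def add(self, x):
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    def variance(self):
        return self.m2 / (self.n - 1) if self.n > 1 else 0.0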

The code below generates some visualizations using the cluster diameter data we just generated, from which we can extract cutoffs for prediction.

from livestats import livestats
import math
import matplotlib.pyplot as plt
import numpy as np
import os.path

EPSILON = 0.0001

def summary(stats):
    summary = {}
    summary["mean"] = stats.mean()
    summary["stdev"] = math.sqrt(stats.variance())
    q1, q2, q3 = [q[1] for q in stats.quantiles()]
    summary["q1"] = q1
    summary["q2"] = q2
    summary["q3"] = q3
    return summary    
    
def norm(x, mu, sigma):
    return (1 / (sigma * np.sqrt(2 * np.pi)) *
        np.exp(-(x - mu)**2 / (2 * sigma**2)))

lns = 0
fin = open(os.path.join(DATA_DIR, "clusters.txt"), 'rb')
stats = livestats.LiveStats([0.25, 0.5, 0.75])
xs = []
for line in fin:
    line = line.strip()
    lns += 1
    x = EPSILON if line == "nan" else float(line)
    x = math.log(x, math.e)
    xs.append(x)
    # add a mirror image to make it approximately normal
#    xs.append(-x)
    stats.add(x)
#    stats.add(-x)
fin.close()

# plot data for visualization
mu = stats.mean()
sigma = math.sqrt(stats.variance())
count, bins, ignored = plt.hist(xs, bins=100, normed=True)
plt.plot(bins, [norm(x, mu, sigma) for x in bins], linewidth=2, color='r')
max_y = 0.5
#max_y = 10

# mean +/- (2*sigma or 3*sigma)
lb2 = mu - (2 * sigma)
ub2 = mu + (2 * sigma)
lb3 = mu - (3 * sigma)
ub3 = mu + (3 * sigma)
plt.plot([lb2, lb2], [0, max_y], 'r--')
plt.plot([ub2, ub2], [0, max_y], 'r--')
plt.plot([lb3, lb3], [0, max_y], 'r-')
plt.plot([ub3, ub3], [0, max_y], 'r-')

# median based (interquartile range based outlier measure)
q1, q2, q3 = [q[1] for q in stats.quantiles()]
iqr = q3 - q1
lb2 = q1 - (1.5 * iqr)
ub2 = q3 + (1.5 * iqr)
lb3 = q1 - (3.0 * iqr)
ub3 = q3 + (3.0 * iqr)
plt.plot([lb2, lb2], [0, max_y], 'g--')
plt.plot([ub2, ub2], [0, max_y], 'g--')
plt.plot([lb3, lb3], [0, max_y], 'g-')
plt.plot([ub3, ub3], [0, max_y], 'g-')

print summary(stats)

plt.show()

Most of the clusters have diameters close to 0, and the distribution resembles a Zipf distribution. I tried to fake a normal distribution by adding a mirror image of the data (the commented-out xs.append(-x) and stats.add(-x) lines in the code above). I also tried to fit a normal curve of area 1 to the distribution, but as you can see, the fitted normal is shorter and fatter than the actual distribution.


The solid and dashed vertical red lines indicate the strong and mild outlier cutoffs based on the mean, and the solid and dashed vertical green lines indicate the strong and mild cutoffs based on the median (IQR). However, because the distribution is clearly not normal, we cannot draw any conclusions from this.

I then tried a log transform, which fits a normal distribution much better, except that it is skewed to the left. Here is what the log of the diameters looks like:


As you can see, there is absolutely no outlier data on the right hand side. We are not really interested in outliers on the left, since they represent super-tight clusters of codes. So we abandon this approach and instead look for the 95th and 99th percentile points in the original Zipf-like distribution.

def compute_cutoff(level, cys):
    # walk the cumulative counts from the right and return the (1-based)
    # index of the first bin whose cumulative count drops below the level
    for i in range(len(cys), 0, -1):
        if cys[i-1] < level:
            return i
    return -1
    
counts, bins, ignored = plt.hist(xs, bins=100)
cumsums = np.cumsum(counts)
plt.plot(bins[:-1], cumsums, color='red')

max_cy = len(xs)
strong_xcut = compute_cutoff(0.99 * max_cy, cumsums) / len(bins)
mild_xcut = compute_cutoff(0.95 * max_cy, cumsums) / len(bins)

print (strong_xcut, mild_xcut)

plt.plot([strong_xcut, strong_xcut], [0, max_cy], 'g-')
plt.plot([mild_xcut, mild_xcut], [0, max_cy], 'g--')

plt.show()

This produces the output shown below. The red curve plots the cumulative frequency distribution, and the green solid and dashed lines represent the 99th and 95th percentile cutoffs respectively.


Although there are some items beyond the 99th percentile mark, inspection reveals that these are all claims with a single rarely occurring code (one that occurs only once in the dataset), which is not interesting. So we look at the mild outliers (beyond the 95th percentile mark) ordered by descending cluster diameter, ignoring diameters of 1 (which, as noted before, point to single rare codes).

import operator

strong_outlier_cutoff = 0.9801980198019802
mild_outlier_cutoff = 0.36633663366336633

fin = open(os.path.join(DATA_DIR, "clusters.txt"), 'rb')
outliers = []
idx = 0
for line in fin:
    line = line.strip()
    x = EPSILON if line == "nan" else float(line)
    if x > mild_outlier_cutoff and x < 1.0:
        outliers.append((idx, x))
    idx += 1
fin.close()

# find corresponding claim ids and claims for verification
outliers_sorted = sorted(outliers, key=operator.itemgetter(0), 
                         reverse=True)[0:10]
colnames = ["CLM_ID"]
colnames.extend([colname for colname in opdf.columns if "_CD_" in colname])
for idx, score in outliers_sorted:
    claim_id = opdf.ix[idx, colnames[0]]
    codes = opdf.ix[idx, colnames[1:]]
    names = ["_".join(x.split("_")[0:2]) for x in colnames[1:]]
    code_names = [":".join([x[0], x[1]]) for x in zip(names, codes.values) 
                                         if pd.notnull(x[1])]
    print("%s %6.4f %s" % (claim_id, score, str(code_names)))

This returns the following result. I am not conversant enough with medical codes to tell if these are true anomalies, but I would be willing to bet that these are relatively rare.

542382281346896 0.5000 ['ICD9_DGNS:99673', 'HCPCS_CD:11042']
542962281495106 0.4787 ['ICD9_DGNS:99657', 'HCPCS_CD:11042', 'HCPCS_CD:94375']
542332281612480 0.4715 ['ICD9_DGNS:7921', 'ICD9_DGNS:5533', 'HCPCS_CD:A5061']
542482280853055 0.5774 ['ICD9_DGNS:V622', 'ICD9_DGNS:V653', 'HCPCS_CD:90717']
542142281445697 0.3727 ['ICD9_DGNS:6119', 'ICD9_DGNS:71949', 'HCPCS_CD:76645']
542352281341634 0.4787 ['ICD9_DGNS:7948', 'ICD9_DGNS:5739', 'ICD9_DGNS:V7649']
542142281439193 0.3745 ['ICD9_DGNS:7260', 'ICD9_DGNS:7231', 'HCPCS_CD:20550']
542492281574929 0.3727 ['ICD9_DGNS:6279', 'ICD9_DGNS:7822', 'HCPCS_CD:77080']
542362281032984 0.4718 ['ICD9_DGNS:07799', 'ICD9_DGNS:2724', 'HCPCS_CD:87252']
542072280995842 0.3727 ['ICD9_DGNS:496', 'ICD9_DGNS:V601', 'HCPCS_CD:97110']

Another good introduction to outliers in general is the teaser chapter of the book Outlier Analysis by Charu Aggarwal. I got to read this towards the tail end of this work, and found that proximity-based outlier detection is a valid approach. It felt great to have figured this out independently :-).

I also started using the Spyder IDE (part of the Anaconda distribution that I use) for this work, and I loved it! It's a bit like RStudio: it allows you to run scripts or code blocks and then inspect the values as you go. It's a huge timesaver (and more convenient than importing scripts into the Python shell) when you are dealing with large amounts of data. There is scope for improvement - the key mappings are a bit non-intuitive for me and key remapping does not work, and large jobs tend to hang the IDE - but it's still a huge convenience to be able to look at your data and try various things with it without having to rerun the entire script from scratch.

I would also like to thank my colleagues Bijal Shah, who confirmed my hypothesis that the practice of switching codes in insurance claims does exist, and Roger Yeung who pointed me to the Medicare/Mediclaim dataset.

All the code for this post is in my GitHub repository. Would appreciate hearing from you if you know of better ways of finding outliers.

Friday, May 23, 2014

IPython Notebooks for StatLearning Exercises


Earlier this year, I attended the StatLearning: Statistical Learning course, a free online course taught by Stanford University professors Trevor Hastie and Rob Tibshirani. They are also the authors of The Elements of Statistical Learning (ESL) and co-authors of its less math-heavy sibling: An Introduction to Statistical Learning (ISL). The course was based on the ISL book. Each week's videos were accompanied by some hands-on exercises in R.

I personally find it easier to work with Python than R. R seems to have grown organically with very little central oversight, so function and package names are often non-intuitive, and packages often have duplicate or overlapping functionality. In general, an educated guess about an R function has about the same likelihood of being right as a completely random one - unless you know the function or package, your chances are 50-50. On the other hand, with Python, an educated guess has a 40-90 percent chance of being right, depending on the library and how educated your guess was. So while the good profs were patiently explaining the R code, I was mostly busy fantasizing about writing all of it in Python some day.

At the time, I had worked a bit with scikit-learn and NumPy. I had heard about Pandas and knew it was the Python implementation of DataFrames, but hadn't actually worked with it. Over the past couple of months, I have had the opportunity to work with Pandas and IPython Notebooks for a project I did with my kids, and as a result I now quite enjoy the power and expressivity that these libraries provide.

So I decided to apply my newly acquired skills to do this rewrite. One of my incentives for doing this was the chance to get a fairly comprehensive guided tour of scikit-learn algorithms that I wouldn't normally use. Of course, the tour depends a lot on the guide, and the course is taught from the point of view of a statistician rather than that of a machine learning person. Since my toolchain (scikit-learn, NumPy, SciPy, Pandas, Matplotlib and a bit of statsmodels) is more focused towards machine learning, there were times when I wasn't able to replicate the functionality completely and accurately.

There are 9 notebooks, corresponding to the exercises for Chapters 2-10 of the course. The notebooks and data can be found on my GitHub in the statlearning-notebooks project. You can also read the notebooks directly on nbviewer.ipython.org via the links in the README.md file.


This exercise introduced me to a lot of scikit-learn algorithms that I had not used before. Since there are quite a few functionality mismatches between R and scikit-learn, trying to match the R functionality often led me to novel ideas described on sites like StackOverflow and Cross Validated, some of which I implemented (and others I have linked to). I also learned quite a bit about plotting with matplotlib, since the original exercises use R's rich plotting features as a matter of course, some of which require additional work in Python.

Overall, I found that this group of Python libraries was more than adequate for most tasks in the exercises, and (at least in my eyes) resulted in cleaner, more readable code. Take a look at these pages to get an overview of what scikit-learn and Pandas, my two top-level libraries, can do. However, R also offers lots of functionality - there is a lot of overlap, but in some areas R provides algorithms that scikit-learn doesn't, and in others scikit-learn offers more than R. So it makes sense to learn and use both as needed.

If you are considering using this group of Python libraries for data analysis, the notebooks should be useful as examples. For more advanced programmers: if you think there are better ways to do something than what I have done, I would appreciate hearing from you (or, since it's on GitHub, a pull request would be good too!).

Saturday, May 10, 2014

Preprocessing data with Scalding and Amazon EMR


Lately, I have been trying to do some data analysis against (anonymized) Medicare/Mediclaim data. The data contains member information (gender, race, date of birth, chronic conditions, etc) for 6.6M patients, and inpatient and outpatient claims (1.3M and 15.8M respectively) for the period 2008-2010. I started out using Pandas to do some basic analysis - nothing fancy, basically finding the distribution of various chronic conditions by race and sex (I figured age wouldn't be as interesting, since the age range is quite narrow in this case), correlations between different chronic conditions, etc. You can see the IPython notebook containing this analysis here.
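To give a flavor of that analysis, the aggregations look something like the sketch below. This is illustrative only - the file name and column names (BENE_SEX_IDENT_CD, BENE_RACE_CD, SP_DIABETES, etc) are from my reading of the DE-SynPUF beneficiary summary layout rather than copied from the notebook:

import pandas as pd

# hypothetical file name; the beneficiary summary file holds demographics
# plus chronic condition flags (assumed coding: 1 = has condition, 2 = does not)
bene = pd.read_csv("DE1_0_2008_Beneficiary_Summary_File_Sample_1.csv")

# distribution of one chronic condition (diabetes) by sex and race
print(pd.crosstab([bene["BENE_SEX_IDENT_CD"], bene["BENE_RACE_CD"]],
                  bene["SP_DIABETES"]))

# correlations between a few chronic condition flags (illustrative subset)
cond_cols = ["SP_DIABETES", "SP_CHF", "SP_COPD"]
print(bene[cond_cols].corr())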

While analyzing the inpatient claims data, I wondered about the possibility of building a model to predict the probability of chronic disease given the claim codes for a patient. Pandas over IPython was okay for doing analysis with a subset (single file) of the data, but got a bit irritating with the full dataset because of frequent hangs and subsequent IPython server restarts. So I began to consider using Hadoop, preferably over Amazon EMR - and Scalding was similar enough to Pandas to make this the obvious choice (obvious for me, at least - I prefer Scalding over Pig, and I haven't used (S)Crunch or Scoobi).

The last time I used Scalding was when I wrote the Scalding for the Impatient series (a Scalding version of Paco Nathan's Cascading for the Impatient series) as a way to learn Scala. Since then I have used Cascading (Scalding's clunkier but richer Java predecessor), mainly because I couldn't run Scalding in anything other than local mode. When I looked this time I found this project template from the kind folks at Snowplow Analytics to run Scalding code on Amazon EMR, so that was no longer a problem.

This post describes a mixture of Python and Scala/Scalding code that I hooked up to convert the raw Benefits and Inpatient Claims data from the Medicare/Medicaid dataset into data for an X matrix and multiple y vectors, each y corresponding to a single chronic condition. Scalding purists would probably find this somewhat inelegant and prefer a complete Scalding end-to-end solution, but my Scalding-fu extends only so far - hopefully it will improve with practice.

Project Setup


The Snowplow example project is based on custom Scala code in the project/ directory - I'm used to simpler projects, with a build.sbt in the root directory. I tried this route for a bit but soon gave up because of the complexity involved. The example project was also built for Scala 2.9, and I use 2.10 (the Eclipse based Scala IDE is tied to a specific Scala version). The change also prompted changes to other versions hardcoded in the build files, as well as some minor code changes, all of which are described below. Alternatively, you can just grab the contents of my project directory on GitHub.

# Source: project/build.properties
* updated sbt.version to 0.13.0

# project/plugins.sbt
* updated sbt-assembly from 0.8.5 to 0.10.1
* added sbteclipse-plugin 2.4.0

# project/ExampleScaldingProjectBuild.scala
* renamed this file and contained class to ProjectBuild.scala

# project/Dependencies.scala
* renamed ScalaToolsSnapshots alias to "Scalatools snapshots at Sonatype"
* updated specs2 to 1.13 per comment (not used in my case)

# project/BuildSettings.scala
* updated basicSettings
* fixed up compile failure caused by change in sbt-assembly API

The project also provides a JobRunner object that is used to call the selected Job class from Hadoop. After testing that the WordCount example still worked on Amazon EMR with my changes, I moved the JobRunner.scala file to my own job package and removed the package for the Word Count job.

Data Preparation


The Benefit summary and the inpatient claims data consist of 58 and 19 files respectively. I had initially thought that Scalding would provide some Source abstraction that could read off a directory, but I couldn't find one in the examples. So I wrote this Python snippet to concatenate them into 2 large files.

# Source: src/main/python/catfiles.py
import os
import sys

def usage():
  print "Usage: %s indir outfile skipheader" % (sys.argv[0])
  print "where:"
  print "indir - input directory of files to concat"
  print "outfile - outfile file to concat to"
  print "skipheader - true if header to be skipped else false"
  sys.exit(-1)

def main():
  if len(sys.argv) != 4 \
     or not os.path.isdir(sys.argv[1]) \
     or sys.argv[3] not in ["true", "false"]:
    usage()
  fout = open(sys.argv[2], 'wb')
  for fn in os.listdir(sys.argv[1]):
    print "Now processing: %s" % (fn)
    fin = open(os.path.join(sys.argv[1], fn), 'rb')
    should_skip_line = sys.argv[3] == "true"
    for line in fin:
      if should_skip_line: 
        should_skip_line = False
        continue
      fout.write(line)
    fin.close()
  fout.close()

if __name__ == "__main__":
  main()

Data Preprocessing Phase I


The first phase consists of creating three views of the claims data. The input data contains 45-odd columns for different medical codes per claim record. The code data is sparse, i.e., each claim has only a few of these columns filled in. This phase reads the claims data and normalizes it into (member_id, code_type:code_value, number_of_claims) triples. From this normalized data, we also extract the unique code_type:code_value pairs (hereafter referred to as code_ids) and the unique member_ids. The Scalding code for this is shown below:

// Source: src/main/scala/com/mycompany/cmspp/PreprocJob1.scala
package com.mycompany.cmspp

import scala.io.Source

import com.twitter.scalding.Args
import com.twitter.scalding.Csv
import com.twitter.scalding.Job
import com.twitter.scalding.RichPipe
import com.twitter.scalding.TextLine

class PreprocJob1(args: Args) extends Job(args) {

  def normalizeClaims(line: String): List[(String,String)] = {
    val inputColnames = Schemas.InpatientClaims.map(
      sym => sym.name)
    val outputColnames = Schemas.Codes.map(sym => sym.name)
    val ocolset = outputColnames.toSet
    val colvals = line.split(",")
    val memberId = colvals.head
    inputColnames.zip(colvals)
      .filter(nv => ocolset.contains(nv._1))
      .filter(nv => (! nv._2.isEmpty()))
      .map(nv => nv._1.split("_").head + ":" + nv._2)
      .map(code => (memberId, code))
  }
  
  val claims = TextLine(args("claims"))
    .flatMap(('line) -> ('DESYNPUF_ID, 'CLAIM_CODE)) {
      line: String => normalizeClaims(line)
    }
    .project(('DESYNPUF_ID, 'CLAIM_CODE))
    .groupBy(('DESYNPUF_ID, 'CLAIM_CODE)) { 
      grp => grp.size('NUM_CLAIMS) 
    }

  val members = RichPipe(claims)
    .project('DESYNPUF_ID)
    .unique('DESYNPUF_ID)
    .write(Csv(args("members")))

  val codes = RichPipe(claims)
    .project('CLAIM_CODE)
    .unique('CLAIM_CODE)
    .write(Csv(args("codes")))
    
  claims.write(Csv(args("output")))
}

object PreprocJob1 {
  def main(args: Array[String]): Unit = {
    // input files
    val claims = "data/inpatient_claims.csv"
    // output files
    val members = "data/members_list.csv"
    val codes = "data/codes_list.csv"
    val output = "data/claim_triples.csv"
    (new PreprocJob1(Args(List(
        "--local", "", 
        "--claims", claims,
        "--members", members,
        "--codes", codes,
        "--output", output)))
    ).run
    Console.println("==== members_list ====")
    Source.fromFile(members).getLines().slice(0, 3)
      .foreach(Console.println(_))
    Console.println("==== codes_list ====")
    Source.fromFile(codes).getLines().slice(0, 3)
      .foreach(Console.println(_))
    Console.println("==== claim_triples ====")
    Source.fromFile(output).getLines().slice(0, 3)
      .foreach(Console.println(_))
  }
}

To run this on Amazon EMR, comment out the companion object (which is only used for local testing), build the fat JAR with "sbt assembly", then upload the JAR from target/scala-2.10 and the data files to S3. To launch the job on EMR, use the following parameters:

Job JAR: s3://${BUCKET}/cmspp/scalding-meddata-preproc-0.1.0.jar
Job Arguments:
    com.mycompany.cmspp.PreprocJob1 \
    --hdfs \
    --claims s3://${BUCKET}/cmspp/claims/inpatient_claims.csv \
    --members s3://${BUCKET}/cmspp/members \
    --codes s3://${BUCKET}/cmspp/codes \
    --output s3://${BUCKET}/cmspp/triples

The first 3 lines of output for this phase (generated by a local run against a truncated dataset) are shown below.

==== members_list ====
0001448457F2ED81
000188A3402777A5
0001EB1229306825
==== codes_list ====
CLM:202
CLM:203
CLM:206
==== claim_triples ====
0001448457F2ED81,CLM:217,1
0001448457F2ED81,CLM:460,1
0001448457F2ED81,CLM:881,1

Assigning Numeric IDs


For the two unique lists of member_ids and code_ids, we assign a sequential value to each entry so we can convert the claim triples data into a sparse matrix. Since Scalding assumes a distributed system, assigning a unique serial number is a hard thing to do - there are ways to do it with the groupAll() method, which forces everything through a single reducer, but I couldn't make it work. Of course, this is trivial to do in a non-distributed environment. The following Python code is run against the member_id list and the code_id list to produce "dictionaries" of (int, member_id) and (int, code_id) pairs.

# Source: src/main/python/add_lineno.py
import os
import sys

def usage():
  print "Usage: %s infile outfile" % (sys.argv[0])
  print "where:"
  print "infile - input CSV file without line number"
  print "outfile - output CSV file with line number as first col"
  sys.exit(-1)

def main():
  if len(sys.argv) != 3 \
      or not os.path.isfile(sys.argv[1]):
    usage()
  fin = open(sys.argv[1], 'rb')
  fout = open(sys.argv[2], 'wb')
  lno = 0
  for line in fin:
    fout.write("%d,%s" % (lno, line))
    lno += 1
  fin.close()
  fout.close()

if __name__ == "__main__":
  main()

Data Preprocessing Phase II


This phase reads the dictionaries generated by the Python code above, along with the claim triples data, to produce an X matrix and a set of y vectors (one for each chronic condition). The X matrix is row L2-normalized and in sparse format (a small numpy illustration of the normalization follows the listing). Here is the Scalding code to do this transformation.

// Source: src/main/scala/com/mycompany/cmspp/PreprocJob2.scala
package com.mycompany.cmspp

import scala.io.Source

import com.twitter.scalding.Args
import com.twitter.scalding.Csv
import com.twitter.scalding.Job
import com.twitter.scalding.mathematics.Matrix.pipeExtensions

class PreprocJob2(args: Args) extends Job(args) {

  val benefits = Csv(args("benefits"), 
    fields=Schemas.BenefitSummary)
  val claims = Csv(args("claims"), 
    fields=List('DESYNPUF_ID, 'CLAIM_CODE, 'NUM_CLAIMS))
  val memberDict = Csv(args("members"),
    fields=List('MEM_IDX, 'DESYNPUF_ID))
  val codesDict = Csv(args("codes"), 
    fields=List('COD_IDX, 'CLAIM_CODE))
  
  claims.joinWithSmaller('DESYNPUF_ID -> 'DESYNPUF_ID, memberDict)
    .joinWithSmaller('CLAIM_CODE -> 'CLAIM_CODE, codesDict)
    .toMatrix[Long,Long,Double]('MEM_IDX, 'COD_IDX, 'NUM_CLAIMS)
    .rowL2Normalize
    .pipe
    .mapTo(('row, 'col, 'val) -> ('row, 'colval)) { 
      row: (Long, Long, Double) => 
        (row._1, row._2.toString + ":" + row._3.toString)
    }
    .groupBy('row) { grp => grp.mkString('colval, ",") }
    .write(Csv(args("xmatrix")))
    
  benefits.project(Schemas.Diseases)
    .joinWithSmaller('DESYNPUF_ID -> 'DESYNPUF_ID, memberDict)
    .discard('DESYNPUF_ID)
    .project('MEM_IDX :: Schemas.Diseases.tail)
    .write(Csv(args("yvectors")))
}

object PreprocJob2 {
  def main(args: Array[String]): Unit = {
    // input files
    val benefits = "data/benefit_summary.csv"
    val triples = "data/claim_triples.csv"
    val memberDict = "data/members_dict.csv"
    val codeDict = "data/codes_dict.csv"
    // output files
    val xmatrix = "data/x_matrix.csv"
    val yvectors = "data/y_vectors.csv"
    (new PreprocJob2(Args(List(
      "--local", "",
      "--benefits", benefits,
      "--claims", triples,
      "--members", memberDict,
      "--codes", codeDict,
      "--xmatrix", xmatrix,
      "--yvectors", yvectors)))
    ).run
    Console.println("==== xmatrix.csv ====")
    Source.fromFile(xmatrix).getLines().slice(0, 3)
      .foreach(Console.println(_))
    Console.println("==== yvectors.csv ====")
    Source.fromFile(yvectors).getLines().slice(0, 3)
      .foreach(Console.println(_))
  }
}
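As an aside, the rowL2Normalize step simply scales each member's claim-count vector to unit Euclidean length. A minimal numpy illustration of the idea (my own sketch, not the Scalding internals):

import numpy as np

# rows = members, columns = codes, values = claim counts
counts = np.array([[1.0, 2.0, 0.0],
                   [0.0, 3.0, 4.0]])

# divide each row by its Euclidean (L2) norm
norms = np.sqrt((counts ** 2).sum(axis=1))[:, np.newaxis]
print(counts / norms)
# rows now have unit length: approximately [0.447, 0.894, 0.] and [0., 0.6, 0.8]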

The inputs to this phase are the original benefits file, the claims triple data generated in Phase I and the two dictionaries generated in the previous step. The fat JAR already contains the PreprocJob2, so we can just reuse the JAR from the previous step. Parameters to launch the job on Amazon EMR are shown below.

Job JAR: s3://${BUCKET}/cmspp/scalding-meddata-preproc-0.1.0.jar
Job Arguments:
    com.mycompany.cmspp.PreprocJob2 \
    --hdfs \
    --benefits s3://${BUCKET}/cmspp/benefits/benefit_summary.csv \
    --claims s3://${BUCKET}/cmspp/triples/ \
    --members s3://${BUCKET}/cmspp/members_dict/members_dict.csv \ 
    --codes s3://${BUCKET}/cmspp/codes_dict/codes_dict.csv \
    --xmatrix s3://${BUCKET}/cmspp/xmatrix \
    --yvectors s3://${BUCKET}/cmspp/yvectors

Here is what the output data looks like. As you can see, this data can now be used to construct sparse or dense input matrices for training a classification or regression algorithm. Once again, on the question of elegance, I realize that the columns could have been sorted, and the values in the yvectors should have been {0,1} rather than {1,2}. But these are easy to handle in downstream programs - a sketch of that handling follows the sample output.

==== xmatrix.csv ====
0,"69:0.15617376188860607,65:0.15617376188860607,..."
1,"70:0.16222142113076254,68:0.16222142113076254,..."
2,"52:0.3333333333333333,43:0.3333333333333333,..."
==== yvectors.csv ====
0,1,1,1,2,1,1,1,1,2,2,2
1,1,1,1,1,1,1,1,1,2,2,2
2,2,1,2,1,2,2,2,1,2,1,2
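As a sketch of that downstream handling (my own illustration, assuming scipy and the file layouts shown above; the file names, the choice of disease column and the 0/1 remapping are all assumptions):

import numpy as np
from scipy.sparse import lil_matrix

# parse the sparse X matrix, one line per member: row,"col:val,col:val,..."
entries = []
for line in open("x_matrix.csv"):
    row_id, colvals = line.strip().split(",", 1)
    pairs = [cv.split(":") for cv in colvals.strip('"').split(",")]
    entries.append((int(row_id), [(int(c), float(v)) for c, v in pairs]))

num_rows = max(r for r, _ in entries) + 1
num_cols = max(c for _, cvs in entries for c, _ in cvs) + 1
X = lil_matrix((num_rows, num_cols))
for row_id, cvs in entries:
    for col, val in cvs:
        X[row_id, col] = val
X = X.tocsr()

# y vectors: first column is MEM_IDX, the rest are disease flags coded {1, 2};
# remap to {1, 0}, assuming 1 means "has condition" in the source coding
ys = np.loadtxt("y_vectors.csv", delimiter=",")
y = np.where(ys[:, 1] == 1, 1, 0)   # the first disease column, as an example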

That's all I have for today. I have wanted for a while to be able to build Scalding jobs that can run on Amazon EMR (or on a Hadoop cluster other than the one on my laptop), so this is quite a big deal for me. I plan on using this data as input to some classification program - I will write about that if it's interesting enough to share. All the code described here can be found on the GitHub page for this project.