Sunday, December 29, 2013

Akka Content Ingestion Pipeline, Part III


In this post, I add a JSON/HTTP front end to my Akka Content Ingestion Pipeline. This gives clients remote access (over HTTP) to the pipeline, so they can submit jobs to it and run some rudimentary queries against it. As described in the earlier posts, a client can send Fetch messages to the pipeline to have a document crawled from a website, parsed, and indexed into a Solr index, a Stats message to query the sizes of the pipeline's internal queues, and a Stop message to terminate the pipeline.
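
For reference, here is the shape of these messages as reconstructed from their usage in this post (the jsonFormatN calls and JSON payloads further down); the field names and collection types are my assumptions, so see Part I for the actual definitions:

// Message shapes reconstructed from usage in this post -- the
// metadata/stats types and field names are assumptions, not the
// actual Part I definitions.
case class Fetch(url: String, depth: Int, metadata: Map[String,String])
case class Parse(url: String)
case class Index(url: String)
case class Stats(stats: Map[String,Int])
case class Stop(code: Int)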

The front end is an HTTP server that listens on a specified host and port and forwards HTTP GET and PUT requests to an Actor adapted to serve HTTP requests (via the HttpService mixin). The PUT requests are accompanied by JSON payloads which correspond to the data in the message case classes. The actor's receive() method responds to these requests by transforming the JSON into the equivalent message case class and sending it to the controller Actor.

In addition, the front end HTTP server has a shutdown hook that will terminate the Controller and its children in an orderly fashion (waiting until the queues are all drained) by sending it a Stop request.

In pictures, this work corresponds to the top block of the diagram (updated from last week) below:


To build the HTTP server, I used the Spray library, which allows you to build an HTTP server in just a few lines of Scala code:

// Source: src/main/scala/com/mycompany/delsym/rest/Main.scala
package com.mycompany.delsym.rest

import com.typesafe.config.ConfigFactory

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.io.IO
import spray.can.Http
import spray.httpx.RequestBuilding.Get

object Main extends App {

  implicit val system = ActorSystem("DelSym")
  
  val conf = ConfigFactory.load()
  val host = conf.getString("delsym.rest.host")
  val port = conf.getInt("delsym.rest.port")

  val api = system.actorOf(Props[RestActor], "api")
  IO(Http) ! Http.Bind(api, host, port = port)
  
  sys.addShutdownHook {
    Console.println("Shutting down...")
    api ! Get("/stop")
  }
}

The HTTP server starts up a RestActor, a specialized Actor that provides an HTTP service. Its receive method pattern-matches on incoming requests and sends the corresponding messages to the underlying Controller. The receive method is built from a routing table constructed with the Spray routing DSL. The code for the RestActor is shown below:

// Source: src/main/scala/com/mycompany/delsym/rest/RestActor.scala
package com.mycompany.delsym.rest

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

import com.mycompany.delsym.actors.Controller
import com.mycompany.delsym.actors.Fetch
import com.mycompany.delsym.actors.Index
import com.mycompany.delsym.actors.MessageProtocol
import com.mycompany.delsym.actors.Parse
import com.mycompany.delsym.actors.Stats
import com.mycompany.delsym.actors.Stop
import com.typesafe.config.ConfigFactory

import akka.actor.Actor
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.pattern.ask
import akka.util.Timeout
import spray.httpx.SprayJsonSupport.sprayJsonUnmarshaller
import spray.httpx.marshalling.ToResponseMarshallable.isMarshallable
import spray.json.pimpAny
import spray.routing.Directive.pimpApply
import spray.routing.HttpService

class RestActor extends Actor with RestService {

  val conf = ConfigFactory.load()
  implicit val timeout = Timeout(
    conf.getInt("delsym.rest.timeout").seconds)

  val controller = actorRefFactory.actorOf(
    Props[Controller], "controller")

  def actorRefFactory = context
  
  def receive = runRoute {
    (get & path("stats")) {
      jsonpWithParameter("callback") {
        complete {
          val future = (controller ? Stats(Map.empty))
            .mapTo[Stats]
          val result = Await.result(future, timeout.duration)
          import MessageProtocol.statsFormat
          result.toJson.prettyPrint
        }
      }
    } ~
    (put & path("fetch")) { 
      jsonpWithParameter("callback") {
        import MessageProtocol.fetchFormat
        entity(as[Fetch]) { fetch => 
          complete {
            controller ! fetch
            "Got(" + fetch.toJson.compactPrint + ")"
          }  
        }
      }
    } ~
    (put & path("parse")) { 
      jsonpWithParameter("callback") {
        import MessageProtocol.parseFormat
        entity(as[Parse]) { parse => 
          complete {
            controller ! parse
            "Got(" + parse.toJson.compactPrint + ")"
          }  
        }
      }
    } ~
    (put & path("index")) { 
      jsonpWithParameter("callback") {
        import MessageProtocol.indexFormat
        entity(as[Index]) { index => 
          complete {
            controller ! index
            "Got(" + index.toJson.compactPrint + ")"
          }  
        }
      }
    } ~
    (get & path("stop")) { 
      complete {
        import MessageProtocol.stopFormat
        controller ! Stop(0)
        "Stop signal sent"
      }
    }    
  }
}

trait RestService extends HttpService {

  implicit def executionContext = 
    actorRefFactory.dispatcher
}

Spray also provides JSON marshalling/unmarshalling facilities. This is automatic for native types and collections, but for case classes, it is necessary to specify the protocol. Since our messages are all case classes, we specify the protocol as below. This protocol needs to be brought into scope just before the actual JSON marshalling/unmarshalling, which is why we have the import MessageProtocol.*Format statements in the code above.

// Source: src/main/scala/com/mycompany/delsym/actors/DelsymMessage.scala
package com.mycompany.delsym.actors

import akka.actor.ActorRef
import spray.json._
import DefaultJsonProtocol._

...

/////////////// Message <--> JSON ser/deser ////////////

object MessageProtocol extends DefaultJsonProtocol {
  implicit val fetchFormat = jsonFormat3(Fetch)
  implicit val parseFormat = jsonFormat1(Parse)
  implicit val indexFormat = jsonFormat1(Index)
  implicit val statsFormat = jsonFormat1(Stats)
  implicit val stopFormat = jsonFormat1(Stop)
}
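
As a minimal standalone illustration of the same pattern, here is a hypothetical Point class (not part of DelSym) with its own protocol:

// A hypothetical case class with its spray-json protocol
import spray.json._

case class Point(x: Int, y: Int)

object PointProtocol extends DefaultJsonProtocol {
  implicit val pointFormat = jsonFormat2(Point)
}

object PointDemo extends App {
  import PointProtocol.pointFormat
  // case class to JSON string
  println(Point(1, 2).toJson.compactPrint)                  // {"x":1,"y":2}
  // JSON string back to case class
  println(JsonParser("""{"x":3,"y":4}""").convertTo[Point]) // Point(3,4)
}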

To test this, I used cURL to send in a stats GET request and a fetch PUT request. The commands and their outputs are shown below:

sujit@tsunami:~/Projects/delsym$ curl localhost:8080/stats
{
  "stats": {
    "parsers": 0,
    "fetchers": 0,
    "indexers": 0
  }
}
sujit@tsunami:~/Projects/delsym$ curl -X PUT \
    -H "Content-Type: application/json \
    -d '{"url":"http://www.foo.com/bar", "depth":0, "metadata": {}}' \
    http://localhost:8080/fetch
Got({"url":"http://www.foo.com/bar","depth":0,"metadata":{}})

Although there is not that much code to show for it, this took me almost two days of experimentation to get right, mainly because Spray makes heavy use of implicits which are not really evident unless you read the documentation thoroughly. Here are some sites that helped me figure things out.

  • GitHub Gist from Ayose Cazorla demonstrating a very simple Scala application that talks to two HttpServiceActors. This is what I started with.
  • Spray (REST on Akka) slides from Mathias Doenitz's talk at Scala.IO Paris. This gave me some directive patterns that I used to implicitly convert JSON into message case classes and to generate JSONP (although the wrapping in the callback doesn't work, because I don't know how to set the content type).
  • Brandon Amos's Blog Post on adding shutdown hooks in Scala.

In addition to this, I also studied the code samples from the Akka in Action book, and downloaded the examples provided by Spray looking for useful patterns.

This part got done a bit earlier than planned, probably because I can sit around doing this at home all day over our Christmas to New Year office closure. I am going to publish it anyway and move on to looking at how to distribute this application across multiple servers next. So in the (very likely) case that I don't post again before next year, belated Merry Christmas wishes, and I hope you have a very Happy New Year and good times ahead in 2014.

Update 2014-01-01: For the remoting work, sbt gave me errors trying to download akka-remote for Akka 2.1.2 (and Spray 1.1-20130123), which I had been working with so far (based on the versions in the code for the Akka in Action book). So I upgraded to the current stable versions (Akka 2.2.3 and Spray 1.2.0), as a result of which both classes in this post failed to compile. I had to rewrite them against the new API (using code examples from the spray-routing examples). I have updated the code in this post to match the code in the DelSym GitHub repo.
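
For reference, the dependency section of the build after this upgrade would look roughly like the following (a sketch; check the build.sbt in the DelSym GitHub repo for the authoritative versions):

// Hypothetical build.sbt fragment for the upgraded versions
resolvers += "spray repo" at "http://repo.spray.io"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"    % "2.2.3",
  "com.typesafe.akka" %% "akka-remote"   % "2.2.3",
  "io.spray"          %  "spray-can"     % "1.2.0",
  "io.spray"          %  "spray-routing" % "1.2.0",
  "io.spray"          %% "spray-json"    % "1.2.5"
)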

Friday, December 27, 2013

Akka Content Ingestion Pipeline, Part II


Last week, I described the beginnings of an Akka-based Content Ingestion Pipeline. In that version, all the messages and actors were defined, but each actor's "behavior" consisted only of log.info() messages describing what it would do. In this post, I describe changes to add an HTTP fetcher to fetch pages from remote (or local, more on that later) webservers, a NoSQL database (I used MongoDB) to hold the documents being processed, and a Solr index into which the content is finally written out for searching.

I call these components DAOs (Data Access Objects) because they wrap data repositories. Factoring this functionality out of the Actors makes it possible to test them in isolation. There are two other components that don't quite fit the DAO definition, but still perform non-trivial work that benefits from being unit tested - these are the Outlinks Finder and the Parser. The first finds URL patterns inside the text, and is really only applicable to situations where you want to use the pipeline to crawl a website. The parser converts content to plain text - in the case of HTML or XML it's basically just removing the tags; in cases like PDF it's more complicated - I use the Apache Tika parser for this.

As you can imagine, with all this unit testing, I've also grown reasonably comfortable with ScalaTest. In order to test the entire Actor flow, I built Mock versions of the DAOs above, which just update a bunch of counters, so I can verify that the actors called them as expected. A nice side effect of this is that the components are all swappable. Currently I have a somewhat lame selection criterion based on the configuration variable "testuser" to determine whether to use the mock or the real implementation. In a production system, it should probably be replaced with factories supplying the appropriate implementation for the content type - a sketch of the idea follows.
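
Here is a minimal sketch of what such a factory might look like, written against the DAO base types shown later in this post (the DaoFactory object itself is hypothetical, not part of DelSym):

// Hypothetical factory centralizing the mock/real DAO selection
package com.mycompany.delsym.daos

import com.typesafe.config.ConfigFactory

object DaoFactory {
  private val testUser =
    ConfigFactory.load().getBoolean("delsym.testuser")

  def dbDao: BaseDbDao =
    if (testUser) new MockDbDao() else new MongoDbDao()
  def httpFetcher: BaseHttpFetcher =
    if (testUser) new MockHttpFetcher() else new HttpFetcher()
  def parser: BaseParser =
    if (testUser) new MockParser() else new TikaParser()
  def solrPublisher: BaseSolrPublisher =
    if (testUser) new MockSolrPublisher() else new SolrPublisher()
}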

So in pictures, the work that I describe in this post can be summarized as the bottom layer of the diagram (updated from last week's diagram) below.


I describe the actors and their (real) supporting components in a depth-first manner. The Fetch Worker checks to see if the URL it got has been crawled within the last 30 days (configurable via application.conf); if so, it's not fetched again. Otherwise, it synchronously invokes the fetch method on its HttpFetcher. While one usually associates fetching documents over HTTP with web crawling, this can be a good model for crawling files off a local filesystem as well - exposing the files via an HTTP server (such as Apache HTTPD or Lighttpd) frees you from having to collocate your fetchers with your filesystem.

// Source: src/main/scala/com/mycompany/delsym/actors/FetchWorker.scala
package com.mycompany.delsym.actors

import java.util.Date
import com.mycompany.delsym.daos.HttpFetcher
import com.mycompany.delsym.daos.MongoDbDao
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.actorRef2Scala
import com.mycompany.delsym.daos.MockDbDao
import com.mycompany.delsym.daos.MockHttpFetcher

class FetchWorker extends Actor with ActorLogging {

  val conf = ConfigFactory.load()
  
  val testUser = conf.getBoolean("delsym.testuser")
  val mongoDbDao = if (testUser) new MockDbDao() 
                   else new MongoDbDao()
  val httpFetcher = if (testUser) new MockHttpFetcher() 
                    else new HttpFetcher()
                    
  val refreshInterval = conf.getLong(
    "delsym.fetchers.refreshIntervalDays") * 86400000L // ms per day
  
  def receive = {
    case m: Fetch => {
      if (shouldFetch(m.url)) {
        log.info("Fetching URL: {}", m.url)
        httpFetcher.fetch(m.url) match {
          case Left(f) => log.error(f.e, f.msg)
          case Right(content) => 
            mongoDbDao.insertFetched(
              m.url, m.depth, m.metadata, content) match {
              case Left(f) => log.error(f.e, f.msg)
              case _ => {}
            }
        }
      }
      sender ! FetchComplete(m.url)
    }
    case _ => log.info("Unknown message.")
  }

  /**
   * Return true if id does not exist or if date
   * fetched is too far into the past.
   */
  def shouldFetch(url: String): Boolean = {
    val current = new Date().getTime()
    val lastFetched = mongoDbDao.getByUrl(
        url, List("fts")) match {
      // default to 0 (epoch) so a missing document is always fetched
      case Right(row) => row.getOrElse("fts", 0L)
                            .asInstanceOf[Long]
      // on a lookup error, log it and fetch anyway
      case Left(f) => log.error(f.e, f.msg); 0L
    }
    lastFetched + refreshInterval < current
  }
}

The HttpFetcher is a simple HttpClient-based implementation that reads a web page and returns either the contents, if the read was successful, or the exception if not. The Fetch Worker processes the returned Either as follows: if it is an exception, it logs it and processes the next URL in the queue; if it is a result, the content is persisted into MongoDB and a FetchComplete reply is sent back to the Fetcher router. The FetchComplete message contains a forward parameter that is set to false if the page was too new to be fetched - if the forward parameter is false, the controller will terminate the message flow.

// Source: src/main/scala/com/mycompany/delsym/daos/HttpFetcher.scala
package com.mycompany.delsym.daos

import org.apache.commons.httpclient.HttpClient
import org.apache.commons.httpclient.methods.GetMethod
import org.apache.commons.httpclient.params.HttpMethodParams
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler
import org.apache.commons.httpclient.HttpStatus
import org.apache.commons.httpclient.HttpException
import com.typesafe.config.ConfigFactory
import java.io.IOException
import scala.io.Source

class HttpFetcher extends BaseHttpFetcher {
  
  val conf = ConfigFactory.load()
  val numRetries = conf.getInt("delsym.fetchers.numRetries")

  override def fetch(url: String): Either[FailResult,String] = {
    val httpclient = new HttpClient()
    val getmethod = new GetMethod(url)
    getmethod.getParams().setParameter(
      HttpMethodParams.RETRY_HANDLER, 
      new DefaultHttpMethodRetryHandler(numRetries, false))
    try {
      val status = httpclient.executeMethod(getmethod)
      if (status == HttpStatus.SC_OK) {
        val is = getmethod.getResponseBodyAsStream()
        Right(Source.fromInputStream(is).mkString)
      } else {
        val message = "Fetch of %s failed (code=%d): %s"
          .format(url, status, getmethod.getStatusLine())
        Left(FailResult(message, null))
      } 
    } catch {
      case e: HttpException => 
        Left(FailResult("Fetch of [%s] failed, protocol violation"
        .format(url), e)) 
      case e: IOException => 
        Left(FailResult("Fetch of [%s] failed, transport error"
        .format(url), e))
    } finally {
      getmethod.releaseConnection()
    }
  }
}

The MongoDbDao is invoked from the FetchWorker in order to (optionally create a document record and) insert the fetched content into the document record. It is also invoked from the ParseWorker and IndexWorker components to update the document record. The MongoDbDao component depends on a database (named in the application.conf file) being available, and on an index on the "url" field being declared on the specified collection (also named in application.conf). This can be done with the following commands in the mongo shell.

// create database delsymdb
use delsymdb
// create index on field "url" in collection "documents" in "delsymdb"
db.documents.ensureIndex({"url" : 1}, {unique : true })

The code for the MongoDbDao is shown below. It uses Casbah, the Scala driver for MongoDB. I chose MongoDB because it's a document-oriented database, and as such is a perfect fit for the application (each row represents a single document). I found MongoDB easy to learn when I took the excellent (and free) M101P course conducted by MongoDB University (formerly 10gen) last year, and the intuitive mongo shell makes it very simple to use as well.

Like the HttpFetcher, the MongoDbDao returns an Either structure back to the Actor. It has four methods: three to insert the fields produced by the Fetch, Parse and Index workers respectively, and one to return the document as a Map of key-value pairs given the URL.
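
To make the field naming concrete, here is roughly the shape of a fully processed document record in the mongo shell (the values are illustrative, not captured output):

> db.documents.findOne({"url" : "http://www.foo.com/bar"})
{
  "_id" : ObjectId("..."),
  "url" : "http://www.foo.com/bar",
  "depth" : 0,
  "content" : "<html>...</html>",
  "f_somekey" : "...",        // fetch metadata, stored with an "f_" prefix
  "fts" : NumberLong("..."),  // fetch timestamp
  "textContent" : "...",
  "p_title" : "...",          // parse metadata, stored with a "p_" prefix
  "pts" : NumberLong("..."),  // parse timestamp
  "its" : NumberLong("...")   // index timestamp
}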

// Source: src/main/scala/com/mycompany/delsym/daos/MongoDbDao.scala
package com.mycompany.delsym.daos

import java.util.Date
import scala.collection.JavaConversions.asScalaSet
import com.mongodb.casbah.Imports.MongoClient
import com.mongodb.casbah.Imports.MongoDBObject
import com.mongodb.casbah.Imports.wrapDBObj
import com.typesafe.config.ConfigFactory
import com.mongodb.casbah.WriteConcern

class MongoDbDao extends BaseDbDao {

  val conf = ConfigFactory.load()
  val mongoClient = MongoClient(
    conf.getString("delsym.mongodb.host"),
    conf.getInt("delsym.mongodb.port"))
  val dbname = conf.getString("delsym.mongodb.dbname")
  val collname = conf.getString("delsym.mongodb.collname")
  val mongoColl = mongoClient(dbname)(collname)
  
  override def insertFetched(url: String, depth: Int, 
      fetchMetadata: Map[String,Any],
      content: String): Either[FailResult,Unit] = {
    val query = MongoDBObject("url" -> url)
    val builder = MongoDBObject.newBuilder
    builder += "content" -> content
    builder += "url" -> url
    builder += "depth" -> depth
    fetchMetadata.foreach(
      kv => builder += "f_" + kv._1 -> kv._2)
    builder += "fts" -> new Date().getTime()
    try {
      mongoColl.update(query, builder.result, true, 
        false, WriteConcern.Safe)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error inserting fetch data", e))
    }
  }
  
  override def insertParsed(url: String, text: String, 
      parseMetadata: Map[String,Any]): 
      Either[FailResult,Unit] = {
    val query = MongoDBObject("url" -> url)
    val builder = MongoDBObject.newBuilder
    parseMetadata.foreach(
      kv => builder += "p_" + kv._1 -> kv._2)
    builder += "textContent" -> text
    builder += "pts" -> new Date().getTime()
    val update = MongoDBObject("$set" -> builder.result)
    try {
      mongoColl.update(query, update, true, 
        false, WriteConcern.Safe)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error inserting parse data", e))
    }
  }
  
  override def insertIndexed(url: String): 
      Either[FailResult,Unit] = {
    val query = MongoDBObject("url" -> url)
    val update = MongoDBObject("$set" -> MongoDBObject(
      "its" -> new Date().getTime()))
    try {
      mongoColl.update(query, update, true, 
        false, WriteConcern.Safe)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error inserting index data", e))
    }
  }
      
  override def getByUrl(url: String, fields: List[String]): 
      Either[FailResult,Map[String,Any]] = {
    try {
      val query = MongoDBObject("url" -> url)
      mongoColl.findOne(query) match {
        case Some(row) => {
          if (fields.isEmpty) {
            Right(Map() ++ row.keySet()
              .map(field => (field, row(field))))
          } else {
            Right(Map() ++ fields
              .map(field => (field, row(field))))
          }
        }
        case None => Right(Map.empty)
      }
    } catch {
      case e: Exception => Left(FailResult(e.getMessage(), e))
    }
  }
  
  override def close(): Unit = mongoClient.close()
}
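
A quick (hypothetical) usage sketch for the read side:

// Hypothetical usage of MongoDbDao.getByUrl
val dao = new MongoDbDao()
dao.getByUrl("http://www.foo.com/bar", List("fts")) match {
  case Right(row) => println("last fetched at: " + row.get("fts"))
  case Left(f)    => println("lookup failed: " + f.msg)
}
dao.close()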

The next actor is the Parse Worker. It responds to a Parse(url) message by retrieving the contents of the URL from MongoDB, getting the appropriate parser for the content type (I use a selection of Apache Tika parsers), parsing out the text and metadata, and then updating MongoDB with the text, the parsed metadata and the parse timestamp. Once done, it sends back a ParseComplete reply to its router.

// Source: src/main/scala/com/mycompany/delsym/actors/ParseWorker.scala
package com.mycompany.delsym.actors

import com.mycompany.delsym.daos.MongoDbDao
import com.mycompany.delsym.daos.TikaParser
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.actorRef2Scala
import com.mycompany.delsym.daos.MockDbDao
import com.mycompany.delsym.daos.MockParser

class ParseWorker extends Actor with ActorLogging {

  val conf = ConfigFactory.load()

  val testUser = conf.getBoolean("delsym.testuser")
  val mongoDbDao = if (testUser) new MockDbDao()
                   else new MongoDbDao()
  val parser = if (testUser) new MockParser()
               else new TikaParser()

  def receive = {
    case m: Parse => {
      parse(m.url)
      sender ! ParseComplete(m.url)
    }
    case _ => log.info("Unknown message received.")
  }
  
  def parse(url: String): Unit = {
    log.info("Parsing URL: {}", url)
    try {
      mongoDbDao.getByUrl(url, List.empty) match {
        case Right(row) => {
          if (! row.isEmpty) {
            val content = row("content").asInstanceOf[String]
            parser.parse(url, content) match {
              case Right(textmeta) => {
                mongoDbDao.insertParsed(
                    url, textmeta._1, textmeta._2) match {
                  case Left(f) => log.error(f.e, f.msg)
                  case _ => {}
                }
              }
              case Left(f) => log.error(f.e, f.msg)
            }
          }
        }
        case Left(f) => log.error(f.e, f.msg)
      }
    } catch {
      case e: Exception => 
        log.error(e, "Error parsing URL: " + url)
    }
  }
}

The Parser deals with the mechanics of converting the raw content for a URL into text, and extracting additional metadata such as title and author from it. Apache Tika hides the details and provides a common facade for this functionality.

// Source: src/main/scala/com/mycompany/delsym/daos/TikaParser.scala
package com.mycompany.delsym.daos

import java.io.ByteArrayInputStream

import org.apache.tika.metadata.Metadata
import org.apache.tika.parser.AutoDetectParser
import org.apache.tika.parser.ParseContext
import org.apache.tika.parser.Parser
import org.apache.tika.parser.audio.AudioParser
import org.apache.tika.parser.html.HtmlParser
import org.apache.tika.parser.image.ImageParser
import org.apache.tika.parser.microsoft.OfficeParser
import org.apache.tika.parser.opendocument.OpenOfficeParser
import org.apache.tika.parser.pdf.PDFParser
import org.apache.tika.parser.rtf.RTFParser
import org.apache.tika.parser.txt.TXTParser
import org.apache.tika.parser.xml.XMLParser
import org.apache.tika.sax.WriteOutContentHandler

class TikaParser extends BaseParser {

  override def parse(url: String, content: String): 
      Either[FailResult,Pair[String,Map[String,Any]]] = {
    try {
      val handler = new WriteOutContentHandler(-1)
      val metadata = new Metadata()
      val ctx = new ParseContext()
      val parser = getParser(url)
      parser.parse(new ByteArrayInputStream(
        content.getBytes), handler, metadata, ctx)
      val parseMetadata = List(
        ("title", metadata.get(Metadata.TITLE)),
        ("author", metadata.get(Metadata.CREATOR)))
        .filter(x => x._2 != null)
        .toMap
      Right(Pair(handler.toString(), parseMetadata))      
    } catch {
      case e: Exception => Left(FailResult(
        "Parsing of URL:" + url + " failed", e))
    }
  }
  
  
  def getParser(url: String): Parser = {
    // +1 skips past the dot itself, so "foo.pdf" yields suffix "pdf"
    val suffix = url.slice(
      url.lastIndexOf(".") + 1, url.length())
    suffix match {
      case "text" | "txt" => new TXTParser()
      case "html" | "htm" => new HtmlParser()
      case "xml"          => new XMLParser()
      case "pdf"          => new PDFParser()
      case "rtf"          => new RTFParser()
      case "odt"          => new OpenOfficeParser()
      case "xls" | "xlsx" => new OfficeParser()
      case "doc" | "docx" => new OfficeParser()
      case "ppt" | "pptx" => new OfficeParser()
      case "pst"          => new OfficeParser()
      case "vsd"          => new OfficeParser()
      case "png"          => new ImageParser()
      case "jpg" | "jpeg" => new ImageParser()
      case "mp3"          => new AudioParser()
      case _              => new AutoDetectParser()
    }
  }
}

An optional (web page crawling only) step is the creation of outlinks. An HTML page contains embedded links to other HTML pages. The client signals that it wants such links crawled by passing a positive depth parameter in the Fetch message (which is stored in MongoDB). When the Controller receives the ParseComplete message and the depth is positive, it scans the content for links and sends out a batch of Fetch messages (with depth = depth - 1 and the same fetch metadata as the parent). The OutlinkFinder code is shown below:

// Source: src/main/scala/com/mycompany/delsym/daos/HtmlOutlinkFinder.scala
package com.mycompany.delsym.daos

import java.util.regex.Pattern
import com.typesafe.config.ConfigFactory

class HtmlOutlinkFinder extends BaseOutlinkFinder {

  val OutlinkPattern = 
    Pattern.compile("""(https|http)\://\S+\.\S{2,3}(/\w*)?""")
  
  val conf = ConfigFactory.load()
  val testUser = conf.getBoolean("delsym.testuser")
  val mongoDbDao = if (testUser) new MockDbDao()
                   else new MongoDbDao()
                   
  override def findOutlinks(url: String):
      Either[FailResult,List[(String,Int,Map[String,Any])]] = {
    try {
      mongoDbDao.getByUrl(url, List.empty) match {
        case Right(row) => { 
          if (row.contains("content")) { 
            val content = row("content").asInstanceOf[String]
            val depth = row.getOrElse("depth", 0)
                           .asInstanceOf[Int]
            val fetchMeta = row.keys
              // fetch metadata was stored with an "f_" prefix;
              // strip it so child Fetch messages carry the same
              // metadata keys as the parent did
              .filter(k => k.startsWith("f_"))
              .map(k => (k.substring(2), row(k)))
              .toMap
            if (depth > 0) {
              val matcher = OutlinkPattern.matcher(content)
              val matches = Stream.continually(matcher.find())
                .takeWhile(m => m == true)
                .map(m => matcher.group())
                .toList
              Right(matches.map(m => 
                (m, depth - 1, fetchMeta)))
            } else {
              Right(List.empty)
            }
          } else {
            Right(List.empty)
          }
        }
        case _ => Right(List.empty)
      }
    } catch {
      case e: Exception => 
        Left(FailResult("Error finding outlinks", e))
    }
  }
}
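
To see what the outlink regex actually extracts, here is an illustrative REPL session (the input string is made up):

scala> val p = java.util.regex.Pattern.compile("""(https|http)\://\S+\.\S{2,3}(/\w*)?""")
scala> val m = p.matcher("Visit http://www.foo.com/bar and https://baz.org today")
scala> Stream.continually(m.find()).takeWhile(identity).map(_ => m.group()).toList
res2: List[String] = List(http://www.foo.com/bar, https://baz.org)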

After the (optional) outlink finding step, the controller sends an Index(url) message to the Index Worker via the Index Router. The Index Worker retrieves the document from MongoDB and passes it to the SolrPublisher. Once done, it sends an IndexComplete message to its router, completing the pipeline for a single URL.

// Source: src/main/scala/com/mycompany/delsym/actors/IndexWorker.scala
package com.mycompany.delsym.actors

import com.mycompany.delsym.daos.MongoDbDao
import com.mycompany.delsym.daos.SolrPublisher
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.actorRef2Scala
import com.typesafe.config.ConfigFactory
import com.mycompany.delsym.daos.MockDbDao
import com.mycompany.delsym.daos.MockSolrPublisher

class IndexWorker extends Actor with ActorLogging {

  val conf = ConfigFactory.load()
  
  val testUser = conf.getBoolean("delsym.testuser")
  val mongoDbDao = if (testUser) new MockDbDao()
                   else new MongoDbDao()
  val solrPublisher = if (testUser) new MockSolrPublisher() 
                      else new SolrPublisher()
  
  override def postStop() = solrPublisher.commit()
  
  def receive = {
    case m: Index => {
      index(m.url)
      sender ! IndexComplete(m.url)
    }
    case _ => log.info("Unknown message received.")
  }

  def index(url: String): Unit = {
    log.info("Indexing URL: {}", url)
    try {
      mongoDbDao.getByUrl(url, List.empty) match {
        case Right(row) => {
          if (! row.isEmpty) {
            // record the index timestamp only if publishing succeeded
            solrPublisher.publish(url, row) match {
              case Left(f) => log.error(f.e, f.msg)
              case _ => mongoDbDao.insertIndexed(url) match {
                case Left(f) => log.error(f.e, f.msg)
                case _ => {}
              }
            }
          }
        }
        case Left(f) => log.error(f.e, f.msg) 
      }
    } catch {
      case e: Exception => 
        log.error(e, "Error indexing URL:" + url)
    }
  }
}

The SolrPublisher expects an Apache Solr server with a schema.xml defining the fields it needs to write (configured in application.conf under delsym.solr.solrfieldnames). I used this opportunity to upgrade the Solr installation on my notebook to 4.6 (the latest at the time of writing), and it turns out that all the fields are already configured in the example schema. However, these fields are named differently in MongoDB, so we need to specify a mapping (in application.conf) from the MongoDB names to the Solr names. Apart from this, all I had to do was start up the example app using "java -jar start.jar" and view the index via its web interface at http://localhost:8983/solr/.
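
A fragment of application.conf for this mapping might look like the following (the key names come from the SolrPublisher code below; the values are illustrative assumptions):

# Hypothetical application.conf fragment for the Solr field mapping
delsym {
  solr {
    server = "http://localhost:8983/solr"
    # parallel comma-separated lists: position i in one list maps
    # to position i in the other
    dbfieldnames   = "url,textContent,p_title,p_author"
    solrfieldnames = "id,text,title,author"
    commitInterval = 100
  }
}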

The SolrPublisher converts the MongoDB field names into the equivalent Solr field names, creates a SolrInputDocument with these fields, and sends an add request to the Solr server. It keeps a count of how many documents it wrote, and commits after every commitInterval (configurable via application.conf) requests. It also has a commit() method which is called from the IndexWorker's postStop() lifecycle hook, so any uncommitted documents are committed when the IndexWorker terminates.

// Source: src/main/scala/com/mycompany/delsym/daos/SolrPublisher.scala
package com.mycompany.delsym.daos

import java.util.concurrent.atomic.AtomicLong

import org.apache.solr.client.solrj.impl.HttpSolrServer
import org.apache.solr.common.SolrInputDocument

import com.typesafe.config.ConfigFactory

class SolrPublisher extends BaseSolrPublisher {

  val conf = ConfigFactory.load()

  val solrServer = new HttpSolrServer(
    conf.getString("delsym.solr.server"))
  val dbFieldNames = conf.getString(
    "delsym.solr.dbfieldnames").split(",")
  val solrFieldNames = conf.getString(
    "delsym.solr.solrfieldnames").split(",")
  val fieldNameMap = solrFieldNames.zip(dbFieldNames).toMap
  val commitInterval = conf.getInt("delsym.solr.commitInterval")

  val numIndexed = new AtomicLong(0L)

  override def publish(url: String, row: Map[String,Any]):
      Either[FailResult,Unit] = {
    try {
      val doc = new SolrInputDocument()
      solrFieldNames
        .filter(f => row.contains(fieldNameMap(f)))
        .foreach(f =>
          doc.addField(f, row(fieldNameMap(f))
             .asInstanceOf[String]))
      solrServer.add(doc)
      if (numIndexed.incrementAndGet() % 
          commitInterval == 0) commit()
      Right()
    } catch {
      case e: Exception => Left(
        FailResult("Error publishing to Solr", e))
    }   
  }
  
  override def commit(): Unit = solrServer.commit()
  
  override def close(): Unit = solrServer.shutdown()
}

For each of these components, I constructed unit tests to verify that they run as expected. If you are interested, you can find the code on my GitHub page for this project. In addition, I also built a unit test to test the message flow through the various actors. To do this, I had to replace the DAOs with Mock versions which did nothing but update some counters. Both the "real" and mock DAOs extend from a single parent type, so I can switch between them in the workers (at the moment using an if condition). I show the Mock DAOs (without much explanation, since it's not really needed) below:

// Source: src/main/scala/com/mycompany/delsym/daos/BaseDao.scala
package com.mycompany.delsym.daos

import java.util.concurrent.atomic.AtomicLong

trait BaseDao {}

abstract class BaseHttpFetcher extends BaseDao {
  def fetch(url: String): Either[FailResult,String]
}

abstract class BaseDbDao extends BaseDao {
  def insertFetched(url: String, depth: Int, 
    fetchMetadata: Map[String,Any], 
    content: String): Either[FailResult,Unit]
  
  def insertParsed(url: String, text: String, 
    parseMetadata: Map[String,Any]): 
    Either[FailResult,Unit]
    
  def insertIndexed(url: String): Either[FailResult,Unit]
  
  def getByUrl(url: String, fields: List[String]): 
    Either[FailResult,Map[String,Any]]
  
  def close(): Unit
}

abstract class BaseParser extends BaseDao {
  def parse(url: String, content: String): 
    Either[FailResult,Pair[String,Map[String,Any]]]
}

abstract class BaseOutlinkFinder extends BaseDao {
  def findOutlinks(url: String): 
    Either[FailResult,List[(String,Int,Map[String,Any])]]
}

abstract class BaseSolrPublisher extends BaseDao {
  def publish(url: String, row: Map[String,Any]):
    Either[FailResult,Unit]
  
  def commit(): Unit
  
  def close(): Unit
}

case class FailResult(msg: String, e: Exception)

////////////////// Mock DAOs for unit testing //////////////

object MockCounters {
  val fetched = new AtomicLong(0L)
  val parsed = new AtomicLong(0L)
  val indexed = new AtomicLong(0L)
  val dbFetched = new AtomicLong(0L)
  val dbParsed = new AtomicLong(0L)
  val dbIndexed = new AtomicLong(0L)
  val outlinkCalled = new AtomicLong(0L)
}

class MockHttpFetcher extends BaseHttpFetcher { 
  def fetch(url: String): Either[FailResult,String] = {
    MockCounters.fetched.incrementAndGet()
    Right(null)
  }
}

class MockDbDao extends BaseDbDao {
  def insertFetched(url: String, depth: Int, 
      fetchMetadata: Map[String,Any], 
      content: String): Either[FailResult,Unit] = {
    MockCounters.dbFetched.incrementAndGet()
    Right()
  }
  
  def insertParsed(url: String, text: String, 
      parseMetadata: Map[String,Any]): 
      Either[FailResult,Unit] = {
    MockCounters.dbParsed.incrementAndGet()
    Right()
  }
    
  def insertIndexed(url: String): 
      Either[FailResult,Unit] = {
    MockCounters.dbIndexed.incrementAndGet()
    Right()
  }
  
  def getByUrl(url: String, fields: List[String]): 
      Either[FailResult,Map[String,Any]] = {
    Right(Map("url" -> url, "content" -> "test"))
  }
  
  def close(): Unit = {}
}

class MockParser extends BaseParser { 
  def parse(url: String, content: String): 
      Either[FailResult,Pair[String,Map[String,Any]]] = {
    MockCounters.parsed.incrementAndGet()
    Right("test", Map("url" -> url, "content" -> "test"))
  }
}

class MockOutlinkFinder extends BaseOutlinkFinder { 
  def findOutlinks(url: String): 
      Either[FailResult,List[(String,Int,Map[String,Any])]] = {
    MockCounters.outlinkCalled.incrementAndGet()
    Right(List.empty)
  }
}

class MockSolrPublisher extends BaseSolrPublisher { 
  def publish(url: String, row: Map[String,Any]):
    Either[FailResult,Unit] = {
    MockCounters.indexed.incrementAndGet()
    Right()
  }
  
  def commit(): Unit = {}
  
  def close(): Unit = {}
}

////////////////////// end Mock DAOs //////////////////////

And here is the ScalaTest code to test the message flow:

// Source: src/test/scala/com/mycompany/delsym/actors/ActorFlowTest.scala
package com.mycompany.delsym.actors

import akka.testkit.TestKit
import akka.actor.ActorSystem
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll
import akka.testkit.ImplicitSender
import akka.actor.Props
import com.mycompany.delsym.daos.MockCounters

class ActorFlowTest(sys: ActorSystem) 
    extends TestKit(sys) 
    with FunSuite
    with BeforeAndAfterAll 
    with ImplicitSender {
  
  def this() = this(ActorSystem("DelsymTest"))
  
  test("test message flow across actors") {
    val controller = 
      system.actorOf(Props[Controller], "controller")
    val numMessages = 10
    (0 until numMessages).foreach(i => {
      controller ! Fetch(i.toString, 0, Map())
    })
    controller ! Stop(0)
    system.awaitTermination
    Console.println("Counters=" + List(
      MockCounters.fetched.longValue(),
      MockCounters.parsed.longValue(),
      MockCounters.indexed.longValue(),
      MockCounters.dbFetched.longValue(),
      MockCounters.dbParsed.longValue(),
      MockCounters.dbIndexed.longValue(),
      MockCounters.outlinkCalled.longValue()))
    assert(MockCounters.fetched.longValue() == numMessages)
    assert(MockCounters.parsed.longValue() == numMessages)
    assert(MockCounters.indexed.longValue() == numMessages)
    assert(MockCounters.dbFetched.longValue() == numMessages)
    assert(MockCounters.dbParsed.longValue() == numMessages)
    assert(MockCounters.dbIndexed.longValue() == numMessages)
    assert(MockCounters.outlinkCalled.longValue() == numMessages)
  }
}

One thing to note is that this test needs the delsym.testuser parameter in application.conf to be set to true (while all the other DAO unit tests require it to be set to false). I haven't found a good way to handle this, but I figure I should get the rest of the application working first and worry about this later - one possible workaround is sketched below. My next step is to build a REST interface on top of this actor pipeline so clients can send it requests and run limited queries against it (using the Stats message).
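
Since Typesafe Config lets JVM system properties override values in application.conf, one possible workaround (a sketch, not something I have wired into the build) is to flip the flag per test run from the sbt command line:

# run the actor flow test against the mock DAOs
sbt -Ddelsym.testuser=true "test-only com.mycompany.delsym.actors.ActorFlowTest"

# run the DAO tests against the real implementations
sbt -Ddelsym.testuser=false "test-only com.mycompany.delsym.daos.*"

This works as long as sbt runs the tests in the same JVM (fork is false, the default).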