Precise data types

by Eric Torreborre (@etorreborre)

Yaron Minsky, an OCaml developer from Jane Street Capital, popularised the sentence “Make illegal states unrepresentable”. I always assumed that I understood it, but I don’t think I really got what he meant. Until recently.

As constrained data

What does Yaron mean? One way I interpreted his quote was “some data has constraints and we must not allow illegal values”.

For example, if I want to represent a line of characters coming from a file, I can create a type, FileLine, and make sure that I only create FileLine instances from strings containing no newline-like characters (\n, \r…). This has some obvious benefits. For example, I can join FileLines with "\n", then split them on "\n" characters, and I will get back the initial FileLines:

case class FileLine private(line: String) {
  def append(other: FileLine): FileLine =
    FileLine(line + other.line)
}

object FileLine {
  /** from http://www.wikiwand.com/en/Newline */
  val newlineCharacters = List('\u000a', '\u000b', '\u000c', '\u000d', '\u0085', '\u2028', '\u2029')

  /** create a FileLine only if a string doesn't contain newline characters */
  def fromString(s: String): Option[FileLine] =
    if (s.exists(newlineCharacters.contains)) None
    else Some(FileLine(s))
}

// I can use ScalaCheck to verify my invariants
import org.scalacheck.Prop

// joining lines with "\n" then splitting them back must return the original lines
Prop.forAll { fileLines: List[FileLine] =>
  fileLines.map(_.line).mkString("\n").split("\n").toList.map(FileLine.fromString).flatten === fileLines
}

Prop.forAll { (fileLine1: FileLine, fileLine2: FileLine) =>
  // the append operation returns a proper FileLine
  FileLine.fromString((fileLine1 append fileLine2).line).isDefined
}
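
For these properties to run, an Arbitrary[FileLine] instance has to be in scope. One possible definition (a sketch, this generator is my own assumption) is to strip the newline characters from arbitrary strings so that fromString always succeeds:

import org.scalacheck.Arbitrary

// remove newline characters from arbitrary strings so that fromString always succeeds
implicit def ArbitraryFileLine: Arbitrary[FileLine] =
  Arbitrary(Arbitrary.arbitrary[String].map { s =>
    val cleaned = s.filterNot(FileLine.newlineCharacters.contains)
    FileLine.fromString(cleaned).getOrElse(sys.error("unexpected newline character"))
  })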

In this case, instead of having a naked string, which is potentially in an undesirable state, I am using a FileLine which is guaranteed to be ok.

As data structure

My other interpretation was around “Data structures”. A NonEmptyList is a good example of that. A NonEmptyList is a List which is guaranteed to have at least one element. Look at the API: there is no way to build a NonEmptyList from an empty list. There is also no def filter(A => Boolean) method which could leave the NonEmptyList empty.
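
To make this concrete, here is a minimal sketch of such a type (the real NonEmptyList has a richer API):

final case class NonEmptyList[A](head: A, tail: List[A]) {
  def toList: List[A] = head :: tail

  // map preserves non-emptiness, so it can return a NonEmptyList
  def map[B](f: A => B): NonEmptyList[B] = NonEmptyList(f(head), tail.map(f))

  // a filter may remove every element, so it can only return a plain List
  def filterToList(p: A => Boolean): List[A] = toList.filter(p)
}

object NonEmptyList {
  // the only way to build a NonEmptyList from a List is to prove it is non-empty
  def fromList[A](as: List[A]): Option[NonEmptyList[A]] = as match {
    case Nil    => None
    case h :: t => Some(NonEmptyList(h, t))
  }
}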

Other data structures like Red-Black trees provide similar guarantees. Red-Black trees are not arbitrary trees or, to make a reference to Yaron’s quote, they are not in an arbitrary state.

As CS construct

Finally I was looking at carefully crafted datatypes like Parser from the Pirate command-line library. Parser[A] models the different “things” we want to be able to parse on a command line:

  • a “switch” (-v for example)
  • a “flag” (--message "first commit" for example)
  • an “argument” (20150131 for example)
  • a full “subcommand”

Parser values represent exactly what we want to parse, nothing more, nothing less.
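
A rough sketch of what such a datatype could look like (this is only an illustration, not the actual Pirate Parser):

sealed trait Parser[A]

// a "switch": -v
case class Switch(short: Char) extends Parser[Boolean]

// a "flag": --message "first commit"
case class Flag[A](name: String, read: String => A) extends Parser[A]

// an "argument": 20150131
case class Argument[A](read: String => A) extends Parser[A]

// a full "subcommand" with its own parser
case class Subcommand[A](name: String, parser: Parser[A]) extends Parser[A]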

But parsers seem like an over-studied Computer Science topic where you would expect nice data types to come up, having Applicative instances and all.

Out of those 3 interpretations of Yaron’s quote, it seemed to me that only the first one was really of practical use in my day job, because I don’t get to define new data structures every day or implement yet-another-parsing-library (YAPL (tm)).

It turns out that I hadn’t really seen the light yet!

My domain

At Ambiata, we move data around and run all sorts of computations to make predictions, produce scores, provide insights… For a given customer we run many different applications using the AWS platform. Those applications take their input data from S3 and are executed either on a dedicated EC2 machine or on a Hadoop cluster to use Map-Reduce. Then the results are persisted back to S3.

When we run a Map-Reduce application we must first transfer data from S3 to the Hadoop Distributed File System (HDFS). Given the volume of data we manage, this step is generally itself a Map-Reduce application (distcopy), using several machines to copy data more efficiently.

However, applications are not always running on a cluster. Sometimes we want to run the same application “locally”, for testing, using Hadoop’s “pseudo-distributed” mode. In that case we run the application on a single server and take input data from the same server.

Finally, some applications don’t require a cluster: they just run locally but they need to take their data from S3. In that case, there needs to be a first step to download the data (with aws s3 cp) and then a step to persist the results back to S3.

This doesn’t seem like much, but all this transfer of data back and forth between different file systems is a recipe for breakdown! It is very easy to get the paths wrong when writing application code or scripts doing the file synchronization. What can we do about that?

Notion

The notion project provides a notion (see the README :-)) of what a “location” is. Locations are very different depending on the file system they refer to. When we have a HdfsLocation it is possible to create a Hadoop Path from it and pass it to a Map-Reduce job. When we have a S3Location, it has a bucket and a key. Finally when we have a LocalLocation we can create a java.io.InputStream out of it.
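
Schematically, those locations could look like this (a simplified sketch to keep the following examples self-contained, the actual notion datatypes are richer):

sealed trait Location

// a path on the local file system, from which we can open a java.io.InputStream
case class LocalLocation(path: String) extends Location

// a path on Hdfs, from which we can build a Hadoop Path
case class HdfsLocation(path: String) extends Location

// an object on S3, identified by a bucket and a key
case class S3Location(bucket: String, key: String) extends Location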

Are we there yet? Not really. Ideally we want to pass any kind of Location to our application and have:

  • files being downloaded to the place they are going to be used: locally or on the cluster
  • no files being copied unless it is necessary
  • result files being uploaded to the place they are going to be persisted (on S3, but maybe just back to a “local” machine)

This means that we need to deal with lots of possible combinations of <storage>, <sync> and <execution>:

                    <storage>
              +-------> S3 <-------+
             /                      \
            /                        \
<sync>     /                          \  <sync>
          /                            \
         v                              v
       Local <-----------------------> Hdfs
    <storage>         <sync>          <storage>
    <execution>                      <execution>

In practice, this means that a typical “Map-Reduce” application will receive:

  • some Locations for its input files
  • some Locations for where the outputs should be stored
  • an optional “sync directory” which is a location where we can temporarily store the copy of input files and create output files
  • a Hadoop Configuration indicating if the application executes locally or on the cluster
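
Put together, the inputs of such an application could be pictured like this (a simplification), where everything is still an unconstrained Location:

import org.apache.hadoop.conf.Configuration

// nothing prevents an invalid combination of locations at this point
case class MapReduceInputs(
  inputs:        List[Location],
  outputs:       List[Location],
  syncDir:       Option[Location],
  configuration: Configuration
)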

There are many variations with the above values. For example:

  • the “sync directory” is on Hdfs
  • the input locations are on S3, so the corresponding files need to be copied in the sync directory
  • the output locations are local, meaning that we don’t want to persist the results
  • the Configuration is set up for a Map-Reduce application on the cluster

There are also some situations which don’t make sense. For example:

  • the “sync directory” cannot be on S3
  • the input / output locations which are passed to a Map-Reduce job on the cluster cannot be LocalLocations; they have to be HdfsLocations
  • if the application runs on the cluster and all the application locations are HdfsLocations, then it doesn’t make sense to have a sync directory

The execution location

How can we make sure we properly deal with all those possibilities? Exactly like in the FileLine example: we check the rules we want on unrestricted data, then we build values for a datatype which “can’t go wrong”.

For example, we know that the sync directory location has to be on a place where we can execute code (i.e. not on S3). Let’s create a data type for that:

sealed trait ExecutionLocation

What are the 2 possible cases? LocalExecution and HdfsExecution:

case class LocalExecution(location: LocalLocation) extends ExecutionLocation
case class HdfsExecution(location: HdfsLocation) extends ExecutionLocation

Each case class encapsulates a location with a precise location type. Now, every time we pass a location around we know we can use it for execution. For example we can create a Hadoop Path out of it:

sealed trait ExecutionLocation {
  def path: Path
}
case class LocalExecution(location: LocalLocation) extends ExecutionLocation {
  def path: Path =
    new Path(location.path)
}

case class HdfsExecution(location: HdfsLocation) extends ExecutionLocation {
  def path: Path =
    new Path(location.path)
}

We can build a Path object because both LocalLocation and HdfsLocation define a path method representing the path as a String on their file system and Path knows how to interpret it.

Now, how do we create an ExecutionLocation for the sync directory?

The user can pass a Location for the sync directory on the command line. We represent that as Option[Location] to indicate that this argument can be omitted. What do we need to check in order to transform this value into a valid Option[ExecutionLocation], representing an executable location? We can create the following method, getSyncDir:

def getSyncDir(syncDir:       Option[Location],
               locations:     List[Location],
               configuration: Configuration): String \/ Option[ExecutionLocation]

getSyncDir checks all the relevant rules:

  • is it ok to have an undefined sync directory? Yes, if all the application files are located in the same place as the application execution
  • is it ok to have a sync directory on S3? No
  • is it ok to have a local sync directory when the configuration is a cluster one? No

The verification of each rule gives rise to a String \/ Option[ExecutionLocation] value indicating whether we have a valid sync directory or not:

syncDirectory match {
  // a sync directory on S3 is never valid
  case Some(S3Location(_,_)) =>
    "A sync directory cannot be defined on S3".left

  // everything is local: no sync directory is needed
  case None if isLocal(configuration) && locations.forall(isLocalLocation) =>
    None.right

  // a sync directory on Hdfs is needed because some locations are not on Hdfs
  case Some(l @ HdfsLocation(_)) if !locations.forall(isHdfsLocation) =>
    Some(HdfsExecution(l)).right

  // more cases
  ...
}
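
The helper predicates used above could be as simple as this (hypothetical implementations, the real checks may differ):

// hypothetical helper: the real code may detect local execution differently
def isLocal(configuration: Configuration): Boolean =
  configuration.get("mapreduce.framework.name", "local") == "local"

def isLocalLocation(location: Location): Boolean = location match {
  case LocalLocation(_) => true
  case _                => false
}

def isHdfsLocation(location: Location): Boolean = location match {
  case HdfsLocation(_) => true
  case _               => false
}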

In itself the ExecutionLocation we get for the sync directory is not enough to run the application but this is going to help us build the exact file paths that the application will need for its execution.

Input paths

The next issue looks a lot more complex. We have a bunch of files, located either on S3, locally, or on Hdfs, and depending on the place of execution we need to know:

  • if they need to be downloaded
  • where they should be downloaded

Let’s apply the previous approach and build a data type for that, SynchronizedLocation.

sealed trait SynchronizedLocation

Then we can enumerate all the different possibilities:

// the input file and the execution are local, no need to copy the file
case class LocalNoSync(location: LocalLocation) extends SynchronizedLocation

// the input file and the execution are on Hdfs, no need to copy the file
case class HdfsNoSync(location: HdfsLocation) extends SynchronizedLocation

// the input file is local, the execution is on Hdfs, so we need to do some synchronization
case class LocalHdfsSync(local: LocalLocation, hdfs: HdfsLocation) extends SynchronizedLocation

// the input file is on S3, the execution is local, so we need to do some synchronization
case class S3LocalSync(s3: S3Location, local: LocalLocation) extends SynchronizedLocation

// the input file is on S3, the execution is on Hdfs, so we need to do some synchronization
case class S3HdfsSync(s3: S3Location, hdfs: HdfsLocation) extends SynchronizedLocation

In some case classes above, when there is an additional location, like hdfs: HdfsLocation in S3HdfsSync, this location represents the place where the input file must be downloaded. Also for each case class we have an executionLocation method which we use to pass a valid input location to a Map-Reduce job. For example:

sealed trait SynchronizedLocation {
  def executionLocation: ExecutionLocation
}
// the original file comes from S3 but it will be copied to Hdfs
case class S3HdfsSync(s3: S3Location, hdfs: HdfsLocation) extends SynchronizedLocation {
  def executionLocation: ExecutionLocation =
    HdfsExecution(hdfs)
}
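
The remaining cases follow the same pattern, for example (a sketch, only the S3HdfsSync case is shown above):

// the file is already where the execution happens: it is used as-is
case class LocalNoSync(location: LocalLocation) extends SynchronizedLocation {
  def executionLocation: ExecutionLocation =
    LocalExecution(location)
}

// the original file comes from S3 but it will be copied to the local file system
case class S3LocalSync(s3: S3Location, local: LocalLocation) extends SynchronizedLocation {
  def executionLocation: ExecutionLocation =
    LocalExecution(local)
}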

Now, same question as before: how do we build values for that type? We create a createSyncLocation method, using the sync directory ExecutionLocation that we previously validated:

def createSyncLocation(syncDir: Option[ExecutionLocation], location: Location):
  String \/ SynchronizedLocation

Then we go through all the possible combinations of Option[ExecutionLocation] and Location (9 combinations). Here are two of them:

(syncDir, location) match {
  // sync directory on Hdfs, input file on S3
  // we return a S3HdfsSync and build a download location relative to the sync directory
  case (Some(sd @ HdfsExecution(_)), s @ S3Location(_, k)) =>
    S3HdfsSync(s, (sd </> FilePath.unsafe(k)).location).right

  // no sync directory defined, input file on S3
  // this is an invalid situation
  case (None, s @ S3Location(_, _)) =>
    s"A synchronized location can not be on S3 when no sync directory is defined. Got ${s.render}".left
}

If we do that for each input file location we are left with exactly what we want:

  • SynchronizedLocations which can be passed to a synchronize method that decides what to do (download or not download) for each case (see the sketch after this list)
  • ExecutionLocations (from the SynchronizedLocation.executionLocation method) which can be safely passed to a Map-Reduce job
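
A minimal sketch of such a synchronize method, reusing the location fields from the earlier sketch and only describing what would happen (the real implementation performs the copies with distcopy or aws s3 cp):

// describe the copy (if any) required for each case: the decision logic is
// just a total pattern match over SynchronizedLocation
def synchronize(location: SynchronizedLocation): String = location match {
  case LocalNoSync(l)             => s"nothing to copy, use ${l.path} directly"
  case HdfsNoSync(h)              => s"nothing to copy, use ${h.path} directly"
  case LocalHdfsSync(local, hdfs) => s"copy ${local.path} to hdfs ${hdfs.path}"
  case S3LocalSync(s3, local)     => s"download s3://${s3.bucket}/${s3.key} to ${local.path}"
  case S3HdfsSync(s3, hdfs)       => s"distcopy s3://${s3.bucket}/${s3.key} to hdfs ${hdfs.path}"
}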

Conclusion

The work is not completely done here because we need to do the same with output locations and decide when files need to be uploaded to a local machine or to S3. But it turns out that the exact same datatype, SynchronizedLocation, can be used for that. This was not completely obvious to me at first, and I went through several steps of more intermediate and more complex datatypes before refactoring and simplifying:

  • I had different datatypes for inputs / outputs
  • the LocalHdfsSync case class was embedding a HdfsExecutionLocation instead of simply using a HdfsLocation
  • I had an additional field to model whether a location was supposed to be a file or a directory, because I thought I needed this information to do the synchronization

This process of expanding and collapsing code took longer than I thought, but I suppose that this is the sane way to go in the face of complexity. The most direct datatype is not always obvious and needs to go through some refinements. One big rule of thumb I take away from this experience is the following:

When some piece of logic is showing up as complex conditionals or pattern matching across the code base, there is a datatype waiting to be written.

Notes

Finally a blog!

by Tim McGilchrist (@lambda_foo)

So Ambiata finally has a blog! Ambiata is a big commercial user of functional programming, in particular Haskell and Scala, and we’d like to think we’ve learnt some tricks along the way. We plan to use this space to share the things we’ve learnt and the things we’re excited about. Hopefully over time we can contribute to the resources on commercial uses of functional programming.

We’ll get our first real post out soon.