Wednesday, December 05, 2007

Adventures in Widefinding: Complexity

For the moment I'm going to ignore the complexity of my widefinder "under the hood" and just focus on its complexity at the top level. The following is Tim Bray's goal. The asterisk (red in the original post) indicates that the loop should be processed in parallel instead of in serial.

counts = {}
counts.default = 0

ARGF.each_line* do |line|
  if line =~ %r{GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) }
    counts[$1] += 1
  end
end

keys_by_count = counts.keys.sort { |a, b| counts[b] <=> counts[a] }
keys_by_count[0 .. 9].each do |key|
  puts "#{counts[key]}: #{key}"
end

So here's the first part of my Scala solution:

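   import java.util.regex.Pattern
   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.atomic.AtomicInteger
   val pat = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+ )")
   val map = new ConcurrentHashMap[String, AtomicInteger]()
   for (file <- args; line <- LineReader(file)) {
     val mat = pat.matcher(line)
     if (mat.find()) {
       val s = mat.group(1)          // assumed: the right-hand side is missing from the original
       map.putIfAbsent(s, new AtomicInteger(0))
       map.get(s).incrementAndGet()  // assumed: the increment is missing from the original
     }
   }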

Notice that this is not a map-reduce based solution. This is because I tried to make it look as similar to the original Ruby as possible. There are some minor differences because regular expressions are not special in Scala the way they are in Ruby. The biggest and, in my opinion, most intrusive difference is that a ConcurrentHashMap has to be used to store the results, AtomicIntegers must be used for the counts, and that weird "putIfAbsent" call needs to be used when pulling counts out of the map.
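
To make the putIfAbsent idiom concrete, here is a minimal, self-contained sketch. Plain Java threads stand in for the parallel for loop, and the names CountingSketch and countAll are made up for the illustration; it is not the widefinder code itself.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object CountingSketch {
  // Hypothetical helper: counts keys from several threads at once.
  def countAll(keys: Seq[String], threads: Int): ConcurrentHashMap[String, AtomicInteger] = {
    val indexed = keys.toIndexedSeq
    val map = new ConcurrentHashMap[String, AtomicInteger]()
    val workers = (0 until threads).map { t =>
      new Thread(new Runnable {
        def run(): Unit = {
          // Each thread takes every `threads`-th key, starting at offset t.
          for (i <- t until indexed.length by threads) {
            val s = indexed(i)
            // Insert a zero counter only if no other thread has done so already...
            map.putIfAbsent(s, new AtomicInteger(0))
            // ...then bump whichever counter ended up in the map.
            map.get(s).incrementAndGet()
          }
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    map
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq.fill(1000)("a") ++ Seq.fill(500)("b")
    val counts = countAll(keys, 4)
    println(s"a=${counts.get("a").get()}, b=${counts.get("b").get()}") // prints a=1000, b=500
  }
}
```

A plain HashMap with Int values would lose updates here, because "read the count, add one, write it back" is not atomic when several threads touch the same key.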

So while the concurrency itself is implicit, and the code is certainly succinct, the programmer still needs to be very much aware of the fact that the block inside the for loop will be executed in a concurrent fashion. This is certainly less elegant than a pure functional approach such as map-reduce, where no synchronization would be necessary, but I also think it is less intrusive to a programmer accustomed to imperative programming.
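
For comparison, a map-reduce style version might look roughly like the sketch below. It assumes the input has already been split into chunks of lines, and the names MapReduceSketch, countChunk, merge and countAll are invented for the example. Each chunk is counted into its own immutable map, so nothing is shared between threads and no ConcurrentHashMap or AtomicInteger is needed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapReduceSketch {
  // Same pattern as the Ruby version: the trailing space is outside the group.
  private val Pat =
    """GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) """.r

  // Map step: count the hits in one chunk using only local, immutable data.
  def countChunk(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => Pat.findFirstMatchIn(line).map(_.group(1)))
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }

  // Reduce step: merge two partial results by adding counts key by key.
  def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (acc, (key, n)) => acc.updated(key, acc.getOrElse(key, 0) + n) }

  // Chunks are counted in parallel; the partial maps are merged afterwards.
  def countAll(chunks: Seq[Seq[String]]): Map[String, Int] = {
    val partials = Future.sequence(chunks.map(chunk => Future(countChunk(chunk))))
    Await.result(partials, 1.minute).foldLeft(Map.empty[String, Int])(merge)
  }
}
```

The price is the shape of the program: the loop body no longer looks like the Ruby original, which is exactly the trade-off described above.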

Here's an ugly part. This is largely due to an impedance mismatch between Java collections and Scala collections. It would look a lot better if Scala had an equivalent to ConcurrentHashMap in its library. There are also probably lots of other ways to make it cleaner. I tried several different approaches and always ran into some problem.

    val array = new Array[(String, Int)](map.size)
    val iter = map.entrySet.iterator
    var i = 0
    while (iter.hasNext) {
      val e = iter.next()  // assumed: the right-hand side is missing from the original
      array(i) = (e.getKey, e.getValue.intValue)
      i += 1
    }

And here's the final part, once the ugliness is done:

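    quickSort(array)((e) => OrderedTuple(e))
    val limit = if (array.length < 10) array.length - 1 else 9
    for (i <- 0 to limit) {
      print(array(i)._2)    // assumed: count; this line is missing from the original
      print(": ")
      println(array(i)._1)  // assumed: key; this line is missing from the original
    }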

Notice that having fewer than ten results is cleanly handled. (TB's code is fine here too: Ruby truncates a range slice such as keys_by_count[0 .. 9] at the end of the array instead of complaining about invalid indices.)
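
With later Scala releases the copy-and-sort steps can be written more compactly. The sketch below assumes Scala 2.13 or newer, where scala.jdk.CollectionConverters wraps Java collections; the names TopTenSketch and top are made up for the example, and a plain sortBy replaces the quickSort/OrderedTuple pair used above.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._

object TopTenSketch {
  // Copy the Java map into Scala pairs, sort by descending count, keep at most n.
  def top(map: java.util.Map[String, AtomicInteger], n: Int = 10): Seq[(String, Int)] =
    map.asScala.toSeq
      .map { case (key, count) => (key, count.get()) }
      .sortBy { case (_, count) => -count }
      .take(n) // take returns fewer elements if the sequence is shorter than n

  def main(args: Array[String]): Unit = {
    val map = new ConcurrentHashMap[String, AtomicInteger]()
    map.put("2007/12/05/A", new AtomicInteger(3))
    map.put("2007/12/05/B", new AtomicInteger(7))
    for ((key, count) <- top(map)) println(s"$count: $key")
  }
}
```

Because take(n) simply stops at the end of the sequence, no separate limit calculation is needed for the case of fewer than ten results.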

Overall I would say I came pretty close to TB's goal without requiring a fundamental change in programming methodology. I didn't alleviate the need for the programmer to be aware of concurrency, but I don't think that's possible. Sure, people can learn to program in ways that are fundamentally parallelizable, and that probably is what universities should start teaching, but that's going to require a major shift on the part of millions of programmers, or for several generations to retire and be replaced by ones trained in functional programming. Lots of cores are here NOW, so it's not practical to wait until skills catch up. We need to work with what we have.


Anonymous said...

That's interesting, apparently you have parallelized the line reading and splitting, but the counting itself is linear, isn't it?

So this is the opposite of my attempt, where splitting is sequential and counting (+ matching) is done in parallel. We should try to combine this :-)

Erik Engbrecht said...

Actually the only serial part is reading data from disk into a direct buffer. The rest is all parallel. The basic Scala for loop is just syntactic sugar for a call to foreach.

Here's the main behind-the-scenes code:

Go to the foreach method in ParallelLineReader. It spawns off a coordinator actor, which then spawns one more worker than you have processors. Each worker reads a chunk of the file into a direct buffer and then passes the file channel on to the next worker. The workers form a circular list, so I/O is always done sequentially. Also, memory consumption is limited to the number of workers times the size of the buffers. So if the processing runs behind the input, it will stop reading data in until it catches up rather than eating up all your memory.
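
The "stop reading until processing catches up" behaviour can be illustrated with a much simpler mechanism than the actor ring. The sketch below swaps in a bounded java.util.concurrent.ArrayBlockingQueue and plain threads; it is not how ParallelLineReader works, and the names BackpressureSketch and process are invented for the example. Byte counting stands in for real line processing.

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicLong

object BackpressureSketch {
  // Empty array used as an end-of-input marker (compared by reference).
  private val Done = new Array[Byte](0)

  // Feeds chunks to `workers` threads through a queue holding at most `workers`
  // chunks, and returns the total number of bytes the workers processed.
  def process(chunks: Iterator[Array[Byte]], workers: Int): Long = {
    val queue = new ArrayBlockingQueue[Array[Byte]](workers)
    val total = new AtomicLong(0)
    val threads = (1 to workers).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          var chunk = queue.take()
          while (chunk ne Done) {
            total.addAndGet(chunk.length.toLong) // stand-in for real processing
            chunk = queue.take()
          }
        }
      })
    }
    threads.foreach(_.start())
    // put() blocks while the queue is full, so reading pauses until workers catch up.
    chunks.foreach(chunk => queue.put(chunk))
    // One marker per worker so that every thread sees the end of input.
    threads.foreach(_ => queue.put(Done))
    threads.foreach(_.join())
    total.get()
  }
}
```

In this sketch at most `workers` chunks wait in the queue and each worker holds one more, so memory stays bounded by a small multiple of the number of workers times the chunk size.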

The hard part is the line boundaries, because the beginning and the end of any given read are most likely in the middle of a line, not at an end.
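
A sequential sketch of the stitching idea, under simplifying assumptions: chunks are Strings rather than direct byte buffers, lines end in "\n", and the names ChunkLinesSketch and lines are made up for the example. The unterminated tail of each chunk is carried over and glued onto the front of the next one. The parallel version has to do the equivalent hand-off between workers, which is not shown here.

```scala
object ChunkLinesSketch {
  // Splits consecutive chunks of one text into complete lines. The unterminated
  // tail of each chunk is carried over and prepended to the next chunk.
  def lines(chunks: Seq[String]): Seq[String] = {
    val out = Seq.newBuilder[String]
    var carry = ""
    for (chunk <- chunks) {
      val text = carry + chunk
      val lastNewline = text.lastIndexOf('\n')
      if (lastNewline < 0) {
        carry = text // no line end yet: keep accumulating
      } else {
        // Everything before the last newline consists of whole lines.
        out ++= text.substring(0, lastNewline).split("\n", -1).toSeq
        carry = text.substring(lastNewline + 1)
      }
    }
    if (carry.nonEmpty) out += carry // final line without a trailing newline
    out.result()
  }
}
```

For example, the chunks "ab\ncd", "ef\ng" and "h\n" yield the lines "ab", "cdef" and "gh".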