I've been trying to write a parallelized version of Tim Bray's
Widefinder problem in Scala where the parallelization is "hidden" in some library-like code, so that the programmer can write something that looks as clean...nay...cleaner than Tim's Ruby code.
I'm also trying to figure out what the heck a monad is, and thinking about how I'll have to use ugly, imperative Java IO classes, so I decided to write some classes to make a file look like a list of strings, with each element representing one line. One critical aspect of this list is that it is lazily generated, so you don't have to wait for the entire file to be loaded to start working with it. Another critical trait is that there are no "back references" from subsequent lines, so that if you no longer hold a reference to previous nodes in the list, those nodes can be garbage collected. The interface supports all the methods required for "for" comprehensions - map, foreach, filter, and flatMap. I think it qualifies as a monad, or at least is something close.
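To make that concrete, here is a rough sketch of what such a class could look like. The names Line, EndOfFile, and open match the ones used later in this post, but everything else here is my own guess at an implementation, not the project's actual code (only foreach is shown; map, filter, and flatMap are omitted for brevity):

```scala
import java.io.{BufferedReader, FileReader}

// Sketch only: each node holds one line and computes its successor
// lazily from a shared BufferedReader. Nothing points back at earlier
// nodes, so lines you have moved past can be garbage collected.
class Line(val value: String, reader: BufferedReader) {
  lazy val next: Line = Line.read(reader)

  final def foreach(f: String => Unit): Unit = {
    // Local self-recursion, which scalac can turn into a loop.
    def fe(line: Line): Unit =
      if (line ne Line.EndOfFile) { f(line.value); fe(line.next) }
    fe(this)
  }
}

object Line {
  // Sentinel marking the end of the file.
  val EndOfFile = new Line(null, null)

  def open(path: String): Line =
    read(new BufferedReader(new FileReader(path)))

  private def read(reader: BufferedReader): Line = {
    val s = reader.readLine()
    if (s == null) { reader.close(); EndOfFile } else new Line(s, reader)
  }
}
```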
I ended up with something where usage looked kind of like this:
def main(args : Array[String]) : Unit = {
  val pat = Pattern.compile("GET /ongoing/When/")
  var cnt = 0
  for (file <- args; line <- Line.open(file) if pat.matcher(line).find) cnt = cnt + 1
}
Note that I'm just counting lines there, not performing the full Widefinder functionality. But still, that's pretty concise and easy on the eyes. The "for" comprehension is translated into calls to foreach. It works great for a lot of files, but it blows up with an OutOfMemoryError on large files. So why is that? I'll give you a hint, fully expanded it would look kind of like this:
args.foreach((file) => Line.open(file).filter((line) => pat.matcher(line).find).foreach((line) => cnt = cnt + 1))
Can you see the problem? No? Try this:
args.foreach((file) => {
  val firstLine = Line.open(file)
  firstLine.filter((line) => pat.matcher(line).find).foreach((line) => cnt = cnt + 1)
})
Can you see it now? The problem is that there's a reference to the first line hanging around. Even if you don't declare the variable it's still there, lurking as the implicit "this" parameter for filter. That reference makes it so the first line is not collectable, and as a result none of the lines are collectable because they are all reachable from the first line. So the whole file is loaded into memory, resulting in an OutOfMemoryError. That seems pretty dangerous to me.
So how can we solve this problem? Well, we have to make it so that references to lines disappear as we get done with them. There are a couple of ways to do it. The obvious imperative way is to use a var and a while loop, but then you might as well use BufferedReader directly or use an iterator. The functional way is to use tail recursion, so you write:
def scanLines(line: Line, pat: Pattern, cnt: Int): Int = {
  if (line == EndOfFile) cnt
  else {
    val v = if (pat.matcher(line.value).find) 1 else 0
    scanLines(line.next, pat, cnt + v)
  }
}
...and call it for each file like this:
for (file <- args) cnt = cnt + scanLines(Line.open(file), pat, 0)
Notice that there is no val declaration holding the first line. This is critical, because if there is, it will run out of memory.
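It's worth noting that scalac does rewrite direct self-recursion like scanLines into a loop, and in later versions of Scala (2.8 and up) you can ask the compiler to verify this with the @tailrec annotation, so that a future edit can't silently reintroduce the memory problem. A sketch (with a stand-in Line class so the example compiles on its own; the real class reads its tail lazily from a file):

```scala
import java.util.regex.Pattern
import scala.annotation.tailrec

// Stand-in for the real lazy Line class, just for this example.
class Line(val value: String, val next: Line)
val EndOfFile: Line = new Line(null, null)

// Same scanLines as above, but @tailrec makes compilation fail if the
// recursive call ever stops being in tail position.
@tailrec
def scanLines(line: Line, pat: Pattern, cnt: Int): Int =
  if (line == EndOfFile) cnt
  else {
    val v = if (pat.matcher(line.value).find) 1 else 0
    scanLines(line.next, pat, cnt + v)
  }
```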
So what do we do about it? Well, it would be easy enough to refactor methods like foreach that trigger immediate file traversal out of the normal interface and into a tail-recursive method on the companion object. Unfortunately that would break usage in for comprehensions, be inconsistent with other collection-like objects, and in general feel like poor OO. Another way to fix it would be to fix Scala so it supported full tail call optimization. Of course, that would also require adding full tail call support to the JVM. That way the unneeded "this" reference could silently disappear from the stack. This would also allow many methods to be expressed in a much cleaner way. For example:
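For the record, the rejected refactoring would look something like this (a sketch under my own naming assumptions, again with a stand-in Line class so the example is self-contained):

```scala
// Stand-in for the real lazy Line class, just for this example.
class Line(val value: String, val next: Line)

object Line {
  val EndOfFile: Line = new Line(null, null)

  // Traversal hoisted into a tail-recursive method on the companion
  // object. No implicit `this` keeps the head line reachable, so the
  // GC problem goes away -- but foreach no longer lives on Line itself,
  // so "for" comprehensions over lines can't desugar onto it.
  def foreach(line: Line, f: String => Unit): Unit =
    if (line != EndOfFile) { f(line.value); foreach(line.next, f) }
}
```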
final def foreach(f: (String) => Unit): Unit = {
  def fe(line: Line): Unit = {
    if (line != EndOfFile) {
      f(line.value)
      fe(line.next)
    }
  }
  fe(this)
}
Could be simplified down to:
final def foreach(f: (String) => Unit): Unit = {
  f(value)
  next.foreach(f)
}
For those of you who want to take a look at the code I have so far, here it is as an Eclipse project. I've tried to comment it a bit, but it's still a work-in-progress. There is a working (at least I think it works) mapreduce function hiding in there that allows lines to be processed in parallel using actors. Unfortunately it is slower (though not substantially slower) than just processing the lines serially. But then I'm running it on an old uniprocessor PC, so maybe with more cores it would do better. If I get some free time at work I'll try it out on a multicore machine and see what happens, but I suspect that unless (or until) I hack together something that uses nio in an optimal way the task will remain IO bound...and even then it may remain IO bound.