I've been trying to write a parallelized version of Tim Bray's Widefinder problem in Scala, where the parallelization is "hidden" in some library-like code so that the programmer can write something that looks as clean...nay...cleaner than Tim's Ruby code. I'm also trying to figure out what the heck a monad is, and thinking about how I'll have to use ugly, imperative Java IO classes, so I decided to write some classes that make a file look like a list of strings, with each element representing one line. One critical aspect of this list is that it is lazily generated, so you don't have to wait for the entire file to be loaded before you start working with it. Another critical trait is that later lines hold no "back references" to earlier ones, so once you no longer hold a reference to previous nodes in the list, those nodes can be garbage collected. The interface supports all the methods required for "for" comprehensions: map, foreach, filter, and flatMap. I think it qualifies as a monad, or at least is something close. I ended up with something whose usage looked kind of like this:
import java.util.regex.Pattern
def main(args : Array[String]) : Unit = {
  val pat = Pattern.compile("GET /ongoing/When/")
  var cnt = 0
  for (file <- args; line <- Line.open(file) if pat.matcher(line).find) cnt = cnt + 1
}
Note that I'm just counting lines there, not performing the full Widefinder functionality. But still, that's pretty concise and easy on the eyes. The "for" comprehension is translated into calls to filter and foreach. It works great for a lot of files, but it blows up with an OutOfMemoryError on large files. So why is that? I'll give you a hint: fully expanded, it would look kind of like this:
args.foreach((file) =>
  Line.open(file).filter((line) => pat.matcher(line).find).foreach((line) => cnt = cnt + 1))
Can you see the problem? No? Try this:
args.foreach((file) => {
  val firstLine = Line.open(file)
  firstLine.filter((line) => pat.matcher(line).find).foreach((line) => cnt = cnt + 1)
})
Can you see it now? The problem is that there's a reference to the first line hanging around. Even if you don't declare the variable, it's still there, lurking as the implicit "this" parameter for filter. That reference keeps the first line from being collected, and as a result none of the lines are collectable, because they are all reachable from the first line. So the whole file gets loaded into memory, resulting in an OutOfMemoryError. That seems pretty dangerous to me.
So how can we solve this problem? Well, we have to make sure references to lines disappear as we finish with them. There are a couple of ways to do it. The obvious imperative way is to use a var and a while loop, but then you might as well use BufferedReader directly or use an iterator. The functional way is to use tail recursion, so you write:
def scanLines(line: Line, pat: Pattern, cnt: Int): Int = {
  if (line == EndOfFile) cnt
  else {
    val v = if (pat.matcher(line.value).find) 1 else 0
    scanLines(line.next, pat, cnt + v)
  }
}
...and call it for each file like this:
for (file <- args) cnt = cnt + scanLines(Line.open(file), pat, 0)
Notice that there is no val declaration holding the first line. This is critical, because if there were one, the program would run out of memory. So what do we do about it? Well, it would be easy enough to move methods like foreach that trigger immediate file traversal out of the normal interface and into tail-recursive methods on the companion object. Unfortunately, that would break usage in for comprehensions, be inconsistent with other collection-like objects, and in general feel like poor OO. Another way to fix it would be to change Scala so it supported full tail call optimization, so that the unneeded "this" reference could silently disappear from the stack. Of course, that would also require adding full tail call support to the JVM. Full tail calls would also allow many methods to be expressed in a much cleaner way. For example:
final def foreach(f: (String) => Unit): Unit = {
  def fe(line: Line): Unit = {
    if (line != EndOfFile) {
      f(line.value)
      fe(line.next)
    }
  }
  fe(this)
}
Could be simplified down to:
final def foreach(f: (String) => Unit): Unit = {
  if (this != EndOfFile) {
    f(value)
    next.foreach(f)
  }
}
For those of you who want to take a look at the code I have so far, here it is as an Eclipse project. I've tried to comment it a bit, but it's still a work in progress. There is a working (at least I think it works) mapreduce function hiding in there that allows lines to be processed in parallel using actors. Unfortunately, it is slower (though not substantially slower) than just processing the lines serially. But then I'm running it on an old uniprocessor PC, so maybe with more cores it would do better. If I get some free time at work, I'll try it out on a multicore machine and see what happens, but I suspect that unless (or until) I hack together something that uses nio in an optimal way, the task will remain IO bound...and even then it may remain IO bound.
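Here is a small, self-contained sketch of the two recursion styles discussed above. Line, LineNode, EndOfFile and Lines are hypothetical stand-ins, not the code in the Eclipse project. As far as I know, scalac only rewrites direct self-recursive tail calls into loops, and the @tailrec annotation turns a failed rewrite into a compile error, so both the local fe helper and the function-style scanLines run in constant stack. Note that foreach still holds this in its own frame, so it avoids a StackOverflowError but not the memory retention described above; scanLines, which receives the first line as an ordinary parameter, is the variant that should let earlier lines become collectable.

```scala
import java.io.BufferedReader
import java.util.regex.Pattern
import scala.annotation.tailrec

// Hypothetical stand-in for the post's Line class (not the Eclipse project's code).
sealed abstract class Line {
  // Stack-safe foreach in the style of the post's `fe` helper.
  final def foreach(f: String => Unit): Unit = {
    @tailrec def fe(line: Line): Unit = line match {
      case EndOfFile => ()
      case n: LineNode =>
        f(n.value)
        fe(n.next)
    }
    fe(this) // `this` stays reachable from this frame until fe returns
  }
}

case object EndOfFile extends Line

// One line of input; the following line is read from `in` only when `next` is first used.
final class LineNode(val value: String, in: BufferedReader) extends Line {
  lazy val next: Line = Lines.read(in)
}

object Lines {
  // Reads one line and wraps it; readLine() returns null at end of input.
  def read(in: BufferedReader): Line = {
    val s = in.readLine()
    if (s == null) EndOfFile else new LineNode(s, in)
  }

  // The post's scanLines as a plain function: the first line is an ordinary
  // parameter that the compiled loop overwrites, not a lingering `this`.
  @tailrec
  def scanLines(line: Line, pat: Pattern, cnt: Int): Int = line match {
    case EndOfFile => cnt
    case n: LineNode =>
      val v = if (pat.matcher(n.value).find()) 1 else 0
      scanLines(n.next, pat, cnt + v)
  }
}
```

Calling Lines.scanLines(Lines.read(reader), pat, 0) directly, without a val for the first line, mirrors the for loop over args shown earlier.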
11 comments:
Would a more expensive line-match function make it more obvious whether you are working in parallel?
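One way to explore that question is sketched below. It is not the post's actor-based mapreduce but a swapped-in alternative using scala.concurrent.Future. The per-line test is a parameter, so a deliberately expensive matcher can be plugged in to see whether extra cores help. ParallelCount is a hypothetical name, and the batch size of 1000 and the global execution context are arbitrary assumptions.

```scala
import java.io.BufferedReader
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object ParallelCount {
  // Reads lines on the calling thread and counts matches batch by batch on a thread pool.
  // Batches are not throttled, so if matching is slower than reading they pile up in memory.
  def count(in: BufferedReader, matches: String => Boolean, batchSize: Int = 1000): Int = {
    val pending = ArrayBuffer.empty[Future[Int]]
    var batch = Vector.newBuilder[String]
    var n = 0

    // Hands the current batch to a worker as an immutable Vector and starts a new one.
    def submit(): Unit = {
      val lines = batch.result()
      pending += Future(lines.count(matches))
      batch = Vector.newBuilder[String]
      n = 0
    }

    var line = in.readLine()
    while (line != null) {
      batch += line
      n += 1
      if (n == batchSize) submit()
      line = in.readLine()
    }
    if (n > 0) submit()

    // Wait for every batch and add up the per-batch counts.
    Await.result(Future.sequence(pending.toList), Duration.Inf).sum
  }
}
```

Reading still happens on a single thread, so if the job really is IO bound, a more expensive matcher is what would give the extra cores something to do.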
Don't you have an error in second portion of code? You define val firstLine and then do firstList.filter
re: qrilka
No, that's intentional. It looks kind of stupid here because the expression is simple, but sometimes I have big chains of operations and breaking them apart into vals makes them clearer. Other times it makes debugging easier because you can see something at a more specific point in time.
erik: I think qrilka was talking about firstLine vs firstList, not your use of val.
Have you looked at Scala Streams? They will give you a List-esque experience for your file reading.
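A sketch of that suggestion for current Scala, where Stream has been deprecated in favor of LazyList (Scala 2.13 and later); StreamLines and linesOf are hypothetical names. LazyList memoizes every element it has forced, so the rule from the post still applies: don't keep a val pointing at the head while traversing a large file.

```scala
import java.io.BufferedReader

object StreamLines {
  // A lazily read, memoized list of lines; each readLine() happens only when that element is forced.
  // readLine() returns null at end of input, which ends the list.
  def linesOf(in: BufferedReader): LazyList[String] =
    LazyList.continually(in.readLine()).takeWhile(_ != null)
}
```

Because LazyList supports map, flatMap, withFilter and foreach, it works in a for comprehension such as for (l <- StreamLines.linesOf(in) if l.startsWith("GET")) yield l.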
re: qrilka & Anonymous
Ooops. I'm blind.
re: Seth
This is a horrible newbie question and I'll post it to the Scala lists but...how do I open a file as a Scala Stream? Or using some other Scala API? File I/O seems to be conspicuously absent from the documentation I've seen, but then I've already demonstrated blindness so I'd appreciate a link.
I found one:
http://www.scala-lang.org/docu/files/api/scala/io/Source.html
I suppose I should have clicked on all the IO classes before rolling my own. Source is a rather non-obvious name for an input stream but it kind of makes sense once you think about it...
From what I can tell you really don't want to use scala.io.Source for any file that won't fit in memory.
/** Creates Source from file, using given character encoding, setting its
* description to filename.
*
* @param file ...
* @param enc ...
* @return ...
*/
def fromFile(file: File, enc: String): Source = {
val arr: Array[Byte] = new Array[Byte](file.length().asInstanceOf[Int])
val is = new FileInputStream(file)
is.read(arr)
val s = fromBytes(arr, enc)
s.descr = file.getName()
return setFileDescriptor(file, s)
}
Similarly:
def fromInputStream(istream: InputStream, enc: String, maxlen: Option[Int]): Source = {
val BUFSIZE = 1024
val limit = maxlen match { case Some(i) => i; case None => 0 }
val bi = new BufferedInputStream(istream, BUFSIZE)
val bytes = new collection.mutable.ArrayBuffer[Byte]()
var b = 0
var i = 0
while( {b = bi.read; i += 1; b} != -1 && (limit <= 0 || i < limit)) {
bytes += b.toByte;
}
if(limit <= 0) bi.close
fromBytes(bytes.toArray, enc)
}
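That applies to the Source implementation quoted here. In later releases (certainly Scala 2.13 and 3.x), Source.fromFile returns a BufferedSource that reads through a buffered reader, and getLines() returns an Iterator[String], which amounts to the "just use an iterator" option from the post. A minimal sketch, assuming UTF-8 input; SourceCount and countMatches are hypothetical names.

```scala
import java.util.regex.Pattern
import scala.io.Source

object SourceCount {
  // Streams the file line by line; only a read buffer and the current line need to be in memory.
  def countMatches(path: String, pat: Pattern): Int = {
    val src = Source.fromFile(path, "UTF-8")
    try src.getLines().count(line => pat.matcher(line).find())
    finally src.close()
  }
}
```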
Erik, could you package me up a .jar or something I can easily run without having Eclipse around? I'll try it on the big iron. Mail to tim dot bray at sun dot com, or post it with a link to my blog, or whatever.
What I didn't understand in the Erlang discussion, in the Ruby discussion, and now in the Scala discussion is why people don't use memory-mapped files for counting lines or parsing large files.
Because the task is IO-bound, memory-mapped files that work with the OS virtual memory system will be as fast as possible.
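For anyone who wants to try this suggestion, here is a rough sketch that counts newline bytes through java.nio's FileChannel.map. Assumptions: an encoding in which '\n' is always the single byte 0x0A (true for ASCII and UTF-8), a final line without a trailing newline is not counted, and the 64 MB window size is arbitrary (one mapping cannot exceed Integer.MAX_VALUE bytes). MappedLineCount is a hypothetical name, and whether this beats buffered reads on a given machine is something to measure.

```scala
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

object MappedLineCount {
  // Size of each mapped window; a single mapping is capped at Int.MaxValue bytes.
  private val ChunkSize: Long = 64L * 1024 * 1024

  // Counts '\n' bytes by mapping the file read-only, one window at a time.
  def countNewlines(path: String): Long = {
    val ch = FileChannel.open(Paths.get(path), StandardOpenOption.READ)
    try {
      val size = ch.size()
      var pos = 0L
      var count = 0L
      while (pos < size) {
        val len = math.min(ChunkSize, size - pos)
        val buf = ch.map(FileChannel.MapMode.READ_ONLY, pos, len)
        while (buf.hasRemaining()) {
          if (buf.get() == '\n'.toByte) count += 1
        }
        pos += len
      }
      count
    } finally ch.close()
  }
}
```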
Peace
-stephan
--
Stephan Schmidt :: stephan@reposita.org
Reposita Open Source - Monitor your software development
http://www.reposita.org
Blog at http://stephan.reposita.org - No signal. No noise.
Gosh, there's a great deal of useful material above!