Monday, December 27, 2010

Playing with Scala Products

Have you ever wanted something like a case class, that magically provides decent (for some definition of decent) implementations of methods like equals and hashCode, but without all of the restrictions? I know that I have. Frequently. And recently I ran into an answer on StackOverflow that gave me an idea for how to do it. The trick is to play with Products. I'm not sure how good of an idea it is, but I figure the Internet will tell me. Here's the basic idea:

import scala.runtime.ScalaRunTime

trait ProductMixin extends Product {
  def canEqual(other: Any) = toProduct(other) ne null
  protected def equalityClass = getClass
  protected def equalityClassCheck(cls: Class[_]) = equalityClass.isAssignableFrom(cls)
  protected def toProduct(a: Any): Product = a match {
    case a: Product if productArity == a.productArity && equalityClassCheck(a.getClass) => a
    case _ => null
  }
  override def equals(other: Any) = toProduct(other) match {
    case null => false
    case p if p eq this => true
    case p => {
      var i = 0
      var r = true
      while(i < productArity && r) {
        if (productElement(i) != p.productElement(i)) r = false
        i += 1
      }
      r
    }
  }
  override def productPrefix = try {
    getClass.getSimpleName()
  } catch {
    //workaround for #4118, so classes can be defined in the REPL that extend ProductMixin
    case e: InternalError if e.getMessage == "Malformed class name" => getClass.getName()
  }
  override def hashCode = ScalaRunTime._hashCode(this)
  override def toString = ScalaRunTime._toString(this)
}

Basic Usage

There are a couple of ways to use ProductMixin. The first, and probably the simplest, is to extend one of the ProductN classes and provide the appropriate defs:

class Cat(val name: String, val age: Int) extends Product2[String, Int] with ProductMixin {
  def _1 = name
  def _2 = age
}

The second way is to provide them yourself:

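class Dog(val name: String, val age: Int) extends ProductMixin {
  def productArity = 2
  def productElement(i: Int) = i match {
    case 0 => name
    case 1 => age
    // any other index is out of range for a two-element product
    case _ => throw new IndexOutOfBoundsException(i.toString)
  }
}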

Either way, the result is something that has many of the benefits of case classes but allows for more flexibility. Here's some sample usage:

scala> val c1 = new Cat("Felix", 2)
c1: Cat = Cat(Felix,2)

scala> val c2 = new Cat("Alley Cat", 1)
c2: Cat = Cat(Alley Cat,1)

scala> val c3 = new Cat("Alley Cat", 1)
c3: Cat = Cat(Alley Cat,1)

scala> c2 == c3
res0: Boolean = true

scala> c1 == c2
res1: Boolean = false

Dealing with Inheritance

Equality in the presence of inheritance is very tricky. But, possibly foolishly, ProductMixin makes it easy! Let's say you have a hierarchy, and you want all the classes under some root class to be able to equal each other. Here's how it would be done by overriding the equalityClass so that it is the root of the hierarchy (using a not-so-good example):

scala> abstract class Animal(val name: String, val age: Int) extends Product2[String, Int] with ProductMixin {
     | override protected def equalityClass = classOf[Animal]
     | def _1 = name
     | def _2 = age
     | }
defined class Animal

scala> class Dog(name: String, age: Int) extends Animal(name, age)
defined class Dog

scala> class Cat(name: String, age: Int) extends Animal(name, age)
defined class Cat

scala> val c = new Cat("Felix", 1)
c: Cat = Cat(Felix,1)

scala> val d = new Dog("Felix", 1)
d: Dog = Dog(Felix,1)

scala> c == d
res0: Boolean = true

The reverse can also be accomplished by overriding equalityClassCheck such that it checks that the classes are equal instead of using isAssignableFrom. That would mean two objects could be equal if and only if they are the same class.
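
A minimal sketch of that "same class only" variant follows. To keep it self-contained it repeats a condensed copy of the mixin, and that copy uses Option where the original uses null so that it also compiles on newer Scala versions. The names Pet, PetDog and PetCat are hypothetical and exist only for this illustration.

```scala
import scala.runtime.ScalaRunTime

// Condensed, Option-based variant of the ProductMixin trait shown earlier in the post.
trait ProductMixin extends Product {
  def canEqual(other: Any): Boolean = toProduct(other).isDefined
  protected def equalityClass: Class[_] = getClass
  protected def equalityClassCheck(cls: Class[_]): Boolean =
    equalityClass.isAssignableFrom(cls)
  protected def toProduct(a: Any): Option[Product] = a match {
    case p: Product if productArity == p.productArity && equalityClassCheck(p.getClass) => Some(p)
    case _ => None
  }
  override def equals(other: Any): Boolean = toProduct(other) match {
    case Some(p) => (0 until productArity).forall(i => productElement(i) == p.productElement(i))
    case None    => false
  }
  override def hashCode: Int = ScalaRunTime._hashCode(this)
}

// Hypothetical hierarchy: two objects can be equal only if they have the same runtime class.
abstract class Pet(val name: String, val age: Int)
    extends Product2[String, Int] with ProductMixin {
  override protected def equalityClassCheck(cls: Class[_]): Boolean = cls == getClass
  def _1 = name
  def _2 = age
}

class PetDog(name: String, age: Int) extends Pet(name, age)
class PetCat(name: String, age: Int) extends Pet(name, age)
```

With this override, two PetDog instances with the same name and age compare equal, while a PetDog and a PetCat with the same fields do not.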

Wrapup

I don't know to what extent this is a good idea. I've only tested it a bit in the REPL. It's neat, but there is one problem that comes to mind: performance. Any primitive members that are elements of the product will end up being boxed prior to usage in the hashCode and equality methods. The extra layers of indirection aren't free, and neither is the loop. That being said, case classes use the exact same method for hashing, so in that respect they pay the same penalty, and unless code is really hashing and equality heavy it probably wouldn't be noticeable.


Thursday, August 26, 2010

Scala is for VB programmers

Normally I don't go for flame bait titles. But I haven't finished my morning coffee yet so I can't help myself. There's once again a debate raging across the internet about whether Scala is more or less complex than Java, along with the more nuanced argument that yes, it is more complex but only framework and library developers have to contend with this complexity. This argument usually happens when someone posts a little bit of code like this:

def map[B, That](f: A => B)(implicit bf: CanBuildFrom[Repr, B, That]): That

And then someone responds like this:

Why do not people realize that Java is too difficult for the average programmer? That is the true purpose of Scala, to escape the complexity of Java code! Framework code in Scala, with heavy use of implicit keywords and all kinds of type abstractions, is very difficult. This is correct, but this code is not meant for innocent eyes. You do not use that sort of code when you write an application.

I've seen this type of thinking before. A few years ago I had a bout of insanity and led an ASP.NET project using ASP.NET 2.0. I had no .NET experience prior to this project. The project failed, although the reasons for that failure are legion and unimportant here. But I noticed something about ASP.NET developers: they have no clue how the technology works. It's a black box. Do you know why? Because it is a black box. I searched and searched and couldn't even find a good illustration of the lifecycle for an ASP.NET page that's using events. This type of information is put front and center in the Java world. It's absolutely buried in the Microsoft world. Or at least the parts of it that target the hordes of VB programmers that are undyingly loyal to MS. The framework is magic dust created by the great wizards in Redmond so that you can build solutions for your customers. Do not question the dust. Think about VB. Or, no don't, it might damage your brain. My coffee hasn't quite kicked in, so I should have some immunity, so I'll do it for you. VB is a black box (well, at least old-school VB6). It was designed to allow people who do not know how to really program, and who will probably never know how to program, to create applications. It's a completely flat, opaque abstraction. The wall between the application developer and the framework developer is as high and as thick as the gates of Mordor.

There are many people in the Scala community that claim Scala's complexity can be hidden from the application programmer. I don't believe them, but there's a chance that they are right. It's technically feasible, and I can see how it could happen if Scala started attracting lots of VB programmers. I can't see how it's going to attract lots of VB programmers, but apparently many people in the Scala community think Scala is for VB programmers. So we'll just have to wait and see...


Sunday, August 22, 2010

Scala Actors: loop, react, and schedulers

One of the unfortunate aspects of many of the "published" (meaning blogged) Scala Actor benchmarks out there is that they rarely pay much attention, if any, to the effects of seemingly idiomatic patterns on performance. Some of the main culprits are:

  1. react versus receive (event-based versus threaded)
  2. loop/react versus recursive react
  3. loop/receive versus receive/while
  4. tweaking (or failing to tweak) the scheduler

I've been working on setting up a "benchmarking framework" in conjunction with experimenting with modifications to the underlying thread pool so that all the possible permutations are automatically tested. What I have right now is a classic "ring" benchmark setup to permute the schedulers and loop/react versus recursive react. The loop/react pattern is more idiomatic (or at least more common), but higher overhead, and it looks something like this:

loop {
  react {
    case Msg(m) => // do stuff
    case Stop => exit()
  }
}

The reason it is high-overhead is that both loop and react raise control flow exceptions that result in the creation of new tasks for the thread pool when they are hit, so for each loop, two exceptions are raised and two tasks are executed. There's overhead in both of the operations, especially raising the exceptions. The recursive react pattern looks like this, so it can avoid the extra exception/task:

def rloop(): Unit = react {  //this would be called by the act() method
  case Msg(m) => {
    // do stuff
    rloop()
  }
  case Stop => // just drop out or call exit()
}

Using loop instead of recursive react effectively doubles the number of tasks that the thread pool has to execute in order to accomplish the same amount of work, which in turn makes any overhead in the scheduler far more pronounced when using loop. Now, I should point out that the overhead really isn't that large, so if the actor is performing significant computations it will be lost in the noise. But it's fairly common to have actors do fairly little with each message. Here are some results from the ring benchmark using 10 rings of 10,000 actors passing a token around them 100 times before exiting. I'm using multiple rings because otherwise there is no parallelism in the benchmark. These are being run on my dual core MacBook.

Scheduler                      React Method     Time (sec)
ManagedForkJoinScheduler       LoopReact        45.416058
ManagedForkJoinScheduler       RecursiveReact   25.509482
ForkJoinScheduler              LoopReact        65.268584
ForkJoinScheduler              RecursiveReact   45.85605
ResizableThreadPoolScheduler   LoopReact        98.084794
ResizableThreadPoolScheduler   RecursiveReact   53.379757
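
As a rough check on the "doubles the number of tasks" claim, here is a back-of-the-envelope count for this benchmark configuration. It assumes, as described above, about one task per message for recursive react and about two for loop/react. The object name RingTaskEstimate is made up for this sketch, and the numbers are estimates, not measurements.

```scala
// Back-of-the-envelope task counts for the ring benchmark, using the figures from this post.
object RingTaskEstimate {
  val rings = 10
  val actorsPerRing = 10000
  val laps = 100

  // One message per actor per lap, in every ring.
  val messages: Long = rings.toLong * actorsPerRing * laps

  // Assumption taken from the text: recursive react costs about one task per message,
  // loop/react about two (one for loop, one for react).
  val recursiveReactTasks: Long = messages
  val loopReactTasks: Long = messages * 2
}
```

That works out to roughly 10 million tasks for recursive react versus roughly 20 million for loop/react, which is why scheduler overhead shows up so clearly in the timings.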

The fork/join schedulers are faster than the ResizableThreadPoolScheduler because, rather than having all of the worker threads pull tasks off of a single shared queue, each thread maintains its own local deque onto which it can place tasks directly if they are generated while it is running a task. This creates a kind of "fast path" for the tasks that involves much less overhead.

I believe the primary reason ManagedForkJoinScheduler is faster is that ForkJoinScheduler does not always leverage the "fast path," even when in theory it could be used. I'm unsure about some of the rationale behind it, but I know some of the time the fast path is bypassed probabilistically in order to reduce the chances of starvation causing deadlock in the presence of long running or blocking tasks. ManagedForkJoinScheduler escapes this particular issue by more actively monitoring the underlying thread pool, and growing it when tasks are being starved. The second reason, and I'm somewhat unsure of the actual degree of the effect, is that ForkJoinScheduler configures the underlying thread pool so that the threads work through the local deques in FIFO order, while ManagedForkJoinScheduler configures the pool such that the local deques are processed in LIFO order. Processing in LIFO order allows the pool to take advantage of locality with regard to the tasks, basically assuming that the last task generated is the most likely to use data that's currently in cache, and thus reduce cache misses.
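
For reference, the FIFO versus LIFO choice is a constructor flag on the ForkJoinPool that later shipped in the JDK as java.util.concurrent.ForkJoinPool. The sketch below assumes that JDK class, not the jsr166y build bundled with Scala 2.8 and not my managed pool. The object name DequeOrdering is hypothetical.

```scala
import java.util.concurrent.ForkJoinPool

object DequeOrdering {
  private val parallelism = Runtime.getRuntime.availableProcessors()

  // asyncMode = false (the JDK default): each worker takes its own forked tasks in LIFO order.
  val lifoPool = new ForkJoinPool(
    parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false)

  // asyncMode = true: locally queued tasks are processed in FIFO order instead.
  val fifoPool = new ForkJoinPool(
    parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true)
}
```

The JDK documentation suggests the FIFO setting for event-style tasks that are never joined, which is roughly the situation an actor scheduler is in.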

The benchmark outputs a lot more information than I captured in the above table. If you'd like to run it, you can obtain the code here. The project uses sbt, so you'll need to have it working on your computer. After you run update in sbt to download all of the dependencies, you can run the ring benchmark as follows:

$ sbt
[info] Building project ManagedForkJoinPool 1.0 against Scala 2.8.0
[info]    using ManagedForkJoinPoolProject with sbt 0.7.4 and Scala 2.7.7
> ringbenchmark
[info] 
[info] == compile ==
[info]   Source analysis: 1 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling main sources...
[info] Compilation successful.
[info]   Post-analysis: 79 classes.
[info] == compile ==
[info] 
[info] == copy-resources ==
[info] == copy-resources ==
[info] 
[info] == ringbenchmark ==
[info] RingBenchmark ManagedForkJoinScheduler LoopReact 2 ....output truncated...

You can tweak the benchmarks by modifying the sbt project file. If you do run them, I'm very interested in the results.


Saturday, August 21, 2010

Concurrency Benchmarking, Actors, and sbt tricks

Have you ever noticed that other people's microbenchmarks tend to be hard to run and often impossible to duplicate? And are frequently caveated to the hilt? When it gets down to it, a benchmark is really an experiment, and ideally a scientific experiment. That means all factors that are relevant to the results should be clearly recorded, and the tests should be easy for others to duplicate.

Custom sbt actions for benchmarks

In order to test and run benchmarks on the work I'm doing around creating a managed variant of the JSR-166y ForkJoinPool along with supporting infrastructure for use with Scala Actors, I'm creating a test harness that captures a host of environmental factors about how it was run, and writing sbt actions to make it easy to run the benchmarks and automatically permute the variables.

It still needs a lot of work, but I had some trouble figuring out a really basic task so I thought I'd share it. Basically I wanted to build a Task object that consists of several tasks based on information in the project definition and permuted parameters. It's actually pretty easy, as you can see in the snippet below from my project definition:

  /** this task executes the PingPong benchmark using each available scheduler */
  lazy val pingpongbench = pingpongTaskList
  /** produces a sequence of run tasks using all the available schedulers  */
  def pingpongTaskList = {
    val pairs = 100
    val messagesPerPair = 10000
    val tasks = for(sched <- schedulers) yield pingpongTask(sched, pairs, messagesPerPair)
    tasks.reduceLeft((a, b) => a && b)
  }

You can see the whole file here. Basically Task has an && operator that essentially allows you to concatenate one task with another task. This allows you to build a whole chain of tasks. In the example above, I'm having it run the benchmark once for each scheduler configuration. Soon, I'm going to make it permute other parameters. But right now my test harness isn't playing nicely with the schedulers included in the Scala distribution, so first things first.
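
To make the reduceLeft chaining concrete outside of sbt, here is a toy model. Step is a made-up stand-in and not sbt's Task API. The scheduler names are placeholders, and the assumption that a failed step stops the chain may not match sbt's behavior exactly.

```scala
// Toy stand-in for a build task; this is NOT sbt's Task API, just a model of the chaining idea.
final case class Step(name: String, run: () => Boolean) {
  // Run this step first, then the next one only if this one succeeded (an assumption).
  def &&(next: Step): Step =
    Step(name + " && " + next.name, () => run() && next.run())
}

object StepChain {
  // Hypothetical scheduler names, used only for illustration.
  val schedulers = List("SchedulerA", "SchedulerB", "SchedulerC")

  // Records the order in which the steps actually ran.
  val executed = scala.collection.mutable.ListBuffer.empty[String]

  def benchStep(sched: String): Step =
    Step(sched, () => { executed += sched; true })

  // Same shape as pingpongTaskList: build one step per scheduler, then fold them together.
  val all: Step = schedulers.map(benchStep).reduceLeft((a, b) => a && b)
}
```

Calling StepChain.all.run() executes the three steps in list order, because reduceLeft nests the chain as (A && B) && C.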

There's also one other little customization, which is documented, but I think it's important for benchmarking. By default, sbt runs your code in the same process as sbt itself. This can cause problems with multithreaded code, especially if it doesn't terminate properly. It also means the next benchmark to run has to contend with any junk that the previous benchmark left around. So I configured sbt to fork new processes. It just required one line:

override def fork = forkRun

Important variables

Here's what I'm capturing for each run right now so that the results can all be dumped into a big spreadsheet for analysis. I'd like to capture more information about the host machine, such as more information about the CPUs and the loading when the benchmark is being run, but haven't got that far yet. Currently these are all captured from within the benchmark process, mostly using system properties and the Runtime object.

  1. Test Name - obviously needed so that results from multiple benchmarks can be stored in the same file
  2. Scheduler - this is my primary variable right now, I want to run each benchmark with each scheduler while holding everything else constant
  3. # of Cores/Processors - essential so that anyone looking at the results has an idea about the hardware used
  4. Java VM Name - different VMs can perform quite differently
  5. Java VM Version - performance characteristics change from version to version (usually getting better)
  6. Java Version - same reason as above, but this is probably the more publicly known version number
  7. Scala Version - this could be important in the future, as it becomes more common for different projects to be on different versions of Scala
  8. OS Name and version - again, it can affect performance
  9. Processor Architecture
  10. Approximate Concurrency (number of simultaneously alive actors) - this allows us to examine concurrency levels versus resource consumption, more concurrency does not necessarily mean that more cores or threads would be helpful
  11. Approximate Parallelism (number of simultaneously runnable actors) - this measures how many cores/threads the benchmark can really keep busy
  12. Approximate Total Messages - this estimates the amount of activity that takes place during the benchmark, generally the benchmarks I'm looking at contain very little logic because they are intended to measure overhead introduced by the framework
  13. Total Wall Clock Time (seconds) - as measured using nanoTime within the benchmark process
  14. Initial Thread and Maximum Observed Thread Count - used to examine automatic expansion of the thread pool
  15. Initial Free Memory and Minimum Observed Free Memory - threads use a fair amount of memory, so performance impacts may show up as pressure on the GC as well as contention for the CPU
  16. Initial and Maximum Observed Total Memory - threads use a lot of memory, so it's important to track usage
  17. Verbose - debugging output pretty much invalidates any of these tests
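
Most of the environmental items above can be read with a few standard calls. The following is a minimal sketch of that idea and not the harness itself. The object name BenchmarkEnvironment and the map keys are assumptions made for illustration.

```scala
object BenchmarkEnvironment {
  /** Snapshot of JVM and host details, read from system properties and the Runtime object. */
  def capture(): Map[String, String] = {
    val rt = Runtime.getRuntime
    Map(
      "processors"      -> rt.availableProcessors().toString,
      "java.vm.name"    -> System.getProperty("java.vm.name"),
      "java.vm.version" -> System.getProperty("java.vm.version"),
      "java.version"    -> System.getProperty("java.version"),
      "scala.version"   -> scala.util.Properties.versionNumberString,
      "os.name"         -> System.getProperty("os.name"),
      "os.version"      -> System.getProperty("os.version"),
      "os.arch"         -> System.getProperty("os.arch"),
      "threads"         -> Thread.activeCount().toString,
      "free.memory"     -> rt.freeMemory().toString,
      "total.memory"    -> rt.totalMemory().toString
    )
  }

  /** Runs `body` and returns the wall-clock time in seconds, measured with nanoTime. */
  def timeSeconds(body: => Unit): Double = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1e9
  }
}
```

The thread and memory values are single snapshots. Tracking the maximum thread count or minimum free memory would mean sampling them repeatedly while the benchmark runs.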


Sunday, August 08, 2010

Improving Schedulers for High-level, Fine-grained Concurrency Frameworks

Quite a while ago, Greg Meredith made a comment that really made me stop and think, and that has been lingering in the back of my mind ever since:

Erik, Alex, John, et al, i'm loathe to hijack this thread -- which is a good one -- but the experience with the lock up described below is really just the tip of the iceberg. Unless specific scheduling contracts and semantics can be worked out, a pluggable scheduling architecture is just asking for disaster. Worse, it means that a whole host of compiler support that could be provided to apps that use actor-based concurrency is null and void because those solutions (be they model-checking or types-based) will critically depend on specific ranges of scheduling semantics. Actors and other higher level concurrency constructs are a major appeal of languages like scala. If they prove themselves to be "not ready for prime time" the impact on perception might not be contained to just that usage of the language. Best wishes, --greg

That wasn't the first time that Greg made a comment along those lines, and it certainly wasn't the last. At one point he offered to allow a few individuals to look at some older code that he believed could help. Greg's a really smart guy. If he's worried, we should all be worried.

Empirical Evidence of the Problem

Contrary to what many may think, this is hardly an academic problem. A significant portion of the issues that people new to Scala Actors run into trace to the fact that they expect the scheduler to be making guarantees that it simply does not make. Here's a sampling:

  1. Actors not creating enough threads
  2. Scala Actors different behavior on JRE 1.5 and 1.6
  3. Actor starvation problem (related to previous)
  4. Actor as new thread

There are others, including cases where people have both rightly and wrongly blamed the default scheduler for serious problems with their programs. Most of the above can be traced back to the fact that the default scheduler for Scala Actors uses a slightly modified version of the JSR166y ForkJoinPool, which has issues described below (source):

Like all lightweight-task frameworks, ForkJoin (FJ) does not explicitly cope with blocked IO: If a worker thread blocks on IO, then (1) it is not available to help process other tasks (2) Other worker threads waiting for the task to complete (i.e., to join it) may run out of work and waste CPU cycles. Neither of these issues completely eliminates potential use, but they do require a lot of explicit care. For example, you could place tasks that may experience sustained blockages in their own small ForkJoinPools. (The Fortress folks do something along these lines mapping fortress "fair" threads onto forkjoin.) You can also dynamically increase worker pool sizes (we have methods to do this) when blockages may occur. All in all though, the reason for the restrictions and advice are that we do not have good automated support for these kinds of cases, and don't yet know of the best practices, or whether it is a good idea at all.

The Scope of the Issue

The issue here is neither limited to Scala nor to actor based concurrency. The general consensus in the concurrency community is that locks and threads are not the right abstractions for concurrency in application code (or hardly any code, for that matter), but there isn't any consensus on what the right abstractions are, or if there is consensus it is that the right abstractions are problem specific. There's no one-size-fits-all concurrency abstraction.

The one trait that all high-level or higher-order concurrency abstractions have in common is that under the hood they rely on some sort of managed thread pool and/or scheduler. For purposes here, when I say "scheduler," I mean the layer with which a concurrency framework interacts directly and likely contains framework-specific logic. When I say "thread pool," I mean the layer that directly manages the threads and is concurrency framework agnostic and may even be shared by several schedulers (sharing will be problematic, but I think ultimately necessary). This isn't a hard line, and often times they may be lumped into one. However I think it's a useful dichotomy. I'm also assuming the scheduler and thread pool rely on cooperative thread sharing, and that preemptive multitasking is left to the virtual machine and/or operating system.

The point is, any concurrency abstraction where the user can submit arbitrary code to be executed can ultimately run into serious issues if that user code does something the abstraction does not expect. For example, the fork-join scheduler from JSR-166y that the Scala Actors library uses (I think Clojure uses it as well, but it may not) doesn't automatically enlarge its pool in the presence of unmanaged blocks (including normal blocking IO) or simple long-running computations. This results in the majority of the problems I previously cited, because blocking operations are extremely common, and thus the scheduler is a very leaky abstraction.

Key Characteristics of a Good Scheduler

Unfortunately my command of type theory is rudimentary at best, so while Greg could probably describe a well-behaved scheduler using types, I'm going to have to resort to plain English:

  1. The user should not have to think about choosing a scheduler.
  2. The user should not have to think about the size of the thread pool managed by the scheduler.
  3. The user should be able to write code naturally without worrying about any assumptions the scheduler may make.
  4. The user should not have to worry about starting or stopping the scheduler.
  5. Multiple flavors of concurrency abstraction should be able to share the same thread pool.
  6. The scheduler should impose minimal overhead.

There are probably others, but I think this is a good start.

The Performance Problem

So why hasn't this problem already been solved? Perhaps it has been already and I just don't know about it. If it has, please let me know. But as far as I can tell it's because there's a perceived nasty performance trade-off. Here's a quote from Philipp Haller:

The starvation detection mechanism in Scala 2.7 was too expensive to support in the new default scheduler. However, there is a starvation detection mechanism for thread-blocking methods of the `Actor` object (`receive` etc.).

Similar answers have been provided when people have questioned the wisdom of using the FJ framework over the classes included with the JDK. I've tested it a bit and it's true, the FJ framework does yield a significant improvement for certain classes of tasks over JDK classes (although I believe using exceptions for control flow imposes a far larger penalty). Basically it appears that the overhead associated with known solutions to the problem outweighs the benefits of fine-grained concurrency.

ForkJoinPool in a nutshell

What makes ForkJoinPool so fast (other than highly optimized code) is that it uses a two-level architecture for its task queues. There's one queue for the pool, and one deque per thread. Most of the data associated with the deques is only updated from the owning thread so that tasks can be added and removed without synchronization and with minimal volatile and CAS operations. The worker threads also steal work from one another in order to spread out the load. When a worker is deciding the next task to execute, it performs the following checks, using the first one to return a task:

  1. The thread's own local deque
  2. The deques of other threads in the pool, scanned in a pseudo-random pattern with guiding hints
  3. The common task queue
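
The three checks above can be sketched as a toy model. This is not the real ForkJoinPool internals: ToyWorker and ToyPool are made-up names, the other workers are scanned in order instead of pseudo-randomly, and the concurrent collections from java.util.concurrent stand in for the pool's hand-tuned deques.

```scala
import java.util.concurrent.{ConcurrentLinkedDeque, ConcurrentLinkedQueue}

// Toy model of the lookup order described above; not the real ForkJoinPool internals.
final class ToyWorker {
  val local = new ConcurrentLinkedDeque[Runnable]
}

final class ToyPool(val workers: Vector[ToyWorker]) {
  val common = new ConcurrentLinkedQueue[Runnable]

  def nextTask(self: ToyWorker): Option[Runnable] = {
    // 1. The worker's own deque, newest task first.
    def fromLocal: Option[Runnable] = Option(self.local.pollLast())
    // 2. Steal the oldest task from another worker (scanned in order here, not pseudo-randomly).
    def stolen: Option[Runnable] =
      workers.iterator
        .filter(_ ne self)
        .map(w => Option(w.local.pollFirst()))
        .collectFirst { case Some(task) => task }
    // 3. The pool-wide queue.
    def fromCommon: Option[Runnable] = Option(common.poll())

    fromLocal.orElse(stolen).orElse(fromCommon)
  }
}
```

Because orElse evaluates its argument lazily, a worker with something in its own deque never touches the other deques or the common queue.
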
Threads are only added to the pool if a ManagedBlocker that the pool can see is used to block, causing the target concurrency to not be met. The default target concurrency for the ForkJoinPool is set to the number of available processors. In Scala Actors this is overridden to be twice the number of available processors. The idea behind this is simple: If you're keeping all your processors busy, then adding more threads will just slow you down. As we've seen, problems arise when a higher level of concurrency is needed due to unmanaged blocks, long running computations, or simply the required concurrency being inherently higher than the default concurrency. This happens because as long as a worker is busy or has something in its local deque, it won't look elsewhere for tasks. If all the workers are in this state, tasks can simply accumulate, resulting in starvation.
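
For comparison, this is roughly what a "managed" block looks like with the ForkJoinPool that later shipped in the JDK (java.util.concurrent), which I'm assuming here in place of the jsr166y build. SleepBlocker and ManagedBlockDemo are hypothetical names for this sketch.

```scala
import java.util.concurrent.{Callable, ForkJoinPool}

// Wraps a sleep so the pool knows the worker is about to block.
final class SleepBlocker(millis: Long) extends ForkJoinPool.ManagedBlocker {
  @volatile private var done = false

  // Called by the pool; returns true once no further blocking is needed.
  def block(): Boolean = {
    if (!done) {
      Thread.sleep(millis)
      done = true
    }
    true
  }

  def isReleasable(): Boolean = done
}

object ManagedBlockDemo {
  def sleepInPool(pool: ForkJoinPool, millis: Long): String =
    pool.submit(new Callable[String] {
      def call(): String = {
        // managedBlock tells the pool about the block so it may add a spare thread.
        ForkJoinPool.managedBlock(new SleepBlocker(millis))
        "done"
      }
    }).get()
}
```

A plain Thread.sleep or blocking read inside a task gives the pool no such hint, which is exactly the unmanaged case that leads to starvation.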

A Crack at Solving the Problem

As you might have guessed by now, I'm trying to solve this problem. Right now I wouldn't use the code for anything beyond testing, but despite the fact I'm still wrestling with the internals of ForkJoinPool I've experienced good results so far. My approach is simple: I added a manager thread that monitors the changes in size of the various queues in the pool and whether they've been flushed since the last check, and grows the pool if tasks appear to be starved. The overhead imposed on each worker is minimal, as it only has to update a single additional volatile field when it clears out its queue. The manager thread has some cost of its own, but in the benchmarking I've done so far it doesn't seem to add noticeable overhead. Ironically, Scala Actors already maintain a dedicated thread for the scheduler; it's just that the heuristic checks on the size of the pool were removed in 2.8 (although honestly they never worked that well).
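
To give a feel for the shape of that heuristic, here is a deliberately simplified sketch. It is not the code in ManagedForkJoinPool. QueueWatch, PoolManager and the grow-by-one rule are all assumptions made for illustration, and an AtomicBoolean stands in for the volatile field.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the monitoring idea; the real ManagedForkJoinPool code differs.
final class QueueWatch {
  // Stands in for the single extra field a worker updates when it empties its queue.
  private val flushed = new AtomicBoolean(false)

  // Called by a worker thread when its queue becomes empty.
  def markFlushed(): Unit = flushed.set(true)

  // Read by the manager thread; clears the flag for the next interval.
  def flushedSinceLastCheck(): Boolean = flushed.getAndSet(false)
}

object PoolManager {
  /** Grow by one thread when work is queued but no queue was emptied since the last check. */
  def nextPoolSize(current: Int, queuedTasks: Int, flushed: Boolean, max: Int): Int =
    if (queuedTasks > 0 && !flushed && current < max) current + 1 else current
}
```

A manager thread would call nextPoolSize periodically and resize the pool when the result differs from the current size. Everything expensive stays on the manager side, so workers only pay for one flag write.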

I have two simple tests/micro-benchmarks using Scala Actors with a custom scheduler built on my ManagedForkJoinPool. The SleepyActor benchmark performs extremely poorly if the scheduler doesn't significantly grow the pool, because each actor sleeps on the thread. The PingPong benchmark spawns a bunch of pairs of actors that simply message each other back-and-forth. It is the type of use case where the ForkJoinScheduler shines, at least in terms of throughput. The results on my MacBook are as follows (these are NOT scientific):

Benchmark     Default Scheduler   ManagedForkJoinScheduler   ResizableThreadPoolScheduler
SleepyActor   92 sec              30 sec                     not tested
PingPong      59.49 sec           47.17 sec                  66.02 sec

As you can see, performance actually improved with my scheduler. This is because the default scheduler for Scala Actors does not always use the "fast path" and put new tasks on the deque of the thread creating them. It only does it about half the time. So for the PingPong test you can see how the thread local queues impact performance.

Conclusions

It's too early to draw solid conclusions, but based on what I've done so far I think I can develop a solid heuristic for managing the thread pool size, and that the overhead will be negligible. The key is to not impose any more overhead on the worker threads, and to keep the computational overhead of the manager thread as low as possible. This means that the heuristic needs to be simple, and that the wait times between checks should be as long as possible, probably dynamically sized based on how smoothly the pool seems to be running.

What I need right now are more tests and benchmarks to cover "real world" scenarios. I also need to test on machines with more cores. My MacBook doesn't exactly present a powerful system. If anyone has suggestions, please post them!
