
Saturday, August 21, 2010

Concurrency Benchmarking, Actors, and sbt tricks

Have you ever noticed that other people's microbenchmarks tend to be hard to run and often impossible to duplicate? And are frequently caveated to the hilt? When it gets down to it, a benchmark is really an experiment, and ideally a scientific experiment. That means all factors that are relevant to the results should be clearly recorded, and the tests should be easy for others to duplicate.

Custom sbt actions for benchmarks

I'm working on a managed variant of the JSR-166y ForkJoinPool, along with supporting infrastructure for use with Scala Actors. To test and benchmark that work, I'm creating a test harness that captures a host of environmental factors about each run, and writing sbt actions to make it easy to run the benchmarks and automatically permute the variables.

It still needs a lot of work, but I had some trouble figuring out a really basic task, so I thought I'd share it. Basically, I wanted to build a Task object that consists of several tasks based on information in the project definition and permuted parameters. It's actually pretty easy, as you can see in the snippet below from my project definition:

  /** this task executes the PingPong benchmark using each available scheduler */
  lazy val pingpongbench = pingpongTaskList
  /** produces a sequence of run tasks using all the available schedulers  */
  def pingpongTaskList = {
    val pairs = 100
    val messagesPerPair = 10000
    val tasks = for(sched <- schedulers) yield pingpongTask(sched, pairs, messagesPerPair)
    tasks.reduceLeft((a, b) => a && b)
  }

You can see the whole file here. Task has an && operator that concatenates one task with another, which allows you to build up a whole chain of tasks. In the example above, I'm having it run the benchmark once for each scheduler configuration. Soon I'm going to make it permute other parameters as well, but right now my test harness isn't playing nicely with the schedulers included in the Scala distribution, so first things first.
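
For illustration, a pingpongTask along these lines could be built on sbt 0.7's runTask. This is just a sketch of the shape it might take; the real definition is in the linked file, and the main class name and the convention of passing the scheduler and message counts as program arguments are assumptions here:

  /** sketch: runs the PingPong benchmark once with the given scheduler */
  def pingpongTask(scheduler: String, pairs: Int, messagesPerPair: Int) =
    runTask(Some("benchmark.PingPong"), runClasspath,
            scheduler, pairs.toString, messagesPerPair.toString) dependsOn(compile)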

There's also one other little customization, which is documented, but I think it's important for benchmarking. By default, sbt runs your code inside the sbt process itself. This can cause problems with multithreaded code, especially if it doesn't terminate properly. It also means that each benchmark has to contend with any junk the previous benchmark left behind in the JVM. So I configured sbt to fork a new process for each run. It just required one line:

override def fork = forkRun

Important variables

Here's what I'm capturing for each run right now, so that the results can all be dumped into a big spreadsheet for analysis. I'd like to capture more information about the host machine, such as more detail about the CPUs and the load while the benchmark is being run, but I haven't gotten that far yet. Currently these are all captured from within the benchmark process, mostly using system properties and the Runtime object (there's a sketch of how below the list):

  1. Test Name - obviously needed so that results from multiple benchmarks can be stored in the same file
  2. Scheduler - this is my primary variable right now; I want to run each benchmark with each scheduler while holding everything else constant
  3. # of Cores/Processors - essential so that anyone looking at the results has an idea about the hardware used
  4. Java VM Name - different VMs can perform quite differently
  5. Java VM Version - performance characteristics change from version to version (usually getting better)
  6. Java Version - same reason as above, but this is probably the more publicly known version number
  7. Scala Version - this could be important in the future, as it becomes more common for different projects to be on different versions of Scala
  8. OS Name and version - again, it can affect performance
  9. Processor Architecture
  10. Approximate Concurrency (number of simultaneously alive actors) - this allows us to examine concurrency levels versus resource consumption; more concurrency does not necessarily mean that more cores or threads would be helpful
  11. Approximate Parallelism (number of simultaneously runnable actors) - this measures how many cores/threads the benchmark can really keep busy
  12. Approximate Total Messages - this estimates the amount of activity that takes place during the benchmark; generally the benchmarks I'm looking at contain very little logic, because they are intended to measure the overhead introduced by the framework
  13. Total Wall Clock Time (seconds) - as measured using nanoTime within the benchmark process
  14. Initial and Maximum Observed Thread Count - used to examine automatic expansion of the thread pool
  15. Initial Free Memory and Minimum Observed Free Memory - threads use a fair amount of memory, so performance impacts may show up as pressure on the GC as well as contention for the CPU
  16. Initial and Maximum Observed Total Memory - threads use a lot of memory, so it's important to track usage
  17. Verbose flag - whether debugging output was enabled, since debugging output pretty much invalidates any of these tests
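
Most of these are straightforward to capture. Here's a minimal sketch of the environment-capture portion; the map keys are just illustrative, not my actual column names:

  val runtime = Runtime.getRuntime
  val environment = Map(
    "java.vm.name"    -> System.getProperty("java.vm.name"),
    "java.vm.version" -> System.getProperty("java.vm.version"),
    "java.version"    -> System.getProperty("java.version"),
    "scala.version"   -> scala.util.Properties.versionString,
    "os.name"         -> System.getProperty("os.name"),
    "os.version"      -> System.getProperty("os.version"),
    "os.arch"         -> System.getProperty("os.arch"),
    "processors"      -> runtime.availableProcessors.toString,
    "free.memory"     -> runtime.freeMemory.toString,
    "total.memory"    -> runtime.totalMemory.toString)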


Sunday, August 08, 2010

Improving Schedulers for High-level, Fine-grained Concurrency Frameworks

Quite a while ago, Greg Meredith made a comment that really made me stop and think, and that has been lingering in the back of my mind ever since:

Erik, Alex, John, et al, i'm loathe to hijack this thread -- which is a good one -- but the experience with the lock up described below is really just the tip of the iceberg. Unless specific scheduling contracts and semantics can be worked out, a pluggable scheduling architecture is just asking for disaster. Worse, it means that a whole host of compiler support that could be provided to apps that use actor-based concurrency is null and void because those solutions (be they model-checking or types-based) will critically depend on specific ranges of scheduling semantics. Actors and other higher level concurrency constructs are a major appeal of languages like scala. If they prove themselves to be "not ready for prime time" the impact on perception might not be contained to just that usage of the language. Best wishes, --greg

That wasn't the first time that Greg made a comment along those lines, and it certainly wasn't the last. At one point he offered to let a few individuals look at some older code that he believed could help. Greg's a really smart guy. If he's worried, we should all be worried.

Empirical Evidence of the Problem

Contrary to what many may think, this is hardly an academic problem. A significant portion of the issues people new to Scala Actors run into can be traced to the fact that they expect the scheduler to make guarantees that it simply does not make. Here's a sampling:

  1. Actors not creating enough threads
  2. Scala Actors different behavior on JRE 1.5 and 1.6
  3. Actor starvation problem (related to previous)
  4. Actor as new thread

There are others, including cases where people have both rightly and wrongly blamed the default scheduler for serious problems with their programs. Most of the above can be traced back to the fact that the default scheduler for Scala Actors uses a slightly modified version of the JSR-166y ForkJoinPool, which has the issues described below (source):
Like all lightweight-task frameworks, ForkJoin (FJ) does not explicitly cope with blocked IO: If a worker thread blocks on IO, then (1) it is not available to help process other tasks (2) Other worker threads waiting for the task to complete (i.e., to join it) may run out of work and waste CPU cycles. Neither of these issues completely eliminates potential use, but they do require a lot of explicit care. For example, you could place tasks that may experience sustained blockages in their own small ForkJoinPools. (The Fortress folks do something along these lines mapping fortress "fair" threads onto forkjoin.) You can also dynamically increase worker pool sizes (we have methods to do this) when blockages may occur. All in all though, the reason for the restrictions and advice are that we do not have good automated support for these kinds of cases, and don't yet know of the best practices, or whether it is a good idea at all.

The Scope of the Issue

The issue here is neither limited to Scala nor to actor-based concurrency. The general consensus in the concurrency community is that locks and threads are not the right abstractions for concurrency in application code (or hardly any code, for that matter), but there isn't any consensus on what the right abstractions are; if there is any consensus, it's that the right abstractions are problem-specific. There's no one-size-fits-all concurrency abstraction.

The one trait that all high-level or higher-order concurrency abstractions have in common is that under the hood they rely on some sort of managed thread pool and/or scheduler. For purposes here, when I say "scheduler," I mean the layer with which a concurrency framework interacts directly, and which likely contains framework-specific logic. When I say "thread pool," I mean the layer that directly manages the threads, is concurrency-framework agnostic, and may even be shared by several schedulers (sharing will be problematic, but I think ultimately necessary). This isn't a hard line, and oftentimes the two may be lumped into one, but I think it's a useful dichotomy. I'm also assuming the scheduler and thread pool rely on cooperative thread sharing, and that preemptive multitasking is left to the virtual machine and/or operating system.

The point is, any concurrency abstraction where the user can submit arbitrary code to be executed can ultimately run into serious issues if that user code does something the framework does not expect. For example, the fork-join scheduler from JSR-166y that the Scala Actors library uses (I think Clojure uses it as well, but I'm not certain) doesn't automatically enlarge its pool in the presence of unmanaged blocking (including normal blocking IO) or simple long-running computations. This is the root of the majority of the problems I cited above: blocking operations are extremely common, so the pool becomes a very leaky abstraction.
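
The hazard is easy to demonstrate. In the sketch below, Thread.sleep stands in for any unmanaged blocking call; if more actors block than the pool has worker threads, the remaining actors can be starved until the sleepers wake up:

import scala.actors.Actor._

// each actor performs an unmanaged block on its worker thread, and the
// pool does not grow to compensate
for (i <- 1 to 100) actor {
  Thread.sleep(60000)  // unmanaged block: this worker is unavailable for a minute
  println("actor " + i + " finished")
}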

Key Characteristics of a Good Scheduler

Unfortunately my command of type theory is rudimentary at best, so while Greg could probably describe a well-behaved scheduler using types, I'm going to have to resort to plain English:

  1. The user should not have to think about choosing a scheduler.
  2. The user should not have to think about the size of the thread pool managed by the scheduler.
  3. The user should be able to write code naturally, without worrying about any assumptions the scheduler may make.
  4. The user should not have to worry about starting or stopping the scheduler.
  5. Multiple flavors of concurrency abstraction should be able to share the same thread pool.
  6. The scheduler should impose minimal overhead.

There are probably others, but I think this is a good start.

The Performance Problem

So why hasn't this problem already been solved? Perhaps it has been and I just don't know about it. If someone has solved it, please let me know. But as far as I can tell, it's because of a perceived nasty performance trade-off. Here's a quote from Philipp Haller:

The starvation detection mechanism in Scala 2.7 was too expensive to support in the new default scheduler. However, there is a starvation detection mechanism for thread-blocking methods of the `Actor` object (`receive` etc.).

Similar answers have been given when people have questioned the wisdom of using the FJ framework over the classes included with the JDK. I've tested it a bit, and it's true: the FJ framework does yield a significant improvement for certain classes of tasks over the JDK classes (although I believe using exceptions for control flow imposes a far larger penalty). Basically, it appears that the overhead associated with known solutions to the problem outweighs the benefits that fine-grained concurrency introduces.

ForkJoinPool in a nutshell

What makes ForkJoinPool so fast (other than highly optimized code) is that it uses a two-level architecture for its task queues: one shared queue for the pool, plus one deque per worker thread. Most of the data associated with the deques is updated only from the owning thread, so tasks can be added and removed without synchronization and with minimal volatile and CAS operations. The worker threads also steal work from one another in order to spread out the load. When a worker decides which task to execute next, it checks the following sources in order, taking the first task it finds:

  1. The thread's own local deque
  2. The deques of other threads in the pool, scanned in a pseudo-random pattern with guiding hints
  3. The common task queue

Threads are only added to the pool if a ManagedBlocker that the pool can see is used to block, causing the target concurrency not to be met. The default target concurrency for ForkJoinPool is the number of available processors; in Scala Actors this is overridden to be twice the number of available processors. The idea behind this is simple: if you're keeping all your processors busy, then adding more threads will just slow you down. As we've seen, problems arise when a higher level of concurrency is needed, whether due to unmanaged blocking, long-running computations, or the required concurrency simply being inherently higher than the default. This happens because as long as a worker is busy or has something in its local deque, it won't look elsewhere for tasks. If all the workers are in this state, tasks can simply accumulate, resulting in starvation.
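
For completeness, here's a minimal sketch of making a blocking call visible to the pool via a ManagedBlocker, assuming the jsr166y snapshot bundled with Scala 2.8, where managedBlock takes a second maintainParallelism argument:

import scala.concurrent.forkjoin.ForkJoinPool

// wraps a blocking call so the pool can compensate by adding a worker
class SleepBlocker(millis: Long) extends ForkJoinPool.ManagedBlocker {
  @volatile private var done = false
  def block(): Boolean = { Thread.sleep(millis); done = true; true }
  def isReleasable(): Boolean = done
}

// from inside a task running on the pool:
ForkJoinPool.managedBlock(new SleepBlocker(1000), true)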

A Crack at Solving the Problem

As you might have guessed by now, I'm trying to solve this problem. Right now I wouldn't use the code for anything beyond testing, and I'm still wrestling with the internals of ForkJoinPool, but I've seen good results so far. My approach is simple: I added a manager thread that monitors the sizes of the various queues in the pool and whether they've been flushed since the last check, and grows the pool if tasks appear to be starved. The overhead imposed on each worker is minimal, as it only has to update a single additional volatile field when it clears out its queue. The manager thread itself costs something, but in the benchmarking I've done so far it doesn't seem to add noticeable overhead. Ironically, Scala Actors already maintain a dedicated thread for the scheduler; it's just that the heuristic checks on the size of the pool were removed in 2.8 (although honestly they never worked that well).
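
In highly simplified form, the monitoring loop looks something like the following. The names and the pool interface here are hypothetical, purely for illustration; they are not the actual ManagedForkJoinPool internals:

// hypothetical view of the pool, just for this sketch
trait MonitoredPool {
  def hasQueuedTasks: Boolean               // is any work waiting in the queues?
  def queuesFlushedSinceLastCheck: Boolean  // have the workers made progress?
  def addWorker(): Unit                     // grow the pool by one thread
}

// a daemon thread that periodically checks for starvation
class PoolManager(pool: MonitoredPool, intervalMillis: Long) extends Thread {
  setDaemon(true)
  override def run() {
    while (!isInterrupted) {
      Thread.sleep(intervalMillis)
      if (pool.hasQueuedTasks && !pool.queuesFlushedSinceLastCheck)
        pool.addWorker()  // work exists but no progress was observed: grow the pool
    }
  }
}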

I have two simple tests/micro-benchmarks using Scala Actors with a custom scheduler built on my ManagedForkJoinPool. The SleepyActor benchmark performs extremely poorly if the scheduler doesn't significantly grow the pool, because each actor sleeps on the thread. The PingPong benchmark spawns a bunch of pairs of actors that simply message each other back-and-forth. It is the type of use case where the ForkJoinScheduler shines, at least in terms of throughput. The results on my MacBook are as follows (these are NOT scientific):

  Benchmark     Default Scheduler   ManagedForkJoinScheduler   ResizeableThreadPoolScheduler
  SleepyActor   92 sec              30 sec                     not tested
  PingPong      59.49 sec           47.17 sec                  66.02 sec

As you can see, performance actually improved with my scheduler. This is because the default scheduler for Scala Actors does not always use the "fast path" of putting new tasks on the deque of the thread that creates them; it only does so about half the time. So in the PingPong test you can see how much the thread-local deques impact performance.

Conclusions

It's too early to draw solid conclusions, but based on what I've done so far I think I can develop a solid heuristic for managing the thread pool size, and that the overhead will be negligible. The key is to not impose any more overhead on the worker threads, and to keep the computational overhead of the manager thread as low as possible. This means that the heuristic needs to be simple, and that the wait times between checks should be as long as possible, probably dynamically sized based on how smoothly the pool seems to be running.

What I need right now are more tests and benchmarks covering "real world" scenarios. I also need to test on machines with more cores; my MacBook isn't exactly a powerful system. If anyone has suggestions, please post them!


Sunday, June 21, 2009

Pondering Actor Design Trades

There's been a lot of discussion of the Scala actors library lately, much of it critical, and a recent flurry of alternate implementations.  The alternate implementations (except my languishing state-based one ;-) all have one thing in common:  they are several orders of magnitude simpler.  Writing a basic actor implementation is actually pretty trivial, especially given the java.util.concurrent classes that provide a decent chunk of the functionality in Scala actors, all for free on JDK5+.  So this raises a few questions:

  1. Why is the standard Scala actor implementation so complex when others have done it in a much simpler fashion?
  2. Is it better to have one, big actor library that supports a wide variety of use cases, or a bunch of smaller ones targeted at specific niches and programming styles?
  3. If there are to be a bunch, should they just be conceptually similar (e.g. all based on the actor model), or should there be interoperability among them?

I'm not going to answer these questions now.  Instead, I'm going to try to start laying out some of what I believe to be the key characteristics of an actor implementation, and how they reinforce or detract from one another.  So here it goes:

  1. Guarantees
  2. Expressivity
  3. Extensibility
  4. Performance
  5. Scalability

Guarantees

The purpose of a concurrency framework is to make concurrency easier.  Concurrency is hard largely because it is extremely difficult to reason about, and thus concurrent code tends to be hard to write, laden with bugs, and subject to various odd pitfalls.  By providing various guarantees, a concurrency framework makes it easier to reason about concurrent code.  Actors are intended to free the programmer from worrying about things like locks, semaphores, thread management, etc. by encapsulating all that complexity behind a simple interface, assuming the programmer follows some basic rules like "no shared mutable state among actors."

The problem with guarantees is that they tend to break down in the presence of limited CPU and memory resources.  A promise that every actor eventually gets to run, for example, is only as good as the scheduler's ability to find threads for them all.

Expressivity

Expressivity is difficult to define.  For purposes here, I'm going to define it as the degree to which a concise, natural expression of the programmer's intent is supported, and illustrate it by comparing Scala Actor to Lift Actor.  Scala Actors allow you to execute logic independent of message processing (note: this is a violation of the theoretical model for actors) by simply placing it in the act method.  Lift Actors, on the other hand, are only triggered when they receive a message (this is consistent with the theoretical model).  For example, this makes it so that Scala Actors can do things such as perform possibly costly setup operations in their own thread before they start listening for messages.  In order to accomplish this in the Lift model, the programmer must create the actor and then send it some sort of "init" message.  The same effect can be achieved with both implementations, but it is more naturally supported by Scala Actors.  Of course there is a tradeoff here, as deviating from the theoretical model potentially weakens any guarantees that the model may provide.  The Scala Actor way also implies that an Actor has an explicit lifecycle, which as we'll see later has other significant implications.
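
As a concrete sketch of the Scala Actor version (the class and helper names here are made up):

import scala.actors.Actor
import scala.actors.Actor.loop

class Worker extends Actor {
  def act() {
    // costly setup runs on the actor's own thread, before any message is handled
    val cache = expensiveSetup()
    loop {
      react {
        case key: String => reply(cache.getOrElse(key, "miss"))
      }
    }
  }
  private def expensiveSetup(): Map[String, String] = Map("answer" -> "42")
}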

Another example is what I'll call the "nested react pattern."  It is relatively common to want an actor to take on a different behavior after processing a message, thus altering which messages are ignored and how the received messages are processed.

loop {
  react {
    case 'foo => {
      // do some stuff...
      react {
        case 'bar => // do some other stuff...
      }
    }
  }
}

The code above alternates between processing 'foo messages and 'bar messages.  This can be done with Lift Actor as well, but the expression is a little less natural:

class MyActor extends LiftActor {
  private val fooMode: PartialFunction[Any, Unit] = {
    case 'foo => {
      // do some stuff
      mode = barMode
    }
  }
  private val barMode: PartialFunction[Any, Unit] = {
    case 'bar => {
      // do some other stuff...
      mode = fooMode
    }
  }
  private var mode = fooMode
  protected def messageHandler = mode
}

Finally, Lift Actors exclusively use an event-based model and have no support for blocking on a thread while waiting for a message, and thus lose the ability to express patterns such as the following:

loop {
  react {
    case 'convertToNumber => {
      val i: Int = receive {
        case 'one => 1
        case 'two => 2
        case 'three => 3
      }
      reply(i)
    }
  }
}

Extensibility

For purposes here, I'm going to say that a piece of software is extensible if capabilities can be added without modifying the core or breaking its semantics, in an amount of effort proportional to the size of the extension.  This is narrower than the traditional definition of extensibility, which also covers the ability of a system to evolve internally.  A good example of extensibility is the ability of both Scala Actors and Lift Actors to let the user specify a custom scheduler.  Other examples could include adding control structures or using a different data structure for the mailbox.

The challenge with extensibility is that in order to enable it, what could otherwise be treated as the internal bits of the library must instead have well-defined interfaces for components, along with appropriate hooks for inserting them.  For example, a while ago I did some work to make the MessageQueue used for the mailbox overrideable (it has temporarily been overcome by events due to other changes).  This is a small example, but it shows how extensibility requires a greater degree of forethought.

Extensibility also benefits substantially from simplicity.  Scala Actors are almost impossible to extend from outside the scala.actors package because of their heavy reliance on package-private methods and state (mostly fixed here, but I broke remote actors in the process so no patch yet).  Lift Actors, on the other hand, are very extensible, at least within the bounds of their design (purely event-based actors with no explicit lifecycle).  Many of the flow-control mechanisms could be implemented on top of the baseline approach.

At this point we see that extensibility has an interesting relationship with expressivity.  I previously claimed that Scala Actors are more expressive because of the wide variety of control structures they provide (and I didn't even touch on some of the DSL-like functionality that enables all sorts of interesting things).  However, given Lift Actors' far simpler and more extensible foundation, there is much more opportunity to create custom control structures as extensions to Lift Actors without modifying the core.  Thus, if you are willing to do some lower-level programming, it could be argued that Lift Actors are in reality more expressive due to their extensibility.

Performance and Scalability

For purposes here, I'm going to treat performance as the rate at which an actor can receive and process messages, given a relatively small, fixed number of simultaneous actors.  This means that improving performance is largely a matter of reducing the time from when a message is initially sent to when user code within the actor begins processing it, including minimizing any pause between when an actor finishes processing one message and is available to start processing the next.  For moderate numbers of actors, performance is often maximized by having one thread per actor, and having the actor block while waiting for a message.  Given enough actors, though, the memory requirements of using a thread for each actor will eventually cause more slowdown than the cost of scheduling a new reaction for each message.  This is illustrated in Philipp Haller's paper, "Actors that Unify Threads and Events," in the following graph:

Note that the above graph covers a microbenchmark running a simple, non-memory-intensive task, and that the thread line is not a measurement of thread-bound actors, but rather of a simple threaded implementation.  However, my own benchmarking has shown that receive-based actors (ones that block on a thread) compare to event-based actors in almost the same way as threads compare to event-based actors in the above graph.  Also, remember that in a real application, where heap space is needed for things besides the stacks of thousands of threads, the point where the JVM throws an OutOfMemoryError will be much farther to the left.  There are also more subtle issues.  One of my first experiences with the Scala Actors library was creating a deadlock: I created more thread-bound actors than the scheduler wanted to create threads, and thus actors were stuck blocking on threads, waiting for messages from actors that hadn't started yet because there were no available threads.  In other words, blocking can lead to situations such as deadlock, starvation, and simply extreme forms of unfairness with respect to how much CPU time is allocated to each actor.  These all go against highly desirable guarantees that an actor library should provide outside of extreme circumstances.

Ultimately, event-based actors make the better model.  Part of the reason why event-based Scala Actors are so expensive is that they suspend by throwing an exception to return control from user code to the library.  While exceptions have been heavily optimized in the JVM, especially in recent versions, they are still substantially slower than normal return paths.  Scala Actors' need to use exceptions to suspend is a consequence of their expressivity: because the library has little or no knowledge of what an actor is doing within a reaction, it cannot rely on traditional returns without introducing special control structures (see the reactWhile numbers in one of my previous blogs).  Lift Actors, on the other hand, do not need to use exceptions for control flow, because the message processing cycle is essentially fixed; user code cannot intersperse weird (or even not-so-weird) patterns within it, or mix in blocking receives with event-based ones.  Another potential optimization of event-based actors is to have them block if there are plenty of threads available, and then release the thread they are on if the scheduler needs it.  To my knowledge this optimization is not implemented anywhere, but I think it would be relatively straightforward.  The only problem is that the actor becomes more tightly bound to its scheduler.
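
To make the suspend-by-exception trick concrete, here's a conceptual sketch.  This is not the actual scala.actors implementation, just an illustration of the mechanism: react saves the continuation and throws to unwind the user's stack back to the scheduler's dispatch loop.

object SuspendDemo {
  // a control-flow exception; overriding fillInStackTrace avoids the
  // cost of capturing a stack trace on every suspend
  class Suspend extends Throwable {
    override def fillInStackTrace(): Throwable = this
  }

  private var continuation: PartialFunction[Any, Unit] = { case _ => () }

  def react(handler: PartialFunction[Any, Unit]): Nothing = {
    continuation = handler  // remember what to run on the next message
    throw new Suspend       // non-local return to the dispatcher
  }

  def dispatch(msg: Any) {
    try continuation(msg)
    catch { case _: Suspend => () }  // suspended again; hand the thread back
  }
}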

Parting Thoughts

Ultimately, time and community willing, I'd like to evolve what is here, plus solid treatment of a lot of lower-level details, into a Scala Improvement Document (SID).  There are a lot of subtle trades involved, and I think producing a general-purpose actors library is at least an order of magnitude more difficult than producing a special-purpose one.  I also believe that if an actor implementation is part of the standard library, then it should provide the necessary extension points so that when users need something special-purpose, they can create it and still leverage components of the standard library and interoperate with other actors.  In other words, I think it should define both the interface portion of an API along with providing a solid implementation.  I don't think we'll even get there without a clear and common understanding of the various considerations involved.
