Sunday, August 08, 2010

Improving Schedulers for High-level, Fine-grained Concurrency Frameworks

Quite a while ago, Greg Meredith made a comment that really made me stop and think, and that has been lingering in the back of my mind ever since:

Erik, Alex, John, et al, I'm loath to hijack this thread -- which is a good one -- but the experience with the lock up described below is really just the tip of the iceberg. Unless specific scheduling contracts and semantics can be worked out, a pluggable scheduling architecture is just asking for disaster. Worse, it means that a whole host of compiler support that could be provided to apps that use actor-based concurrency is null and void because those solutions (be they model-checking or types-based) will critically depend on specific ranges of scheduling semantics. Actors and other higher level concurrency constructs are a major appeal of languages like Scala. If they prove themselves to be "not ready for prime time" the impact on perception might not be contained to just that usage of the language. Best wishes, --greg
That wasn't the first time that Greg made a comment along those lines, and it certainly wasn't the last. At one point he offered to let a few individuals look at some older code that he believed could help. Greg's a really smart guy. If he's worried, we should all be worried.

Empirical Evidence of the Problem

Contrary to what many may think, this is hardly an academic problem. A significant portion of the issues that people new to Scala Actors run into can be traced to the fact that they expect the scheduler to make guarantees that it simply does not make. Here's a sampling:

  1. Actors not creating enough threads
  2. Scala Actors different behavior on JRE 1.5 and 1.6
  3. Actor starvation problem (related to previous)
  4. Actor as new thread
There are others, including cases where people have both rightly and wrongly blamed the default scheduler for serious problems with their programs. Most of the above can be traced back to the fact that the default scheduler for Scala Actors uses a slightly modified version of the JSR-166y ForkJoinPool, which has the limitations described below (source):
Like all lightweight-task frameworks, ForkJoin (FJ) does not explicitly cope with blocked IO: If a worker thread blocks on IO, then (1) it is not available to help process other tasks (2) Other worker threads waiting for the task to complete (i.e., to join it) may run out of work and waste CPU cycles. Neither of these issues completely eliminates potential use, but they do require a lot of explicit care. For example, you could place tasks that may experience sustained blockages in their own small ForkJoinPools. (The Fortress folks do something along these lines mapping fortress "fair" threads onto forkjoin.) You can also dynamically increase worker pool sizes (we have methods to do this) when blockages may occur. All in all though, the reason for the restrictions and advice are that we do not have good automated support for these kinds of cases, and don't yet know of the best practices, or whether it is a good idea at all.
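In the java.util.concurrent version of ForkJoinPool (Java 7 and later), the way to make a block visible to the pool is ForkJoinPool.managedBlock with a ForkJoinPool.ManagedBlocker. The following is a minimal sketch that assumes you control the code that blocks. The hypothetical Blocking.call helper wraps an arbitrary blocking call so that, when it runs on a pool worker, the pool gets a chance to activate a spare thread. Whether the pool actually does so is left to its own heuristics.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ForkJoinPool;

/** Hypothetical helper: runs a blocking call through ForkJoinPool.managedBlock. */
final class Blocking {
    private Blocking() {}

    static <T> T call(Callable<T> body) {
        // Local class holding the outcome; block() and isReleasable() run on the calling thread.
        final class Blocker implements ForkJoinPool.ManagedBlocker {
            T result;
            Exception failure;
            boolean done;

            @Override
            public boolean block() {
                try {
                    result = body.call();   // the actual blocking work (IO, sleep, lock, ...)
                } catch (Exception e) {
                    failure = e;
                }
                done = true;
                return true;                // no further blocking is needed
            }

            @Override
            public boolean isReleasable() {
                return done;                // the call has not run yet, so it cannot be skipped
            }
        }

        Blocker blocker = new Blocker();
        try {
            // On a ForkJoinPool worker this may let the pool compensate with a spare thread.
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
        if (blocker.failure != null) {
            throw new CompletionException(blocker.failure);
        }
        return blocker.result;
    }
}
```

Failures are rethrown as an unchecked CompletionException so the helper fits inside lambdas. That is a design choice of this sketch, not something the API requires.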

The Scope of the Issue

The issue here is limited neither to Scala nor to actor-based concurrency. The general consensus in the concurrency community is that locks and threads are not the right abstractions for concurrency in application code (or in almost any code, for that matter), but there isn't any consensus on what the right abstractions are. If there is a consensus, it is that the right abstractions are problem-specific. There's no one-size-fits-all concurrency abstraction.

The one trait that all high-level or higher-order concurrency abstractions have in common is that under the hood they rely on some sort of managed thread pool and/or scheduler. For purposes here, when I say "scheduler," I mean the layer with which a concurrency framework interacts directly and which likely contains framework-specific logic. When I say "thread pool," I mean the layer that directly manages the threads, is agnostic to the concurrency framework, and may even be shared by several schedulers (sharing will be problematic, but I think it is ultimately necessary). This isn't a hard line, and often the two are lumped into one. However, I think it's a useful dichotomy. I'm also assuming the scheduler and thread pool rely on cooperative thread sharing, and that preemptive multitasking is left to the virtual machine and/or operating system.
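The scheduler/thread-pool split can be sketched in a few lines. The scheduler layer holds the framework-specific logic (here just bookkeeping counters), and the thread-pool layer is any java.util.concurrent.Executor, which several schedulers can share. LayeredScheduler is a hypothetical name used only for illustration.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical "scheduler" layer: framework-specific bookkeeping on top of a
 * framework-agnostic "thread pool" layer (any Executor), which may be shared.
 */
final class LayeredScheduler {
    private final Executor pool;                        // thread-pool layer: owns the threads
    private final AtomicLong submitted = new AtomicLong();
    private final AtomicLong completed = new AtomicLong();

    LayeredScheduler(Executor pool) {
        this.pool = pool;
    }

    void schedule(Runnable task) {
        submitted.incrementAndGet();                   // scheduler layer: framework-specific logic
        pool.execute(() -> {
            try {
                task.run();
            } finally {
                completed.incrementAndGet();
            }
        });
    }

    long submitted() { return submitted.get(); }

    long completed() { return completed.get(); }
}
```

Two schedulers (say, one for actors and one for futures) can be built over the same ExecutorService. Each keeps its own state, while the pool alone decides how many threads exist.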

The point is, any concurrency abstraction where the user can submit arbitrary code to be executed can ultimately run into serious issues if that code does something the abstraction does not expect. For example, the fork-join scheduler from JSR-166y that the Scala Actors library uses (I think Clojure uses it as well, but it may not) doesn't automatically enlarge its pool in the presence of unmanaged blocks (including normal blocking IO) or simple long-running computations. This is the root of most of the problems cited above: blocking operations are extremely common, so the abstraction leaks badly.
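To see the leak concretely, here is a small sketch against java.util.concurrent.ForkJoinPool, the Java 7+ descendant of the JSR-166y code (the modified fork used by Scala Actors is not involved). UnmanagedBlockDemo is a hypothetical name. Tasks that block with a plain Thread.sleep are invisible to the pool, so it does not add threads for them, and at most parallelism of them run at once even while the CPUs sit idle.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class UnmanagedBlockDemo {
    private UnmanagedBlockDemo() {}

    /**
     * Submits `tasks` tasks that each sleep for `sleepMillis` to a pool with the
     * given parallelism and returns the largest number observed running at once.
     */
    static int peakConcurrency(int parallelism, int tasks, long sleepMillis) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(sleepMillis);   // unmanaged block: the pool cannot see it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();                         // already-submitted tasks still run
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

With peakConcurrency(2, 8, 20), the eight sleeps run at most two at a time, so the wall-clock time is roughly four sleep periods rather than one.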

Key Characteristics of a Good Scheduler

Unfortunately my command of type theory is rudimentary at best, so while Greg could probably describe a well-behaved scheduler using types, I'm going to have to resort to plain English:

  1. The user should not have to think about choosing a scheduler.
  2. The user should not have to think about the size of the thread pool managed by the scheduler.
  3. The user should be able to write code naturally without worrying about any assumptions the scheduler may make.
  4. The user should not have to worry about starting or stopping the scheduler.
  5. Multiple flavors of concurrency abstraction should be able to share the same thread pool.
  6. The scheduler should impose minimal overhead.
There are probably others, but I think this is a good start.

The Performance Problem

So why hasn't this problem already been solved? Perhaps it already has and I just don't know about it. If so, please let me know. But as far as I can tell, it's because of a perceived nasty performance trade-off. Here's a quote from Philipp Haller:

The starvation detection mechanism in Scala 2.7 was too expensive to support in the new default scheduler. However, there is a starvation detection mechanism for thread-blocking methods of the `Actor` object (`receive` etc.).
Similar answers have been given when people have questioned the wisdom of using the FJ framework over the classes included with the JDK. I've tested it a bit and it's true: for certain classes of tasks the FJ framework yields a significant improvement over the JDK classes (although I believe using exceptions for control flow imposes a far larger penalty). Basically, it appears that the overhead of known solutions to the problem would cancel out the benefits that fine-grained concurrency brings.

ForkJoinPool in a nutshell

What makes ForkJoinPool so fast (other than highly optimized code) is that it uses a two-level architecture for its task queues. There's one queue for the pool as a whole, and one deque (double-ended queue) per worker thread. Most of the data associated with each deque is only updated by its owning thread, so tasks can be added and removed without locking and with a minimal number of volatile and CAS operations. The worker threads also steal work from one another in order to spread out the load. When a worker is deciding which task to execute next, it performs the following checks in order, taking a task from the first one that has one:

  1. The thread's own local deque
  2. The deques of other threads in the pool, scanned in a pseudo-random pattern with guiding hints
  3. The common task queue
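The lookup order above can be modeled with ordinary concurrent collections. This is a simplified, hypothetical model (TwoLevelQueues is not a real class). The real ForkJoinPool uses hand-tuned lock-free arrays and a more elaborate scan. The model only illustrates the order of checks: the worker's own deque first (newest task), then stealing the oldest task from another worker starting at a pseudo-random victim, then the common queue.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;

/** Simplified model of the task lookup order; NOT ForkJoinPool's implementation. */
final class TwoLevelQueues {
    private final List<Deque<Runnable>> local = new ArrayList<>();
    private final Queue<Runnable> shared = new ConcurrentLinkedQueue<>();

    TwoLevelQueues(int workers) {
        for (int i = 0; i < workers; i++) {
            local.add(new ConcurrentLinkedDeque<>());
        }
    }

    /** Submission from outside the pool goes to the common queue. */
    void submit(Runnable task) {
        shared.add(task);
    }

    /** A worker forking a subtask pushes it onto the tail of its own deque. */
    void fork(int worker, Runnable task) {
        local.get(worker).addLast(task);
    }

    /** Next task for `worker`, or null if there is none anywhere. */
    Runnable findTask(int worker) {
        // 1. Own deque, newest first (LIFO).
        Runnable task = local.get(worker).pollLast();
        if (task != null) {
            return task;
        }
        // 2. Steal the oldest task from another worker, starting at a random victim.
        int n = local.size();
        int start = ThreadLocalRandom.current().nextInt(n);
        for (int i = 0; i < n; i++) {
            int victim = (start + i) % n;
            if (victim != worker) {
                task = local.get(victim).pollFirst();
                if (task != null) {
                    return task;
                }
            }
        }
        // 3. Fall back to the common queue.
        return shared.poll();
    }
}
```

Note that step 3 is reached only when every deque is empty, which is why tasks in the common queue can sit unserved while workers stay busy with local work.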
Threads are only added to the pool when a task blocks through a ManagedBlocker, which the pool can see, and that block would otherwise cause the pool to fall below its target concurrency. The default target concurrency for the ForkJoinPool is the number of available processors. In Scala Actors this is overridden to be twice the number of available processors. The idea behind this is simple: if you're keeping all your processors busy, then adding more threads will just slow you down. As we've seen, problems arise when a higher level of concurrency is needed due to unmanaged blocks, long-running computations, or simply because the required concurrency is inherently higher than the default. This happens because as long as a worker is busy or has something in its local deque, it won't look elsewhere for tasks. If all the workers are in this state, tasks can simply accumulate, resulting in starvation.
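Both target-concurrency settings correspond to ForkJoinPool's parallelism constructor argument. The following is a minimal sketch against java.util.concurrent.ForkJoinPool (Java 7+). TargetConcurrency is a hypothetical name, and the doubled value mirrors the Scala Actors override described above, not its actual code.

```java
import java.util.concurrent.ForkJoinPool;

/** Hypothetical factory showing the two target-concurrency settings discussed above. */
final class TargetConcurrency {
    private TargetConcurrency() {}

    /** ForkJoinPool's default: parallelism equal to the number of available processors. */
    static ForkJoinPool defaultPool() {
        return new ForkJoinPool();
    }

    /** The override described for Scala Actors: twice the number of processors. */
    static ForkJoinPool doubledPool() {
        return new ForkJoinPool(2 * Runtime.getRuntime().availableProcessors());
    }
}
```

parallelism is a target rather than a hard cap. The pool may run extra threads to compensate for blocks it can see, such as managed blocks and joins, but not for plain blocking calls.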

A Crack at Solving the Problem

As you might have guessed by now, I'm trying to solve this problem. Right now I wouldn't use the code for anything beyond testing. Even though I'm still wrestling with the internals of ForkJoinPool, I've had good results so far. My approach is simple: I added a manager thread that monitors changes in the sizes of the pool's queues, checks whether they have been flushed since the last check, and grows the pool if tasks appear to be starved. The overhead imposed on each worker is minimal, as it only has to update a single additional volatile field when it clears out its queue. The manager thread does add some overhead, but in the benchmarking I've done so far it hasn't been noticeable. Ironically, Scala Actors already maintain a dedicated thread for the scheduler. It's just that the heuristic checks on the size of the pool were removed in 2.8 (although honestly they never worked that well).
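The worker/manager split described above can be sketched as follows. This is one possible heuristic guessed from the prose description, not the actual ManagedForkJoinPool code. StarvationMonitor, recordFlush, and looksStarved are hypothetical names, and the rule "grow when work is queued but no worker has emptied its queue since the last check" is an assumption. In a real pool, the queued-task count might come from ForkJoinPool.getQueuedTaskCount() plus getQueuedSubmissionCount().

```java
import java.util.concurrent.atomic.AtomicLongArray;

/**
 * Hypothetical starvation check. Each worker bumps its own counter whenever it
 * empties its local queue; the manager thread compares the counters with its
 * previous snapshot and reports starvation when tasks are waiting but no
 * worker has emptied its queue since the last check.
 */
final class StarvationMonitor {
    private final AtomicLongArray flushes;   // slot w is written only by worker w
    private final long[] lastSeen;           // private to the manager thread

    StarvationMonitor(int maxWorkers) {
        flushes = new AtomicLongArray(maxWorkers);
        lastSeen = new long[maxWorkers];
    }

    /** Worker side: one volatile read and one volatile write, no CAS (single writer per slot). */
    void recordFlush(int worker) {
        flushes.set(worker, flushes.get(worker) + 1);
    }

    /** Manager side: true if tasks are queued and no worker flushed since the last call. */
    boolean looksStarved(long queuedTasks) {
        boolean progress = false;
        for (int w = 0; w < lastSeen.length; w++) {
            long seen = flushes.get(w);
            if (seen != lastSeen[w]) {
                progress = true;
                lastSeen[w] = seen;
            }
        }
        return queuedTasks > 0 && !progress;
    }
}
```

A manager thread would call looksStarved on each wake-up and add a worker when it returns true. The only cost on the worker side is recordFlush.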

I have two simple tests/micro-benchmarks using Scala Actors with a custom scheduler built on my ManagedForkJoinPool. The SleepyActor benchmark performs extremely poorly if the scheduler doesn't significantly grow the pool, because each actor sleeps on the thread. The PingPong benchmark spawns a bunch of pairs of actors that simply message each other back-and-forth. It is the type of use case where the ForkJoinScheduler shines, at least in terms of throughput. The results on my MacBook are as follows (these are NOT scientific):

Benchmark   | Default Scheduler | ManagedForkJoinScheduler | ResizeableThreadPoolScheduler
SleepyActor | 92 sec            | 30 sec                   | not tested
PingPong    | 59.49 sec         | 47.17 sec                | 66.02 sec
As you can see, performance actually improved with my scheduler. This is because the default scheduler for Scala Actors does not always take the "fast path" of putting new tasks on the deque of the thread that creates them. It only does so about half the time. The PingPong results therefore show how much the thread-local queues affect performance.
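The "fast path" amounts to checking whether the submitting thread is one of the pool's own workers and, if so, forking onto that worker's deque. The following is a minimal sketch with java.util.concurrent.ForkJoinPool (Java 7+) that assumes nothing about how Scala Actors makes this decision. LocalFirstScheduler and bounce are hypothetical, while ForkJoinTask.getPool(), ForkJoinTask.adapt, and fork() are standard API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical scheduler that takes the "fast path" whenever it can. */
final class LocalFirstScheduler {
    private final ForkJoinPool pool;

    LocalFirstScheduler(ForkJoinPool pool) {
        this.pool = pool;
    }

    void execute(Runnable task) {
        if (ForkJoinTask.getPool() == pool) {
            // On one of this pool's workers: push onto that worker's own deque.
            ForkJoinTask.adapt(task).fork();
        } else {
            // From outside the pool: use the external submission path.
            pool.execute(task);
        }
    }

    /** Usage: one message "bounces" `hops` times, each hop scheduled from inside the pool. */
    static int bounce(int hops) {
        ForkJoinPool pool = new ForkJoinPool(2);
        LocalFirstScheduler scheduler = new LocalFirstScheduler(pool);
        AtomicInteger count = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        scheduler.execute(new Runnable() {
            @Override
            public void run() {
                if (count.incrementAndGet() < hops) {
                    scheduler.execute(this);   // next hop goes onto the local deque
                } else {
                    done.countDown();
                }
            }
        });
        try {
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return count.get();
    }
}
```

Only the first hop goes through the external path. Every later hop is submitted from a worker and lands on that worker's own deque.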


It's too early to draw solid conclusions, but based on what I've done so far, I think I can develop a solid heuristic for managing the thread pool size with negligible overhead. The key is to impose no additional overhead on the worker threads and to keep the computational overhead of the manager thread as low as possible. This means that the heuristic needs to be simple, and that the wait times between checks should be as long as possible, probably sized dynamically based on how smoothly the pool seems to be running.
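One simple way to size the wait between checks dynamically is exponential back-off. The interval doubles after each check that finds the pool running smoothly, up to a cap, and snaps back to the minimum as soon as a check suggests starvation. This policy is an assumption for illustration, not the one used in ManagedForkJoinPool. CheckInterval and its bounds are hypothetical.

```java
/**
 * Hypothetical check-interval policy for a pool-manager thread: back off
 * exponentially while the pool runs smoothly, reset when it looks starved.
 */
final class CheckInterval {
    private final long minMillis;
    private final long maxMillis;
    private long currentMillis;

    CheckInterval(long minMillis, long maxMillis) {
        if (minMillis <= 0 || maxMillis < minMillis) {
            throw new IllegalArgumentException("need 0 < min <= max");
        }
        this.minMillis = minMillis;
        this.maxMillis = maxMillis;
        this.currentMillis = minMillis;
    }

    /** Returns how long to wait before the next check, given the last check's verdict. */
    long next(boolean lookedStarved) {
        currentMillis = lookedStarved ? minMillis : Math.min(maxMillis, currentMillis * 2);
        return currentMillis;
    }
}
```

Starting from bounds of 10 and 80 ms, a smoothly running pool is checked after 20, 40, 80, 80, ... ms. The first check that looks starved drops the interval back to 10 ms.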

What I need right now are more tests and benchmarks to cover "real world" scenarios. I also need to test on machines with more cores. My MacBook doesn't exactly present a powerful system. If anyone has suggestions, please post them!



bsdemon said...

Hmm... it seems the best result can be achieved by making all I/O non-blocking. This is how it's done in the Erlang BEAM and GHC runtimes, and all of these requirements are satisfied in those runtimes.

PetrolHead said...

"and grows the pool it tasks appear to be being starved."

Of course you can't endlessly grow the pool and so for particularly "ill" user code you'll still hit a problem.

Erik Engbrecht said...

Yes, but the primary problem with making all I/O non-blocking is that it requires a significant paradigm shift. The problem with relying on a runtime with special support is that you need a specialized runtime that won't run tons of existing code. A secondary problem with non-blocking IO, at least on the JVM, is that it is slower.

The problem you hit is that you run out of memory, just like what happens to all sorts of theoretically correct code when run on a machine with finite resources. The only substantial overhead associated with each thread is its stack (which is preallocated when the thread is created) and its task deque (which is likely small relative to the stack). So I don't think this is a special problem.

jherber said...

Erik, that's a great optimization on current scheduling.

Stepping back, right now we are asking schedulers to read minds. Longer term, we could add typing at the task level. This way blocking IO, short, and long-running tasks are type checked to run on appropriate schedulers.
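One way to approximate this today, without any type-level support, is to tag each task with its kind at submission and route it to an executor suited to that kind. This is also in the spirit of the "own small ForkJoinPools" advice quoted in the post. The sketch below rests on stated assumptions: TaskKind, KindRouter, and the particular executor chosen for each kind are hypothetical, and the tags are checked at run time rather than by a type checker.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

/** Hypothetical task categories; a type system could check these statically instead. */
enum TaskKind { SHORT, LONG_RUNNING, BLOCKING_IO }

/** Routes each task to an executor suited to its declared kind. */
final class KindRouter implements AutoCloseable {
    private final Map<TaskKind, ExecutorService> executors = new EnumMap<>(TaskKind.class);

    KindRouter() {
        int cpus = Runtime.getRuntime().availableProcessors();
        executors.put(TaskKind.SHORT, new ForkJoinPool(cpus));                    // fine-grained CPU work
        executors.put(TaskKind.LONG_RUNNING, Executors.newFixedThreadPool(cpus)); // keeps long jobs off FJ workers
        executors.put(TaskKind.BLOCKING_IO, Executors.newCachedThreadPool());     // adds threads as tasks block
    }

    <T> CompletableFuture<T> submit(TaskKind kind, Supplier<T> task) {
        return CompletableFuture.supplyAsync(task, executors.get(kind));
    }

    @Override
    public void close() {
        executors.values().forEach(ExecutorService::shutdown);
    }
}
```

The weakness, as the thread goes on to note, is that the developer still has to declare the kind correctly. A mislabelled blocking task on the SHORT pool starves it exactly as before.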

Tasks should also be composable, so that composing elements of the executing task are scheduled appropriately and optimally. At this point, we may as well throw in dependency or independence between composable elements, for further optimization.

The JVM would have to help with the last leg of optimization. If we could understand the amount of data moved between threads (cores) for a type of task, the cost of a context switch for a type of task, and the cost graph between context changes and data passing on the underlying computer architecture, we might have a chance at making Scala's next generation of task scheduling libraries machine-level optimal.

PetrolHead said...

"So I don't think this is a special problem."

No indeed and that's really the point - you're making things a little better maybe but there's still a bunch of things about the overall approach of blocking I/O and thread pools that aren't easily addressed.

So, for example, whilst you've done a better job of smartly allocating threads and perhaps scaling I/O code a little better in some cases, the trade-off is that you can falsely exhaust thread resources when the real problem is an I/O bottleneck, leading to errors that misdirect developers.

"A secondary problem with non-blocking IO, at least on the JVM, is that it is slower."

Non-blocking I/O might be slower but I/O is slow compared to CPU and thus burning up lots of threads and blocking them might work better for small loads but it doesn't work so well for large concurrent network loads. Concurrent I/O on threads doesn't work so well for transactional actions against disk either where batching of operations to minimise impact of disk syncs is required.

"but primary the problem with making all I/O non-blocking is that it requires a significant paradigm shift."

Equally not shifting paradigms leaves you to lots of dark corners and thread pooling strategies that can never quite cover them all off unless the developer lends a hand. Something which jherber hits on:

"Stepping back, right now we are asking schedulers to read minds."

Unknown said...

Good discussion, glad to see that others recognize this potentially dangerous performance issue for Actors.

Your improved FJ sounds like it's doing what Haller/Odersky originally described their scheduler doing in the paper describing actors.

Here they say:

The basic idea is that actors provide the scheduler with life-beats during their execution. That is, the send (!) and receive methods call a tick method of the scheduler. The scheduler then looks up the worker thread which is currently executing the corresponding actor, and updates its time stamp. When a new receiver task item is submitted to the scheduler, it first checks if all worker threads are blocked. Worker threads with "recent" time stamps are assumed not to be blocked. Only if all worker threads are assumed to be blocked (because of old time stamps), a new worker thread is created.
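The life-beat scheme in the quote is simple enough to sketch. This is a hypothetical reconstruction from the quoted description only, not code from the paper or from any Scala release. HeartbeatScheduler, tick, onSubmit, the maxWorkers cap, and the injectable clock are all assumptions.

```java
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of the quoted "life-beat" heuristic. The clock is
 * injectable so the logic can be exercised without real time passing.
 */
final class HeartbeatScheduler {
    private final AtomicLongArray lastTick;   // per-worker time stamp of the latest tick
    private final long staleAfter;            // ticks older than this count as "blocked"
    private final LongSupplier clock;
    private int workers;                      // guarded by `this`

    HeartbeatScheduler(int maxWorkers, int initialWorkers, long staleAfter, LongSupplier clock) {
        this.lastTick = new AtomicLongArray(maxWorkers);
        this.staleAfter = staleAfter;
        this.clock = clock;
        this.workers = initialWorkers;
        long now = clock.getAsLong();
        for (int w = 0; w < initialWorkers; w++) {
            lastTick.set(w, now);
        }
    }

    /** Called from send (!) and receive on behalf of the worker running the actor. */
    void tick(int worker) {
        lastTick.set(worker, clock.getAsLong());
    }

    /** Called when a new task is submitted; returns true if a worker was added. */
    synchronized boolean onSubmit() {
        long now = clock.getAsLong();
        for (int w = 0; w < workers; w++) {
            if (now - lastTick.get(w) < staleAfter) {
                return false;                 // a recent time stamp: assume not blocked
            }
        }
        if (workers == lastTick.length()) {
            return false;                     // at the hard cap
        }
        lastTick.set(workers, now);           // the new worker starts out "recent"
        workers++;
        return true;
    }

    synchronized int workerCount() {
        return workers;
    }
}
```

A worker that is busy with a long CPU-bound computation never calls tick, so it looks exactly like a blocked one and triggers growth. That is the weakness raised in the next comment.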

The problem I see in any 'growing' implementation is if the code is not 'blocked', but rather busy with a CPU intensive task.

You end up creating MORE threads which would slow down performance due to the context switches.

Gene Tani said...

Clojure and fork-join: the work is in the "par" branch but I haven't found a detailed description of how it works.