[m-rev.] for post-commit review: Parallel runtime system improvments.

Paul Bone pbone at csse.unimelb.edu.au
Wed Apr 13 23:21:38 AEST 2011


For post commit review by Zoltan, although we've talked a lot about the design
of this change.


Improve work stealing.  Spark deques have been associated with contexts so far.
This is a problem for the following reasons:

    The work stealing code must take a lock to access the resizeable array of
    work stealing dequeues.  This adds global contention that can be avoided if
    this array has a fixed size.

    If a context is blocked on a future then that engine cannot execute the
    sparks from that context, instead it tries to find global work, this is
    more expensive than necessary.

    If there are a few dozen contexts then there may be just as many work
    stealing queues to take work from, the density of these queues will be
    higher if they are fewer.  Therefore work stealing will be more successful
    on average.

This change associates spark deques with Mercury Engines rather than Contexts
to avoid these problems.

This has invalidated some invariants that allowed the runtime system to make
some worth-while optimisations.  These optimisations have been maintained.
Mercury's idle loop has been reimplemented to allow for this.  This
re-implementation has allowed for a number of other improvements:

    Polling was used to check for new global sparks.  This has been removed and
    each engine now sleeps using it's own semaphore.

    Checks for work can be done in different orders depending on how an engine
    joins the idle loop.

    When global work becomes available a particular engine can be woken up
    rather than any arbitrary engine.  We take advantage of this when making
    contexts runnable, we try to schedule them on the engine that last executed
    them.

    When an engine is woken up it can be instructed with what it should do upon
    waking up.

    When a engine looks for a context to run, it will try to pick a context
    that was last executed on it.  This may avoid cache misses when the context
    begins to run.

In the future we should consider:
    Experiment with telling engines which context to run.

    Improve the selection of which engine work should be scheduled on to be
    hardware and memory-hierarchy aware.

Things that need doing next (probably next week):
    ./configure should check for POSIX semaphore support.

    Profiling times have been broken by this change, they will need fixing.

    The threadscope event long now breaks an invariants that the threadscope
    graphical tool requires.

    Semaphores are setup but never released, this is not a big problem but the
    manual page says that some implementations may leak resources.

runtime/mercury_context.h:
runtime/mercury_context.c:
    Remove the spark deque field from the MR_Context structure.

    Export the new array of spark deques so that other modules may fill in
    elements as engines are setup.

    Modify the resume_owner_thread field of the MR_Context structure, this was
    used to ensure that a context returning through C code would be resumed on
    the engine with the correct C stack and depth.  This field is now an engine
    id and has been renamed to resume_owner_engine, it is advisory unless
    resume_engine_required is also set.  This way it is used to advise which
    engine most recently executed this context and therefore may have a warm
    cache.

    Remove code that dynamically resized the array of spark deques.  Including
    the lock that protected against updating this array while it was being read
    from other thread.

    Introduce code that initialises the statically sized array of spark deques.

    Reimplement the idle loop.  This replaces MR_runnext and MR_do_runnext with
    MR_idle and MR_do_idle respectively.  There are also two new entry points
    into the idle loop.  Which one to use depends on the state of the engine.

    Introduce new mechanisms for waking a particular engine.  For example the
    engine that last executed a context that is now runnable.

    Change the algorithm for selecting which context to run, try to select
    contexts that where last used on the current engine to avoid cache misses.

    Use an engine's victim counter rather than a global victim counter when
    trying to steal work.

    Introduce some conditionally-compiled code that can be used to profile how
    quickly new contexts can be created.

    Rename MR_init_thread_stuff and MR_finalize_thread_stuff.  The term thread
    has been replaced with context since they're in mercury_context.c.  This
    allows the creation of a new function MR_init_thread_stuff() in
    mercury_thread.c I also found the mismatch between the function names and
    file name confusing.  Move some of the code from MR_init_context_stuff to
    the new MR_init_thread_stuff function where it belongs.

    Refactor the thread pinning code so that even when thread pinning is
    disabled it can be used to allocate each thread to a CPU but not actually
    pin them.

    Fix some whitespace errors.

runtime/mercury_thread.h:
runtime/mercury_thread.c:
    In MR_init_engine():
        Allocate an engine id for each engine.

        A number of arrays had one slot per engine and where setup using a
        lock.  Now engine ids are used to index each array and setup is done
        without a lock, each engine simply sets up its own slot.

        Setup the new per-engine work stealing deques.

    The MR_all_engine_bases array has been moved to this file.

    Implement a new MR_init_thread_stuff function which initialises some global
    variables and locks.  Some of MR_init_thread_stuff has been moved from
    mercury_context.c

    Pin threads as part of MR_init_thread, excluding the primordial thread
    which must be pinned before threadscope is initialised.

    Add functions for debugging the use of semaphores.

    Add corresponding macros that can be used to redirect semaphore calls to
    debugging functions as above.

    Improved thread debugging code, ensured that stderr is flushed after every
    use, and that logging is done after calls return as well as before they're
    called.

    Conform to changes in mercury_context.h

runtime/mercury_engine.h:
runtime/mercury_engine.c:
    Add spark deque and victim counter fields to the MercuryEngine structure.

    Make the MR_eng_id field of the MercuryEngine structure available in all
    thread safe grades, formerly it was used in only threadscope grades.

    Move the MR_all_engine_bases variable to mercury_thread.[ch]

    Put a reference to the engine's spark queue into the global array.  This is
    done here, so that it is after thread pinning because the original plan was
    to have this array sorted by CPU rather then engine - we may yet do this in
    the future.

    Initialise an engine's spark deque when an engine is initialised.

    Setup the engine specific threadscope data in mercury_thread.c

    Conform to changes in mercury_context.h

runtime/mercury_wrapper.c:
    The engine base array is no longer setup here, that code has been moved to
    mercury_thread.c

    Conform to changes in mercury_context.h and mercury_thread.h

runtime/mercury_wsdeque.h:
runtime/mercury_wsdeque.c:
    The original implementation allocated an array for a spark queue only if
    one wasn't already allocated, which could happen when a context was reused.
    Now that spark queues are associated with engines arrays are always
    allocated.

    Replaced two macros with a single macro since there's no-longer a
    distinction between global and local work queues, all work queues are
    local.

runtime/mercury_wsdeque.c:
runtime/mercury_wsdeque.h:
    Remove the --worksteal-max-attempts and --worksteal-sleep-msecs options as
    they are no-longer used.

runtime/mercury_threadscope.h:
runtime/mercury_threadscope.c:
    The MR_EngineId type has been moved to mercury_types.h

    Engine IDs are no-longer allocated here, this is done in mercury_thread.c

    The run spark and steal spark messages now write 0xFFFFFFFF for the context
    id if there is no current context.  Previously this would dereference a
    null pointer.

runtime/mercury_memory_zones.c:
    When checking for an existing memory zone check the free_zones_list
    variable before taking a lock.  This can prevent taking the lock in cases
    where there are no free zones.

    Introduce some conditionally-compiled code that can be used to profile how
    quickly new contexts can be created.

runtime/mercury_bootstrap.h:
    Remove macros that no-longer resolve to functions due to changes in the
    runtime system.

runtime/mercury_types.h:
    Move the MR_EngineId type from mercury_threadscope.h to mercury_types.h

runtime/mercury_grade.h:
    Introduce a parallel grade version number, this change brakes binary
    compatibility with existing parallel code.

runtime/mercury_backjump.c:
runtime/mercury_par_builtin.c:
runtime/mercury_mm_own_stacks.c:
library/stm_builtin.m:
library/thread.m:
library/thread.semaphore.m:
    Conform to changes in mercury_context.h.

library/io.m:
    Make this module compatible with MR_debug_threads.

doc/user_guide.texi
    Remove the documentation for the --worksteal-max-attempts and
    --worksteal-sleep-msecs options.  The documentation was already commented
    out.

Index: doc/user_guide.texi
===================================================================
RCS file: /home/mercury1/repository/mercury/doc/user_guide.texi,v
retrieving revision 1.625
diff -u -p -b -r1.625 user_guide.texi
--- doc/user_guide.texi	13 Apr 2011 06:29:53 -0000	1.625
+++ doc/user_guide.texi	13 Apr 2011 12:03:38 -0000
@@ -10214,18 +10214,6 @@ multiplied by the word size in bytes.
 @c Sets the size of the redzone on the trail to @var{size} kilobytes
 @c multiplied by the word size in bytes.
 
- at c @sp 1
- at c @item --worksteal-max-attempts @var{attempts}
- at c @findex --worksteal-max-attempts (runtime option)
- at c Tells idle Mercury engines to attempt to steal parallel conjuncts
- at c up to a maximum of @var{attempts} times before sleeping.
-
- at c @sp 1
- at c @item --worksteal-sleep-msecs @var{milliseconds}
- at c @findex --worksteal-sleep-msecs (runtime option)
- at c Sets the amount of time that an idle Mercury engine should sleep before
- at c attempting more work-stealing attempts.
-
 @sp 1
 @item -i @var{filename}
 @itemx --mdb-in @var{filename}
Index: library/io.m
===================================================================
RCS file: /home/mercury1/repository/mercury/library/io.m,v
retrieving revision 1.456
diff -u -p -b -r1.456 io.m
--- library/io.m	4 Apr 2011 07:10:39 -0000	1.456
+++ library/io.m	13 Apr 2011 12:03:39 -0000
@@ -2550,7 +2550,7 @@ io.check_err(Stream, Res, !IO) :-
     }
 
     ML_maybe_make_err_msg(RetVal != 0, errno, ""read failed: "",
-        MR_PROC_LABEL, MR_TRUE, RetStr);
+        ""io.ferror/5"", MR_TRUE, RetStr);
     MR_update_io(IO0, IO);
 ").
 
@@ -2635,7 +2635,8 @@ io.make_err_msg(Msg0, Msg, !IO) :-
     [will_not_call_mercury, promise_pure, tabled_for_io,
         does_not_affect_liveness, no_sharing],
 "
-    ML_maybe_make_err_msg(MR_TRUE, Error, Msg0, MR_PROC_LABEL, MR_FALSE, Msg);
+    ML_maybe_make_err_msg(MR_TRUE, Error, Msg0, ""io.make_err_msg/5"",
+        MR_FALSE, Msg);
     MR_update_io(IO0, IO);
 ").
 
@@ -2852,7 +2853,7 @@ io.file_modification_time(File, Result, 
         Status = 1;
     } else {
         ML_maybe_make_err_msg(MR_TRUE, errno, ""stat() failed: "",
-            MR_PROC_LABEL, MR_TRUE, Msg);
+            ""io.file_modification_time_2/6"", MR_TRUE, Msg);
         Status = 0;
     }
 #else
@@ -3937,7 +3938,7 @@ io.file_id(FileName, Result, !IO) :-
         Status = 1;
     } else {
         ML_maybe_make_err_msg(MR_TRUE, errno, ""stat() failed: "",
-            MR_PROC_LABEL, MR_TRUE, Msg);
+            ""io.file_id_2/6"", MR_TRUE, Msg);
         Status = 0;
     }
     MR_update_io(IO0, IO);
@@ -5071,7 +5072,7 @@ source_name(stderr) = "<standard error>"
     [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
         no_sharing],
 "
-    MR_LOCK(&ML_io_stream_db_lock, MR_PROC_LABEL);
+    MR_LOCK(&ML_io_stream_db_lock, ""io.lock_stream_db/2"");
     IO = IO0;
 ").
 
@@ -5084,7 +5085,7 @@ io.lock_stream_db(!IO).
     [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
         no_sharing],
 "
-    MR_UNLOCK(&ML_io_stream_db_lock, MR_PROC_LABEL);
+    MR_UNLOCK(&ML_io_stream_db_lock, ""io.unlock_stream_db/2"");
     IO = IO0;
 ").
 
@@ -9634,7 +9635,7 @@ io.close_binary_output(binary_output_str
     argv[3] = NULL;
 
     /* Protect `environ' from concurrent modifications. */
-    MR_OBTAIN_GLOBAL_LOCK(MR_PROC_LABEL);
+    MR_OBTAIN_GLOBAL_LOCK(""io.call_system_code/5"");
 
     /*
     ** See the comment at the head of the body of preceding foreign_decl
@@ -9647,13 +9648,14 @@ io.close_binary_output(binary_output_str
         err = posix_spawn(&pid, ""/bin/sh"", NULL, NULL, argv, environ);
     #endif
 
-    MR_RELEASE_GLOBAL_LOCK(MR_PROC_LABEL);
+    MR_RELEASE_GLOBAL_LOCK(""io.call_system_code/5"");
 
     if (err != 0) {
         /* Spawn failed. */
         Status = 127;
         ML_maybe_make_err_msg(MR_TRUE, errno,
-            ""error invoking system command: "", MR_PROC_LABEL, MR_TRUE, Msg);
+            ""error invoking system command: "", ""io.call_system_code/5"",
+            MR_TRUE, Msg);
     } else {
         /* Wait for the spawned process to exit. */
         do {
@@ -9662,8 +9664,8 @@ io.close_binary_output(binary_output_str
         if (err == -1) {
             Status = 127;
             ML_maybe_make_err_msg(MR_TRUE, errno,
-                ""error invoking system command: "", MR_PROC_LABEL, MR_TRUE,
-                Msg);
+                ""error invoking system command: "", ""io.call_system_code/5"",
+                MR_TRUE, Msg);
         } else {
             Status = st;
             Msg = MR_make_string_const("""");
@@ -9681,7 +9683,8 @@ io.close_binary_output(binary_output_str
         */
         Status = 127;
         ML_maybe_make_err_msg(MR_TRUE, errno,
-            ""error invoking system command: "", MR_PROC_LABEL, MR_TRUE, Msg);
+            ""error invoking system command: "", ""io.call_system_code/5"",
+            MR_TRUE, Msg);
     } else {
         Msg = MR_make_string_const("""");
     }
@@ -10210,7 +10213,7 @@ io.make_temp(Dir, Prefix, Name, !IO) :-
     fd = mkstemp(FileName);
     if (fd == -1) {
         ML_maybe_make_err_msg(MR_TRUE, errno,
-            ""error opening temporary file: "", MR_PROC_LABEL, MR_TRUE,
+            ""error opening temporary file: "", ""io.do_make_temp/8"", MR_TRUE,
             ErrorMessage);
         Error = -1;
     } else {
@@ -10218,7 +10221,7 @@ io.make_temp(Dir, Prefix, Name, !IO) :-
             err = close(fd);
         } while (err == -1 && MR_is_eintr(errno));
         ML_maybe_make_err_msg(err, errno,
-            ""error closing temporary file: "", MR_PROC_LABEL, MR_TRUE,
+            ""error closing temporary file: "", ""io.do_make_temp/8"", MR_TRUE,
             ErrorMessage);
         Error = err;
     }
@@ -10261,16 +10264,16 @@ io.make_temp(Dir, Prefix, Name, !IO) :-
         num_tries < ML_MAX_TEMPNAME_TRIES);
     if (fd == -1) {
         ML_maybe_make_err_msg(MR_TRUE, errno,
-            ""error opening temporary file: "", MR_PROC_LABEL, MR_TRUE,
-            ErrorMessage);
+            ""error opening temporary file: "", ""io.do_make_temp/8"",
+            MR_TRUE, ErrorMessage);
         Error = -1;
     }  else {
         do {
             err = close(fd);
         } while (err == -1 && MR_is_eintr(errno));
         ML_maybe_make_err_msg(err, errno,
-            ""error closing temporary file: "", MR_PROC_LABEL, MR_TRUE,
-            ErrorMessage);
+            ""error closing temporary file: "", ""io.do_make_temp/8"",
+            MR_TRUE, ErrorMessage);
         Error = err;
     }
 #endif
@@ -10565,7 +10568,7 @@ io.remove_file(FileName, Result, !IO) :-
 "{
     RetVal = remove(FileName);
     ML_maybe_make_err_msg(RetVal != 0, errno, ""remove failed: "",
-        MR_PROC_LABEL, MR_TRUE, RetStr);
+        ""io.remove_file_2/5"", MR_TRUE, RetStr);
     MR_update_io(IO0, IO);
 }").
 
@@ -10704,7 +10707,7 @@ io.rename_file(OldFileName, NewFileName,
 "{
     RetVal = rename(OldFileName, NewFileName);
     ML_maybe_make_err_msg(RetVal != 0, errno, ""rename failed: "",
-        MR_PROC_LABEL, MR_TRUE, RetStr);
+        ""io.rename_file_2/6"", MR_TRUE, RetStr);
     MR_update_io(IO0, IO);
 }").
 
Index: library/stm_builtin.m
===================================================================
RCS file: /home/mercury1/repository/mercury/library/stm_builtin.m,v
retrieving revision 1.17
diff -u -p -b -r1.17 stm_builtin.m
--- library/stm_builtin.m	19 Nov 2009 00:08:34 -0000	1.17
+++ library/stm_builtin.m	13 Apr 2011 12:03:38 -0000
@@ -395,10 +395,10 @@
     MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume =
         MR_ENTRY(mercury__stm_builtin__block_thread_resume);
 
-    MR_ENGINE(MR_eng_this_context) = NULL;
     MR_UNLOCK(&MR_STM_lock, ""MR_STM_block_thread"");
-    MR_runnext();
 
+    MR_ENGINE(MR_eng_this_context) = NULL;
+    MR_idle();
 #endif
 ").
 
Index: library/thread.m
===================================================================
RCS file: /home/mercury1/repository/mercury/library/thread.m,v
retrieving revision 1.23
diff -u -p -b -r1.23 thread.m
--- library/thread.m	9 Nov 2010 03:46:32 -0000	1.23
+++ library/thread.m	13 Apr 2011 12:03:38 -0000
@@ -190,7 +190,7 @@
   #endif
     MR_schedule_context(MR_ENGINE(MR_eng_this_context));
     MR_ENGINE(MR_eng_this_context) = NULL;
-    MR_runnext();
+    MR_idle();
 
   #ifndef ML_THREAD_AVOID_LABEL_ADDRS
     yield_skip_to_the_end:
@@ -275,7 +275,7 @@ INIT mercury_sys_init_thread_modules
 
         MR_destroy_context(MR_ENGINE(MR_eng_this_context));
         MR_ENGINE(MR_eng_this_context) = NULL;
-        MR_runnext();
+        MR_idle();
     }
 
     MR_define_entry(mercury__thread__yield_resume);
Index: library/thread.semaphore.m
===================================================================
RCS file: /home/mercury1/repository/mercury/library/thread.semaphore.m,v
retrieving revision 1.20
diff -u -p -b -r1.20 thread.semaphore.m
--- library/thread.semaphore.m	30 Sep 2010 07:23:33 -0000	1.20
+++ library/thread.semaphore.m	13 Apr 2011 12:03:38 -0000
@@ -218,7 +218,7 @@ ML_finalize_semaphore(void *obj, void *c
         MR_schedule_context(MR_ENGINE(MR_eng_this_context));
 
         MR_ENGINE(MR_eng_this_context) = NULL;
-        MR_runnext();
+        MR_idle();
 
       #ifndef ML_THREAD_AVOID_LABEL_ADDRS
         signal_skip_to_the_end_1: ;
@@ -239,7 +239,7 @@ ML_finalize_semaphore(void *obj, void *c
         MR_schedule_context(MR_ENGINE(MR_eng_this_context));
 
         MR_ENGINE(MR_eng_this_context) = NULL;
-        MR_runnext();
+        MR_idle();
 
       #ifndef ML_THREAD_AVOID_LABEL_ADDRS
         signal_skip_to_the_end_2: ;
@@ -317,7 +317,7 @@ ML_finalize_semaphore(void *obj, void *c
 
         /* Make the current engine do something else. */
         MR_ENGINE(MR_eng_this_context) = NULL;
-        MR_runnext();
+        MR_idle();
 
       #ifndef ML_THREAD_AVOID_LABEL_ADDRS
         wait_skip_to_the_end: ;
Index: runtime/mercury_backjump.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_backjump.h,v
retrieving revision 1.1
diff -u -p -b -r1.1 mercury_backjump.h
--- runtime/mercury_backjump.h	19 Mar 2008 05:30:00 -0000	1.1
+++ runtime/mercury_backjump.h	13 Apr 2011 12:03:38 -0000
@@ -60,7 +60,7 @@ typedef struct MR_BackJumpHandler_Struct
         ** MR_backjump_next_choice_id_key stores a key that can be used
         ** to get the next available backjump choice id for the current thread.
         ** NOTE: changes here may need to be reflected in the function
-        ** MR_init_thread_stuff() in mercury_context.c.
+        ** MR_init_context_stuff() in mercury_context.c.
         */
         extern MercuryThreadKey MR_backjump_handler_key;
         extern MercuryThreadKey MR_backjump_next_choice_id_key;
Index: runtime/mercury_bootstrap.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_bootstrap.h,v
retrieving revision 1.43
diff -u -p -b -r1.43 mercury_bootstrap.h
--- runtime/mercury_bootstrap.h	12 Jul 2010 14:25:59 -0000	1.43
+++ runtime/mercury_bootstrap.h	13 Apr 2011 12:03:38 -0000
@@ -217,9 +217,7 @@ typedef MR_Bool			Bool;
 #define	init_context(context)		MR_init_context(context)
 #define	create_context()		MR_create_context()
 #define	destroy_context(context)	MR_destroy_context(context)
-#define	init_thread_stuff(context)	MR_init_thread_stuff(context)
 #define	flounder()			MR_flounder()
-#define	runnext()			MR_runnext()
 #define	schedule(context)		MR_schedule(context)
 #define	set_min_heap_reclamation_point(c) MR_set_min_heap_reclamation_point(c)
 #define	save_hp_in_context(context)	MR_save_hp_in_context(context)
Index: runtime/mercury_context.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.c,v
retrieving revision 1.89
diff -u -p -b -r1.89 mercury_context.c
--- runtime/mercury_context.c	13 Apr 2011 06:29:54 -0000	1.89
+++ runtime/mercury_context.c	13 Apr 2011 12:49:25 -0000
@@ -19,6 +19,9 @@ ENDINIT
 #ifdef MR_THREAD_SAFE
   #include "mercury_thread.h"
   #include "mercury_stm.h"
+  #ifndef MR_HIGHLEVEL_CODE
+    #include <semaphore.h>
+  #endif
 #endif
 #ifdef MR_CAN_DO_PENDING_IO
   #include <sys/types.h>	/* for fd_set */
@@ -63,13 +66,62 @@ MR_init_context_maybe_generator(MR_Conte
 
 #ifdef  MR_LL_PARALLEL_CONJ
 static void
-MR_add_spark_deque(MR_SparkDeque *sd);
-static void
-MR_delete_spark_deque(const MR_SparkDeque *sd);
-static void
 MR_milliseconds_from_now(struct timespec *timeout, unsigned int msecs);
 #endif
 
+#ifdef MR_THREAD_SAFE
+
+/*
+** These states are bitfields so they can be combined when passed to
+** try_wake_engine.  The definitions of the starts are:
+**
+** working      the engine has work to do and is working on it.
+**
+** sleeping     The engine has no work to do and is sleeping on it's sleep
+**              semaphore.
+**
+** idle         The engine has recently finished it's work and is looking for
+**              more work before it goes to sleep.  This state is useful when
+**              there are no sleeping engines but there are idle engines,
+**              signalling an idle engine will prevent it from sleeping and
+**              allow it to re-check the work queues.
+**
+** woken        The engine was either sleeping or idle and has been signaled
+**              and possibly been given work to do.  DO NOT signal these
+**              engines again doing so may leak work.
+*/
+#define ENGINE_STATE_WORKING    0x0001
+#define ENGINE_STATE_SLEEPING   0x0002
+#define ENGINE_STATE_IDLE       0x0004
+#define ENGINE_STATE_WOKEN      0x0008
+#define ENGINE_STATE_ALL        0xFFFF
+
+struct engine_sleep_sync_i {
+    sem_t                               es_sleep_semaphore;
+    sem_t                               es_wake_semaphore;
+    volatile unsigned                   es_state;
+    volatile unsigned                   es_action;
+    union MR_engine_wake_action_data    es_action_data;
+};
+
+#define CACHE_LINE_SIZE 64
+#define PAD_CACHE_LINE(s) \
+    ((CACHE_LINE_SIZE) > (s) ? (CACHE_LINE_SIZE) - (s) : 0)
+
+typedef struct {
+    struct engine_sleep_sync_i d;
+    /*
+    ** Padding ensures that engine sleep synchronisation data for different
+    ** engines doesn't share cache lines.
+    */
+    char padding[PAD_CACHE_LINE(sizeof(struct engine_sleep_sync_i))];
+} engine_sleep_sync;
+
+static
+engine_sleep_sync *engine_sleep_sync_data;
+#endif /* MR_THREAD_SAFE */
+
+
 /*
 ** The run queue is protected with MR_runqueue_lock and signalled with
 ** MR_runqueue_cond.
@@ -78,7 +130,6 @@ MR_Context              *MR_runqueue_hea
 MR_Context              *MR_runqueue_tail;
 #ifdef  MR_THREAD_SAFE
   MercuryLock           MR_runqueue_lock;
-  MercuryCond           MR_runqueue_cond;
 #endif
 
 MR_PendingContext       *MR_pending_contexts;
@@ -113,14 +164,13 @@ static MR_Integer       MR_profile_paral
 /*
 ** Local variables for thread pinning.
 */
-#if defined(MR_LL_PARALLEL_CONJ) && defined(MR_HAVE_SCHED_SETAFFINITY)
+#ifdef MR_LL_PARALLEL_CONJ
 static MercuryLock      MR_next_cpu_lock;
 MR_bool                 MR_thread_pinning_configured = MR_TRUE;
 MR_bool                 MR_thread_pinning_in_use;
 static MR_Unsigned      MR_next_cpu = 0;
-  #ifdef  MR_HAVE_SCHED_GETCPU
-static MR_Integer       MR_primordial_thread_cpu = -1;
-  #endif
+/* This is initialised the first the MR_pin_primordial_thread() is called */
+MR_Unsigned             MR_primordial_thread_cpu;
 #endif
 
 #if defined(MR_LL_PARALLEL_CONJ) && \
@@ -156,17 +206,31 @@ static MR_Context       *free_small_cont
 MR_Integer volatile         MR_num_idle_engines = 0;
 MR_Unsigned volatile        MR_num_exited_engines = 0;
 static MR_Integer volatile  MR_num_outstanding_contexts = 0;
+static sem_t                shutdown_semaphore;
 
 static MercuryLock MR_par_cond_stats_lock;
-static MercuryLock      spark_deques_lock;
-static MR_SparkDeque    **MR_spark_deques = NULL;
-static MR_Integer       MR_max_spark_deques = 0;
-static MR_Integer       MR_victim_counter = 0;
-
+/*
+** The spark deques are kept in engine id order.
+**
+** This array will contain MR_num_threads pointers to deques.
+*/
+MR_SparkDeque           **MR_spark_deques = NULL;
 #endif
 
 /*---------------------------------------------------------------------------*/
 
+#ifdef MR_LL_PARALLEL_CONJ
+/*
+** Try to wake up a sleeping message and tell it to do action.  The engine is
+** only woken if the engine is in one of the states in the bitfield states.  If
+** the engine is woekn the result of this function is MR_TRUE, otherwise it's
+** MR_FALSE.
+*/
+static MR_bool
+try_wake_engine(MR_EngineId engine_id, int action,
+    union MR_engine_wake_action_data *action_data, unsigned states);
+#endif
+
 /*
 ** Write out the profiling data that we collect during execution.
 */
@@ -181,43 +245,33 @@ MR_do_pin_thread(int cpu);
 /*---------------------------------------------------------------------------*/
 
 void
-MR_init_thread_stuff(void)
+MR_init_context_stuff(void)
 {
+#ifdef MR_LL_PARALLEL_CONJ
+    unsigned i;
+#endif
+
 #ifdef  MR_THREAD_SAFE
 
     pthread_mutex_init(&MR_runqueue_lock, MR_MUTEX_ATTR);
-    pthread_cond_init(&MR_runqueue_cond, MR_COND_ATTR);
     pthread_mutex_init(&free_context_list_lock, MR_MUTEX_ATTR);
-    pthread_mutex_init(&MR_global_lock, MR_MUTEX_ATTR);
     pthread_mutex_init(&MR_pending_contexts_lock, MR_MUTEX_ATTR);
   #ifdef MR_LL_PARALLEL_CONJ
-    pthread_mutex_init(&spark_deques_lock, MR_MUTEX_ATTR);
     #ifdef MR_HAVE_SCHED_SETAFFINITY
     pthread_mutex_init(&MR_next_cpu_lock, MR_MUTEX_ATTR);
     #endif
     #ifdef MR_DEBUG_RUNTIME_GRANULARITY_CONTROL
     pthread_mutex_init(&MR_par_cond_stats_lock, MR_MUTEX_ATTR);
     #endif
+    sem_init(&shutdown_semaphore, 0, 0);
   #endif
     pthread_mutex_init(&MR_STM_lock, MR_MUTEX_ATTR);
-  #ifndef MR_THREAD_LOCAL_STORAGE
-    MR_KEY_CREATE(&MR_engine_base_key, NULL);
-  #endif
-    MR_KEY_CREATE(&MR_exception_handler_key, NULL);
 
   #ifdef MR_HIGHLEVEL_CODE
     MR_KEY_CREATE(&MR_backjump_handler_key, NULL);
     MR_KEY_CREATE(&MR_backjump_next_choice_id_key, (void *)0);
   #endif
 
-    /* These are actually in mercury_thread.c. */
-    pthread_mutex_init(&MR_thread_barrier_lock, MR_MUTEX_ATTR);
-  #ifdef MR_HIGHLEVEL_CODE
-    pthread_cond_init(&MR_thread_barrier_cond, MR_COND_ATTR);
-  #else
-    pthread_mutex_init(&MR_init_engine_array_lock, MR_MUTEX_ATTR);
-  #endif
-
     /*
     ** If MR_num_threads is unset, configure it to match number of processors
     ** on the system. If we do this, then we prepare to set processor
@@ -247,6 +301,21 @@ MR_init_thread_stuff(void)
   #ifdef MR_LL_PARALLEL_CONJ
     MR_granularity_wsdeque_length =
         MR_granularity_wsdeque_length_factor * MR_num_threads;
+
+    MR_spark_deques = MR_GC_NEW_ARRAY(MR_SparkDeque*, MR_num_threads);
+    engine_sleep_sync_data = MR_GC_NEW_ARRAY(engine_sleep_sync, MR_num_threads);
+    for (i = 0; i < MR_num_threads; i++) {
+        MR_spark_deques[i] = NULL;
+
+        sem_init(&(engine_sleep_sync_data[i].d.es_sleep_semaphore), 0, 0);
+        sem_init(&(engine_sleep_sync_data[i].d.es_wake_semaphore), 0, 1);
+        /*
+        ** All engines are initially working (because telling them to wake up
+        ** before they're started would be useless.
+        */
+        engine_sleep_sync_data[i].d.es_state = ENGINE_STATE_WORKING;
+        engine_sleep_sync_data[i].d.es_action = MR_ENGINE_ACTION_NONE;
+    }
   #endif
 #endif /* MR_THREAD_SAFE */
 }
@@ -255,51 +324,75 @@ MR_init_thread_stuff(void)
 ** Pin the primordial thread first to the CPU it is currently using (where
 ** support is available).
 */
-
-void
+#if defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ)
+unsigned
 MR_pin_primordial_thread(void)
 {
-#if defined(MR_LL_PARALLEL_CONJ) && defined(MR_HAVE_SCHED_SETAFFINITY)
-  #ifdef MR_HAVE_SCHED_GETCPU
+    unsigned    cpu;
+    int         temp;
+
     /*
     ** We don't need locking to pin the primordial thread as it is called
     ** before any other threads exist.
     */
+    /*
+    ** We go through the motions of thread pinning even when thread pinning is
+    ** not supported as the allocation of CPUs to threads may be used later.
+    */
+  #ifdef MR_HAVE_SCHED_GETCPU
+    temp = sched_getcpu();
+    if (temp == -1) {
+        MR_primordial_thread_cpu = 0;
+    #ifdef MR_HAVE_SCHED_SET_AFFINITY
     if (MR_thread_pinning_configured && MR_thread_pinning_in_use) {
-        MR_primordial_thread_cpu = sched_getcpu();
-        if (MR_primordial_thread_cpu == -1) {
             perror("Warning: unable to determine the current CPU for "
                 "the primordial thread: ");
-        } else {
-            MR_do_pin_thread(MR_primordial_thread_cpu);
-        }
     }
-    if (MR_primordial_thread_cpu == -1) {
-        MR_pin_thread();
+    #endif
+    } else {
+        MR_primordial_thread_cpu = temp;
     }
   #else
-    MR_pin_thread();
+    MR_primordial_thread_cpu = 0;
   #endif
-#endif
+  #ifdef MR_HAVE_SCHED_SET_AFFINITY
+    if (MR_thread_pinning) {
+        MR_do_pin_thread(MR_primordial_thread_cpu);
+    }
+  #endif
+    return MR_primordial_thread_cpu;
 }
+#endif /* defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ) */
 
-void
+#if defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ)
+unsigned
 MR_pin_thread(void)
 {
-#if defined(MR_LL_PARALLEL_CONJ) && defined(MR_HAVE_SCHED_SETAFFINITY)
+    unsigned cpu;
+
+    /*
+    ** We go through the motions of thread pinning even when thread pinning is
+    ** not supported as the allocation of CPUs to threads may be used later.
+    */
     MR_LOCK(&MR_next_cpu_lock, "MR_pin_thread");
-    if (MR_thread_pinning_configured && MR_thread_pinning_in_use) {
-#if defined(MR_HAVE_SCHED_GETCPU)
         if (MR_next_cpu == MR_primordial_thread_cpu) {
+        /*
+        ** Skip the CPU that the primordial thread was pinned on.
+        */
             MR_next_cpu++;
         }
-#endif
-        MR_do_pin_thread(MR_next_cpu);
-        MR_next_cpu++;
-    }
+    cpu = MR_next_cpu++;
     MR_UNLOCK(&MR_next_cpu_lock, "MR_pin_thread");
+
+#ifdef MR_HAVE_SCHED_SETAFFINITY
+    if (MR_thread_pinning_configured && MR_thread_pinning_in_use) {
+        MR_do_pin_thread(cpu);
+    }
 #endif
+
+    return cpu;
 }
+#endif /* defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ) */
 
 #if defined(MR_LL_PARALLEL_CONJ) && defined(MR_HAVE_SCHED_SETAFFINITY)
 static void
@@ -327,16 +420,14 @@ MR_do_pin_thread(int cpu)
 #endif
 
 void
-MR_finalize_thread_stuff(void)
+MR_finalize_context_stuff(void)
 {
 #ifdef MR_THREAD_SAFE
     pthread_mutex_destroy(&MR_runqueue_lock);
-    pthread_cond_destroy(&MR_runqueue_cond);
     pthread_mutex_destroy(&free_context_list_lock);
-#endif
-
-#ifdef  MR_LL_PARALLEL_CONJ
-    pthread_mutex_destroy(&spark_deques_lock);
+  #ifdef MR_LL_PARALLEL_CONJ
+    sem_destroy(&shutdown_semaphore);
+  #endif
 #endif
 
 #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
@@ -487,7 +578,8 @@ MR_init_context_maybe_generator(MR_Conte
     c->MR_ctxt_next = NULL;
     c->MR_ctxt_resume = NULL;
 #ifdef  MR_THREAD_SAFE
-    c->MR_ctxt_resume_owner_thread = MR_null_thread();
+    c->MR_ctxt_resume_owner_engine = 0;
+    c->MR_ctxt_resume_engine_required = MR_FALSE;
     c->MR_ctxt_resume_c_depth = 0;
     c->MR_ctxt_saved_owners = NULL;
 #endif
@@ -512,6 +604,9 @@ MR_init_context_maybe_generator(MR_Conte
 #endif
     }
 
+#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
+    MR_debug_log_message("Allocating det stack");
+#endif
     if (c->MR_ctxt_detstack_zone == NULL) {
         if (gen != NULL) {
             c->MR_ctxt_detstack_zone = MR_create_or_reuse_zone("gen_detstack",
@@ -531,9 +626,15 @@ MR_init_context_maybe_generator(MR_Conte
             MR_fatal_error("MR_init_context_maybe_generator: prev det stack");
         }
     }
+#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
+    MR_debug_log_message("done");
+#endif
     c->MR_ctxt_prev_detstack_zones = NULL;
     c->MR_ctxt_sp = c->MR_ctxt_detstack_zone->MR_zone_min;
 
+#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
+    MR_debug_log_message("Allocating nondet stack");
+#endif
     if (c->MR_ctxt_nondetstack_zone == NULL) {
         if (gen != NULL) {
             c->MR_ctxt_nondetstack_zone = MR_create_or_reuse_zone("gen_nondetstack",
@@ -554,6 +655,9 @@ MR_init_context_maybe_generator(MR_Conte
                 "MR_init_context_maybe_generator: prev nondet stack");
         }
     }
+#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
+    MR_debug_log_message("done");
+#endif
     c->MR_ctxt_prev_nondetstack_zones = NULL;
     /*
     ** Note that maxfr and curfr point to the last word in the frame,
@@ -606,9 +710,6 @@ MR_init_context_maybe_generator(MR_Conte
 
   #ifdef MR_LL_PARALLEL_CONJ
     c->MR_ctxt_parent_sp = NULL;
-    MR_init_wsdeque(&c->MR_ctxt_spark_deque,
-        MR_INITIAL_LOCAL_SPARK_DEQUE_SIZE);
-    MR_add_spark_deque(&c->MR_ctxt_spark_deque);
   #endif /* MR_LL_PARALLEL_CONJ */
 
 #endif /* !MR_HIGHLEVEL_CODE */
@@ -707,9 +808,6 @@ MR_create_context(const char *id, MR_Con
         c->MR_ctxt_detstack_zone = NULL;
         c->MR_ctxt_nondetstack_zone = NULL;
 #endif
-#ifdef MR_LL_PARALLEL_CONJ
-        c->MR_ctxt_spark_deque.MR_sd_active_array = NULL;
-#endif
 #ifdef MR_USE_TRAIL
         c->MR_ctxt_trail_zone = NULL;
 #endif
@@ -718,6 +816,9 @@ MR_create_context(const char *id, MR_Con
     c->MR_ctxt_num_id = allocate_context_id();
 #endif
 
+#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
+    MR_debug_log_message("Calling MR_init_context_maybe_generator");
+#endif
     MR_init_context_maybe_generator(c, id, gen);
     return c;
 }
@@ -742,6 +843,14 @@ MR_destroy_context(MR_Context *c)
     ** Save the context first, even though we're not saving a computation
     ** that's in progress we are saving some bookkeeping information.
     */
+    /*
+    ** TODO: When retrieving a context from the cached contexts, try to
+    ** retrieve one with a matching engine ID, or give each engine a local
+    ** cache of spare contexts.
+    */
+#ifdef MR_LL_PARALLEL_CONJ
+    c->MR_ctxt_resume_owner_engine = MR_ENGINE(MR_eng_id);
+#endif
     MR_save_context(c);
 
     /* XXX not sure if this is an overall win yet */
@@ -755,7 +864,6 @@ MR_destroy_context(MR_Context *c)
 
 #ifdef MR_LL_PARALLEL_CONJ
     MR_atomic_dec_int(&MR_num_outstanding_contexts);
-    MR_delete_spark_deque(&c->MR_ctxt_spark_deque);
 #endif
 
     MR_LOCK(&free_context_list_lock, "destroy_context");
@@ -793,133 +901,137 @@ allocate_context_id(void) {
 
 #ifdef MR_LL_PARALLEL_CONJ
 
-static void
-MR_add_spark_deque(MR_SparkDeque *sd)
-{
-    int slot;
-
-    MR_LOCK(&spark_deques_lock, "create_spark_deque");
-
-    for (slot = 0; slot < MR_max_spark_deques; slot++) {
-        if (MR_spark_deques[slot] == NULL) {
-            break;
-        }
-    }
-
-    if (slot == MR_max_spark_deques) {
-        if (MR_max_spark_deques == 0) {
-            MR_max_spark_deques = 1;
-        } else if (MR_max_spark_deques < 32) {
-            MR_max_spark_deques *= 2;
-        } else {
-            MR_max_spark_deques += 16;
-        }
-        MR_spark_deques = MR_GC_RESIZE_ARRAY(MR_spark_deques,
-            MR_SparkDeque *, MR_max_spark_deques);
-    }
-
-    MR_spark_deques[slot] = sd;
-
-    MR_UNLOCK(&spark_deques_lock, "create_spark_deque");
-}
-
-static void
-MR_delete_spark_deque(const MR_SparkDeque *sd)
-{
-    int i;
-
-    MR_LOCK(&spark_deques_lock, "delete_spark_deque");
-
-    for (i = 0; i < MR_max_spark_deques; i++) {
-        if (MR_spark_deques[i] == sd) {
-            MR_spark_deques[i] = NULL;
-            break;
-        }
-    }
-
-    MR_UNLOCK(&spark_deques_lock, "delete_spark_deque");
-}
-
 /* Search for a ready context which we can handle. */
 static MR_Context *
-MR_find_ready_context(MercuryThread thd, MR_Unsigned depth)
+MR_find_ready_context(void)
 {
     MR_Context  *cur;
     MR_Context  *prev;
+    MR_Context  *preferred_context;
+    MR_Context  *preferred_context_prev;
+    MR_EngineId engine_id = MR_ENGINE(MR_eng_id);
+    MR_Unsigned depth = MR_ENGINE(MR_eng_c_depth);
 
-    cur = MR_runqueue_head;
     /* XXX check pending io */
+
+    /*
+    ** Give preference to contexts as follows:
+    **
+    **  A context that must be run on this engine.
+    **  A context that prefers to be run on this engine.
+    **  Any runnable context that may be ran on this engine.
+    **
+    ** TODO: There are other scheduling decisions we should test, such as
+    ** running older versus younger contexts, or more recently stopped/runnable
+    ** contexts.
+    */
+    cur = MR_runqueue_head;
     prev = NULL;
+    preferred_context = NULL;
+    preferred_context_prev = NULL;
     while (cur != NULL) {
-        if (MR_thread_equal(cur->MR_ctxt_resume_owner_thread, thd) &&
-            cur->MR_ctxt_resume_c_depth == depth)
+#ifdef MR_DEBUG_THREADS
+        if (MR_debug_threads) {
+            fprintf(stderr, "%ld Eng: %d, c_depth: %d, Considering context %p\n",
+                MR_SELF_THREAD_ID, engine_id, depth, cur);
+        }
+#endif
+        if (cur->MR_ctxt_resume_engine_required == MR_TRUE) {
+#ifdef MR_DEBUG_THREADS
+            if (MR_debug_threads) {
+                fprintf(stderr, "%ld Context requires engine %d and c_depth %d\n",
+                    MR_SELF_THREAD_ID, cur->MR_ctxt_resume_owner_engine,
+                    cur->MR_ctxt_resume_c_depth);
+            }
+#endif
+            if ((cur->MR_ctxt_resume_owner_engine == engine_id) &&
+                (cur->MR_ctxt_resume_c_depth == depth))
         {
-            cur->MR_ctxt_resume_owner_thread = MR_null_thread();
-            cur->MR_ctxt_resume_c_depth = 0;
+                preferred_context = cur;
+                preferred_context_prev = prev;
+                cur->MR_ctxt_resume_engine_required = MR_FALSE;
+                /*
+                ** This is the best thread to resume.
+                */
             break;
         }
-
-        if (MR_thread_equal(cur->MR_ctxt_resume_owner_thread, MR_null_thread())) {
-            break;
+        } else {
+#ifdef MR_DEBUG_THREADS
+            if (MR_debug_threads) {
+                fprintf(stderr, "%ld Context prefers engine %d\n",
+                    MR_SELF_THREAD_ID, cur->MR_ctxt_resume_owner_engine);
+            }
+#endif
+            if (cur->MR_ctxt_resume_owner_engine == engine_id) {
+                /*
+                ** This context prefers to be ran on this engine.
+                */
+                preferred_context = cur;
+                preferred_context_prev = prev;
+            } else if (preferred_context == NULL) {
+                /*
+                ** There is no preferred context yet, and this context is okay.
+                */
+                preferred_context = cur;
+                preferred_context_prev = prev;
+            }
         }
 
         prev = cur;
         cur = cur->MR_ctxt_next;
     }
 
-    if (cur != NULL) {
-        if (prev != NULL) {
-            prev->MR_ctxt_next = cur->MR_ctxt_next;
+    if (preferred_context != NULL) {
+        if (preferred_context_prev != NULL) {
+            preferred_context_prev->MR_ctxt_next =
+                preferred_context->MR_ctxt_next;
         } else {
-            MR_runqueue_head = cur->MR_ctxt_next;
+            MR_runqueue_head = preferred_context->MR_ctxt_next;
+        }
+        if (MR_runqueue_tail == preferred_context) {
+            MR_runqueue_tail = preferred_context_prev;
+        }
+#ifdef MR_DEBUG_THREADS
+        if (MR_debug_threads) {
+            fprintf(stderr, "%ld Will run context %p\n",
+                MR_SELF_THREAD_ID, preferred_context);
         }
-        if (MR_runqueue_tail == cur) {
-            MR_runqueue_tail = prev;
+#endif
+    } else {
+#ifdef MR_DEBUG_THREADS
+        if (MR_debug_threads) {
+            fprintf(stderr, "%ld No suitable context to run\n",
+                MR_SELF_THREAD_ID);
         }
+#endif
     }
 
-    return cur;
+    return preferred_context;
 }
 
 static MR_bool
 MR_attempt_steal_spark(MR_Spark *spark)
 {
-    int             max_attempts;
-    int             attempt;
+    int             i;
+    int             offset;
     MR_SparkDeque   *victim;
-    int             steal_top;
+    int             result = MR_FALSE;
 
-    /*
-    ** Protect against concurrent updates of MR_spark_deques and
-    ** MR_num_spark_deques. This allows only one thread to try to steal
-    ** work at any time, which may be a good thing as it limits the
-    ** amount of wasted effort.
-    */
-    MR_LOCK(&spark_deques_lock, "attempt_steal_spark");
-
-    if (MR_max_spark_deques < MR_worksteal_max_attempts) {
-        max_attempts = MR_max_spark_deques;
-    } else {
-        max_attempts = MR_worksteal_max_attempts;
-    }
+    offset = MR_ENGINE(MR_eng_victim_counter);
 
-    for (attempt = 0; attempt < max_attempts; attempt++) {
-        MR_victim_counter++;
-        victim = MR_spark_deques[MR_victim_counter % MR_max_spark_deques];
+    for (i = 0; i < MR_num_threads; i++) {
+        victim = MR_spark_deques[(i + offset) % MR_num_threads];
         if (victim != NULL) {
-            steal_top = MR_wsdeque_steal_top(victim, spark);
-            if (steal_top == 1) {
+            result = (MR_wsdeque_steal_top(victim, spark)) == 1;
+            if (result) {
                 /* Steal successful. */
-                MR_UNLOCK(&spark_deques_lock, "attempt_steal_spark");
-                return MR_TRUE;
+                break;
             }
         }
     }
 
-    MR_UNLOCK(&spark_deques_lock, "attempt_steal_spark");
-
-    /* Steal unsuccessful. */
-    return MR_FALSE;
+    MR_ENGINE(MR_eng_victim_counter) = (i % MR_num_threads);
+    return result;
 }
 
 static void
@@ -1089,9 +1201,61 @@ MR_check_pending_contexts(MR_bool block)
 void
 MR_schedule_context(MR_Context *ctxt)
 {
+#ifdef MR_THREAD_SAFE
+    MR_EngineId engine_id;
+    union MR_engine_wake_action_data wake_action_data;
+    wake_action_data.MR_ewa_context = ctxt;
+
 #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
     MR_threadscope_post_context_runnable(ctxt);
 #endif
+
+    /*
+    ** Try to give this context straight to the engine that would execute it.
+    */
+    engine_id = ctxt->MR_ctxt_resume_owner_engine;
+#ifdef MR_DEBUG_THREADS
+    if (MR_debug_threads) {
+        fprintf(stderr, "%ld Scheduling context %p desired engine: %d\n",
+            MR_SELF_THREAD_ID, ctxt, engine_id);
+    }
+#endif
+    if (ctxt->MR_ctxt_resume_engine_required == MR_TRUE) {
+        /*
+        ** Only engine_id may execute this context, attempt to wake it.
+        */
+#ifdef MR_DEBUG_THREADS
+        if (MR_debug_threads) {
+            fprintf(stderr, "%ld Context _must_ run on this engine\n",
+                MR_SELF_THREAD_ID);
+        }
+#endif
+        if (try_wake_engine(engine_id, MR_ENGINE_ACTION_CONTEXT,
+            &wake_action_data, ENGINE_STATE_IDLE | ENGINE_STATE_SLEEPING))
+        {
+            /*
+            ** We've successfully given the context to the correct engine.
+            */
+            return;
+        }
+    } else {
+        /*
+        ** If there is some idle engine try to wake it up, starting with the
+        ** preferred engine.
+        */
+        if (MR_num_idle_engines > 0) {
+            if (MR_try_wake_an_engine(engine_id, MR_ENGINE_ACTION_CONTEXT,
+                &wake_action_data, NULL))
+            {
+                /*
+                ** THe context has been given to a engine.
+                */
+                return;
+            }
+        }
+    }
+#endif
+
     MR_LOCK(&MR_runqueue_lock, "schedule_context");
     ctxt->MR_ctxt_next = NULL;
     if (MR_runqueue_tail) {
@@ -1101,254 +1265,579 @@ MR_schedule_context(MR_Context *ctxt)
         MR_runqueue_head = ctxt;
         MR_runqueue_tail = ctxt;
     }
-#ifdef MR_THREAD_SAFE
-    /*
-    ** Wake one or more threads waiting in MR_do_runnext. If there is a
-    ** possibility that a woken thread might not accept this context then
-    ** we wake up all the waiting threads.
-    */
-    if (MR_thread_equal(ctxt->MR_ctxt_resume_owner_thread, MR_null_thread())) {
-        MR_SIGNAL(&MR_runqueue_cond, "schedule_context");
-    } else {
-        MR_BROADCAST(&MR_runqueue_cond, "schedule_context");
-    }
-#endif
     MR_UNLOCK(&MR_runqueue_lock, "schedule_context");
 }
 
-#ifndef MR_HIGHLEVEL_CODE
-
-MR_define_extern_entry(MR_do_runnext);
+#ifdef MR_LL_PARALLEL_CONJ
+/*
+** Try to wake an engine, starting at the preferred engine
+*/
+MR_bool
+MR_try_wake_an_engine(MR_EngineId preferred_engine, int action,
+    union MR_engine_wake_action_data *action_data, MR_EngineId *target_eng)
+{
+    MR_EngineId current_engine;
+    int i = 0;
+    int state;
+    MR_bool result;
+
+    /*
+    ** Right now this algorithm is naive, it searches from the preferred engine
+    ** around the loop until it finds an engine.
+    */
+    for (i = 0; i < MR_num_threads; i++) {
+        current_engine = (i + preferred_engine) % MR_num_threads;
+        if (current_engine == MR_ENGINE(MR_eng_id)) {
+            /*
+            ** Don't post superfluous events to ourself
+            */
+            continue;
+        }
+        state = engine_sleep_sync_data[current_engine].d.es_state;
+        if (state == ENGINE_STATE_SLEEPING) {
+            result = try_wake_engine(current_engine, action, action_data,
+                    ENGINE_STATE_SLEEPING);
+            if (result) {
+                if (target_eng) {
+                    *target_eng = current_engine;
+                }
+                return MR_TRUE;
+            }
+        }
+    }
 
-MR_BEGIN_MODULE(scheduler_module)
-    MR_init_entry_an(MR_do_runnext);
-MR_BEGIN_CODE
+    return MR_FALSE;
+}
 
-MR_define_entry(MR_do_runnext);
-  #ifdef MR_THREAD_SAFE
+static MR_bool
+try_wake_engine(MR_EngineId engine_id, int action,
+    union MR_engine_wake_action_data *action_data, unsigned states)
 {
-    MR_Context          *ready_context;
-    MR_Code             *resume_point;
-    MR_Spark            spark;
-    MR_Unsigned         depth;
-    MercuryThread       thd;
-    struct timespec     timeout;
-
-    #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
-    MR_Timer            runnext_timer;
-    #endif
+    MR_bool success = MR_FALSE;
+    engine_sleep_sync *esync = &(engine_sleep_sync_data[engine_id]);
 
     /*
-    ** If this engine is holding onto a context, the context should not be
-    ** in the middle of running some code.
+    ** This engine is probably in the state our caller checked that it was in.
+    ** Wait on the semaphore then re-check the state to be sure.
     */
-    MR_assert(
-        MR_ENGINE(MR_eng_this_context) == NULL
-    ||
-        MR_wsdeque_is_empty(
-            &MR_ENGINE(MR_eng_this_context)->MR_ctxt_spark_deque)
-    );
-
-    depth = MR_ENGINE(MR_eng_c_depth);
-    thd = MR_ENGINE(MR_eng_owner_thread);
-
-    MR_atomic_inc_int(&MR_num_idle_engines);
-
-    #ifdef MR_THREADSCOPE
-    MR_threadscope_post_looking_for_global_work();
-    #endif
+    MR_SEM_WAIT(&(esync->d.es_wake_semaphore), "try_wake_engine, wake_sem");
+    MR_CPU_LFENCE;
+    if (esync->d.es_state & states) {
+        /*
+        ** We now KNOW that the engine is in one of the correct states.
+        **
+        ** We tell the engine what to do, and tell others that we've woken
+        ** it before actually waking it.
+        */
+        esync->d.es_action = action;
+        if (action_data) {
+            esync->d.es_action_data = *action_data;
+        }
+        esync->d.es_state = ENGINE_STATE_WOKEN;
+        MR_CPU_SFENCE;
+        MR_SEM_POST(&(esync->d.es_sleep_semaphore), "try_wake_engine sleep_sem");
+        success = MR_TRUE;
+    }
+    MR_SEM_POST(&(esync->d.es_wake_semaphore), "try_wake_engine wake_sem");
 
-    MR_LOCK(&MR_runqueue_lock, "MR_do_runnext (i)");
+    return success;
+}
 
-    while (1) {
+void
+MR_shutdown_all_engines(void)
+{
+    int i;
 
-    #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
-        if (MR_profile_parallel_execution) {
-            MR_profiling_start_timer(&runnext_timer);
+    for (i = 0; i < MR_num_threads; i++) {
+        if (i == MR_ENGINE(MR_eng_id)) {
+            continue;
         }
-    #endif
-
-        if (MR_exit_now) {
-            /*
-            ** The primordial thread has the responsibility of cleaning
-            ** up the Mercury runtime. It cannot exit by this route.
-            */
-            assert(!MR_thread_equal(thd, MR_primordial_thread));
-            MR_destroy_thread(MR_cur_engine());
-            MR_num_exited_engines++;
-            MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (ii)");
-            MR_atomic_dec_int(&MR_num_idle_engines);
-            pthread_exit(0);
+        try_wake_engine(i, MR_ENGINE_ACTION_SHUTDOWN, NULL,
+            ENGINE_STATE_ALL);
         }
 
-        ready_context = MR_find_ready_context(thd, depth);
-        if (ready_context != NULL) {
-            MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iii)");
-            MR_atomic_dec_int(&MR_num_idle_engines);
-            goto ReadyContext;
+    for (i = 0; i < (MR_num_threads - 1); i++) {
+        MR_SEM_WAIT(&shutdown_semaphore, "MR_shutdown_all_engines");
         }
-        /*
-        ** If execution reaches here then there are no suitable ready contexts.
-        */
+}
 
-        /*
-        ** A context may be created to execute a spark, so only attempt to
-        ** steal sparks if doing so would not exceed the limit of outstanding
-        ** contexts.
-        */
-        if (!((MR_ENGINE(MR_eng_this_context) == NULL) &&
-             (MR_max_outstanding_contexts <= MR_num_outstanding_contexts))) {
-            /* Attempt to steal a spark */
-            if (MR_attempt_steal_spark(&spark)) {
-                MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iv)");
-                MR_atomic_dec_int(&MR_num_idle_engines);
-                goto ReadySpark;
+#endif /* MR_LL_PARALLEL_CONJ */
+
+#ifndef MR_HIGHLEVEL_CODE
+
+/****************************************************************************
+**
+** Parallel runtime idle loop.
+**
+** This also contains code to run the next runnable context for non-parallel
+** low level C grades.
+**
+*/
+
+/*
+** The run queue used to include timing code, it's been removed and may be
+** added in the future.
+*/
+
+/*
+** If the call returns a non-null code pointer then jump to that address,
+** otherwise fall-through
+*/
+#define MR_MAYBE_TRAMPOLINE_AND_ACTION(call, action)                        \
+    do {                                                                    \
+        MR_Code *tramp;                                                     \
+        tramp = (call);                                                     \
+        if (tramp) {                                                        \
+            action;                                                         \
+            MR_GOTO(tramp);                                                 \
+        }                                                                   \
+    } while (0)
+#define MR_MAYBE_TRAMPOLINE(call) \
+    MR_MAYBE_TRAMPOLINE_AND_ACTION((call), )
+
+MR_define_extern_entry(MR_do_idle);
+
+#ifdef MR_THREAD_SAFE
+MR_define_extern_entry(MR_do_idle_clean_context);
+MR_define_extern_entry(MR_do_idle_dirty_context);
+MR_define_extern_entry(MR_do_sleep);
+
+static MR_Code*
+do_get_context(void);
+
+static MR_Code*
+do_local_spark(MR_Code *join_label);
+
+static MR_Code*
+do_work_steal(MR_Code *join_label);
+
+static void
+save_dirty_context(MR_Code *join_label);
+
+/*
+** Prepare the engine to execute a spark.  If join_label is not null then this
+** engine has a context that may not be compatible with the spark.  If it isn't
+** then the context must be saved with join_label as the resume point.
+*/
+static void
+prepare_engine_for_spark(volatile MR_Spark *spark, MR_Code *join_label);
+
+/*
+** Prepare the engine to execute a context.  This loads the context into the
+** engine after discarding any existing context.  All the caller need do is
+** jump to the resume/start point.
+*/
+static void
+prepare_engine_for_context(MR_Context *context);
+
+/*
+** Advertise that the engine is looking for work after being in the working state.
+** (Do not use this call when waking from sleep).
+*/
+static void
+advertise_engine_state_idle(void);
+
+/*
+** Advertise that the engine will begin working.
+*/
+static void
+advertise_engine_state_working(void);
+#endif
+
+MR_BEGIN_MODULE(scheduler_module_idle)
+    MR_init_entry_an(MR_do_idle);
+MR_BEGIN_CODE
+MR_define_entry(MR_do_idle);
+  #ifdef MR_THREAD_SAFE
+{
+    /*
+    ** Try to get a context.
+    **
+    ** Always look for local work first, even though we'd need to allocate a
+    ** context to execute it.  This is probably less efficient (TODO) but it's
+    ** safer. It makes it easier for the state of the machine to change before
+    ** it goes to sleep.
+    */
+    MR_MAYBE_TRAMPOLINE(do_local_spark(NULL));
+
+    advertise_engine_state_idle();
+
+    MR_MAYBE_TRAMPOLINE_AND_ACTION(do_get_context(),
+        advertise_engine_state_working());
+    MR_MAYBE_TRAMPOLINE_AND_ACTION(do_work_steal(NULL),
+        advertise_engine_state_working());
+    MR_GOTO(MR_ENTRY(MR_do_sleep));
+}
+  #else /* !MR_THREAD_SAFE */
+{
+    /*
+    ** When an engine becomes idle in a non parallel grade it simply picks up
+    ** another context.
+    */
+    if (MR_runqueue_head == NULL && MR_pending_contexts == NULL) {
+        MR_fatal_error("empty runqueue!");
             }
+
+    while (MR_runqueue_head == NULL) {
+        MR_check_pending_contexts(MR_TRUE); /* block */
         }
 
-        /* Nothing to do, go back to sleep. */
-    #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
-        if (MR_profile_parallel_execution) {
-            MR_profiling_stop_timer(&runnext_timer,
-                    &MR_profile_parallel_executed_nothing);
+    MR_ENGINE(MR_eng_this_context) = MR_runqueue_head;
+    MR_runqueue_head = MR_runqueue_head->MR_ctxt_next;
+    if (MR_runqueue_head == NULL) {
+        MR_runqueue_tail = NULL;
         }
-    #endif
 
-        MR_milliseconds_from_now(&timeout, MR_worksteal_sleep_msecs);
-        MR_TIMED_WAIT(&MR_runqueue_cond, &MR_runqueue_lock, &timeout,
-            "do_runnext");
+    MR_load_context(MR_ENGINE(MR_eng_this_context));
+    MR_GOTO(MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume);
+}
+  #endif /* !MR_THREAD_SAFE */
+MR_END_MODULE
+
+#ifdef MR_THREAD_SAFE
+MR_BEGIN_MODULE(scheduler_module_idle_clean_context)
+    MR_init_entry_an(MR_do_idle_clean_context);
+MR_BEGIN_CODE
+MR_define_entry(MR_do_idle_clean_context);
+{
+    MR_MAYBE_TRAMPOLINE(do_local_spark(NULL));
+
+    advertise_engine_state_idle();
+
+    MR_MAYBE_TRAMPOLINE_AND_ACTION(do_work_steal(NULL),
+        advertise_engine_state_working());
+    MR_MAYBE_TRAMPOLINE_AND_ACTION(do_get_context(),
+        advertise_engine_state_working());
+    MR_GOTO(MR_ENTRY(MR_do_sleep));
+}
+MR_END_MODULE
+#endif /* MR_THREAD_SAFE */
+
+#ifdef MR_THREAD_SAFE
+MR_BEGIN_MODULE(scheduler_module_idle_dirty_context)
+    MR_init_entry_an(MR_do_idle_dirty_context);
+MR_BEGIN_CODE
+MR_define_entry(MR_do_idle_dirty_context);
+{
+    MR_Code *join_label = (MR_Code*)MR_r1;
+
+    MR_MAYBE_TRAMPOLINE(do_local_spark(join_label));
+
+    advertise_engine_state_idle();
+
+    MR_MAYBE_TRAMPOLINE_AND_ACTION(do_work_steal(join_label),
+        advertise_engine_state_working());
+
+    /*
+    ** Save the dirty context, we can't take it to sleep and it won't be used
+    ** if do_get_context() succeeds.
+    */
+    save_dirty_context(join_label);
+    MR_ENGINE(MR_eng_this_context) = NULL;
+
+    MR_MAYBE_TRAMPOLINE_AND_ACTION(do_get_context(),
+        advertise_engine_state_working());
+    MR_GOTO(MR_ENTRY(MR_do_sleep));
+}
+MR_END_MODULE
+
+/*
+** Put the engine to sleep since there's no work to do.
+**
+** This call does not return.
+**
+** REQUIREMENT: Only call this with either no context or a clean context.
+** REQUIREMENT: This must be called from the same C and Mercury stack depths as
+**              the call into the idle loop.
+*/
+MR_BEGIN_MODULE(scheduler_module_idle_sleep)
+    MR_init_entry_an(MR_do_sleep);
+MR_BEGIN_CODE
+MR_define_entry(MR_do_sleep);
+{
+    MR_EngineId engine_id = MR_ENGINE(MR_eng_id);
+    unsigned action;
+    int result;
+
+    while (1) {
+        engine_sleep_sync_data[engine_id].d.es_state = ENGINE_STATE_SLEEPING;
+        MR_CPU_SFENCE;
+        result = MR_SEM_WAIT(
+            &(engine_sleep_sync_data[engine_id].d.es_sleep_semaphore),
+            "MR_do_sleep sleep_sem");
+
+        if (0 == result) {
+            MR_CPU_LFENCE;
+            action = engine_sleep_sync_data[engine_id].d.es_action;
+#ifdef MR_DEBUG_THREADS
+            if (MR_debug_threads) {
+                fprintf(stderr, "%ld Engine %d is awake and will do action %d\n",
+                    MR_SELF_THREAD_ID, engine_id, action);
     }
-    /* unreachable */
-    abort();
+#endif
 
-ReadyContext:
+            switch(action) {
+                case MR_ENGINE_ACTION_SHUTDOWN:
+                    /*
+                    ** The primordial thread has the responsibility of cleaning
+                    ** up the Mercury runtime. It cannot exit by this route.
+                    */
+                    assert(engine_id != 0);
+                    MR_atomic_dec_int(&MR_num_idle_engines);
+                    MR_destroy_thread(MR_cur_engine());
+                    MR_SEM_POST(&shutdown_semaphore, "MR_do_sleep shutdown_sem");
+                    pthread_exit(0);
+                    break;
+
+                case MR_ENGINE_ACTION_WORKSTEAL:
+                    MR_ENGINE(MR_eng_victim_counter) =
+                        engine_sleep_sync_data[engine_id].d.es_action_data.
+                        MR_ewa_worksteal_engine;
+                    MR_MAYBE_TRAMPOLINE(do_work_steal(NULL));
+                    MR_MAYBE_TRAMPOLINE(do_get_context());
+                    break;
+
+                case MR_ENGINE_ACTION_CONTEXT:
+                    {
+                        MR_Context *context;
+                        MR_Code *resume_point;
+
+                        context = engine_sleep_sync_data[engine_id].d.
+                            es_action_data.MR_ewa_context;
+                        prepare_engine_for_context(context);
 
-    /* Discard whatever unused context we may have and switch to tmp. */
-    if (MR_ENGINE(MR_eng_this_context) != NULL) {
     #ifdef MR_DEBUG_STACK_SEGMENTS
-        MR_debug_log_message("destroying old context %p",
-            MR_ENGINE(MR_eng_this_context));
+                        MR_debug_log_message("resuming old context: %p",
+                            context);
     #endif
-        MR_destroy_context(MR_ENGINE(MR_eng_this_context));
+
+                        resume_point = (MR_Code*)(context->MR_ctxt_resume);
+                        context->MR_ctxt_resume = NULL;
+
+                        MR_GOTO(resume_point);
     }
-    #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
-    if (MR_profile_parallel_execution) {
-        MR_profiling_stop_timer(&runnext_timer,
-                &MR_profile_parallel_executed_contexts);
+                    break;
+
+                case MR_ENGINE_ACTION_NONE:
+                default:
+                    MR_MAYBE_TRAMPOLINE(do_get_context());
+                    MR_MAYBE_TRAMPOLINE(do_work_steal(NULL));
+                    break;
+            }
+        } else {
+            /*
+            ** sem_wait reported an error
+            */
+            switch (errno) {
+                case EINTR:
+                    /*
+                    ** An interrupt woke the engine, go back to sleep.
+                    */
+                    break;
+                default:
+                    perror("sem_post");
+                    abort();
     }
+        }
+    }
+}
+MR_END_MODULE
+#endif
+
+#ifdef MR_THREAD_SAFE
+
+static MR_Code*
+do_get_context(void)
+{
+    MR_Context *ready_context;
+    MR_Code *resume_point;
+
+    /*
+    ** Look for a runnable context and execute it.  If there was no runnable
+    ** context, then proceed to MR_do_runnext_local.
+    */
+
+    #ifdef MR_THREADSCOPE
+    MR_threadscope_post_looking_for_global_context();
     #endif
-    MR_ENGINE(MR_eng_this_context) = ready_context;
-    MR_load_context(ready_context);
+
+    MR_LOCK(&MR_runqueue_lock, "do_get_context (i)");
+    ready_context = MR_find_ready_context();
+    MR_UNLOCK(&MR_runqueue_lock, "do_get_context (ii)");
+
+    if (ready_context != NULL) {
+        prepare_engine_for_context(ready_context);
+
     #ifdef MR_DEBUG_STACK_SEGMENTS
     MR_debug_log_message("resuming old context: %p", ready_context);
     #endif
 
     resume_point = (MR_Code*)(ready_context->MR_ctxt_resume);
     ready_context->MR_ctxt_resume = NULL;
-    MR_GOTO(resume_point);
 
-ReadySpark:
+        return resume_point;
+    }
 
+    return NULL;
+}
+
+static void
+prepare_engine_for_context(MR_Context *context) {
+    /*
+    ** Discard whatever unused context we may have and switch to the new one.
+    */
+    if (MR_ENGINE(MR_eng_this_context) != NULL) {
     #ifdef MR_DEBUG_STACK_SEGMENTS
-    MR_debug_log_message("stole spark: st: %p", spark.MR_spark_sync_term);
+        MR_debug_log_message("destroying old context %p",
+            MR_ENGINE(MR_eng_this_context));
     #endif
+        MR_destroy_context(MR_ENGINE(MR_eng_this_context));
+    }
+    MR_ENGINE(MR_eng_this_context) = context;
+    MR_load_context(context);
+}
 
-  #if 0 /* This is a complicated optimisation that may not be worth-while */
-    if (!spark.MR_spark_sync_term->MR_st_is_shared) {
-        spark.MR_spark_sync_term_is_shared = MR_TRUE;
-        /*
-        ** If we allow the stolen spark (New) to execute immediately
-        ** there could be a race with a sibling conjunct (Old) which is
-        ** currently executing, e.g.
-        **
-        ** 1. Old enters MR_join_and_continue(), loads old value of
-        **    MR_st_count;
-        ** 2. New begins executing;
-        ** 3. New enters MR_join_and_continue(), decrements MR_st_count
-        **    atomically;
-        ** 4. Old decrements MR_st_count *non-atomically* based on the
-        **    old value of MR_st_count.
-        **
-        ** Therefore this loop delays the new spark from executing
-        ** while there is another conjunct in MR_join_and_continue()
-        ** which might decrement MR_st_count non-atomically.
+static void
+prepare_engine_for_spark(volatile MR_Spark *spark, MR_Code *join_label)
+{
+    MR_Context *this_context = MR_ENGINE(MR_eng_this_context);
+
+    /*
+    ** We need to save this context if it is dirty and incompatible with
+    ** this spark.
         */
-        while (spark.MR_spark_sync_term->MR_st_attempt_cheap_join) {
-            MR_sched_yield();
-        }
+    if ((this_context != NULL) &&
+        (join_label != NULL) &&
+        (spark->MR_spark_sync_term->MR_st_orig_context != this_context))
+    {
+#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
+        MR_debug_log_message("Saving old dirty context %p", this_context);
+#endif
+        save_dirty_context(join_label);
+#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
+        MR_debug_log_message("done.");
+#endif
+        this_context = NULL;
     }
-  #endif
-
-    /* Grab a new context if we haven't got one then begin execution. */
-    if (MR_ENGINE(MR_eng_this_context) == NULL) {
+    if (this_context == NULL) {
+        /*
+        ** Get a new context
+        */
+#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
+        MR_debug_log_message("Need a new context.");
+#endif
         MR_ENGINE(MR_eng_this_context) = MR_create_context("from spark",
             MR_CONTEXT_SIZE_FOR_SPARK, NULL);
-    #ifdef MR_THREADSCOPE
+#ifdef MR_THREADSCOPE
         MR_threadscope_post_create_context_for_spark(
             MR_ENGINE(MR_eng_this_context));
-    #endif
-    #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
+#endif
+/*
+#ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
         if (MR_profile_parallel_execution) {
             MR_atomic_inc_int(
                 &MR_profile_parallel_contexts_created_for_sparks);
         }
-    #endif
+#endif
+*/
         MR_load_context(MR_ENGINE(MR_eng_this_context));
-    #ifdef MR_DEBUG_STACK_SEGMENTS
+#ifdef MR_DEBUG_STACK_SEGMENTS
         MR_debug_log_message("created new context for spark: %p",
             MR_ENGINE(MR_eng_this_context));
-    #endif
+#endif
+    }
 
-    } else {
-    #ifdef MR_THREADSCOPE
         /*
-        ** Allocate a new context Id so that someone looking at the threadscope
-        ** profile sees this as new work.
+    ** At this point we have a context, either a dirty context that's compatbile or a clean one.
         */
-        MR_ENGINE(MR_eng_this_context)->MR_ctxt_num_id = allocate_context_id();
-        MR_threadscope_post_run_context();
-    #endif
-    }
-    MR_parent_sp = spark.MR_spark_sync_term->MR_st_parent_sp;
-    MR_SET_THREAD_LOCAL_MUTABLES(spark.MR_spark_thread_local_mutables);
+    MR_parent_sp = spark->MR_spark_sync_term->MR_st_parent_sp;
+    MR_SET_THREAD_LOCAL_MUTABLES(spark->MR_spark_thread_local_mutables);
 
     MR_assert(MR_parent_sp);
     MR_assert(MR_parent_sp != MR_sp);
     MR_assert(spark.MR_spark_sync_term->MR_st_count > 0);
+}
 
-    #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
-    if (MR_profile_parallel_execution) {
-        MR_profiling_stop_timer(&runnext_timer,
-                &MR_profile_parallel_executed_global_sparks);
+static MR_Code*
+do_local_spark(MR_Code *join_label)
+{
+    volatile MR_Spark *spark;
+
+    spark = MR_wsdeque_pop_bottom(&MR_ENGINE(MR_eng_spark_deque));
+    if (NULL == spark) {
+        return NULL;
     }
-    #endif
-    #ifdef MR_THREADSCOPE
-    MR_threadscope_post_steal_spark(spark.MR_spark_id);
-    #endif
-    MR_GOTO(spark.MR_spark_resume);
+
+#ifdef MR_THREADSCOPE
+    MR_threadscope_post_run_spark(spark->MR_spark_id);
+#endif
+    prepare_engine_for_spark(spark, join_label);
+    return spark->MR_spark_resume;
 }
-  #else /* !MR_THREAD_SAFE */
+
+static MR_Code*
+do_work_steal(MR_Code *join_label)
 {
-    if (MR_runqueue_head == NULL && MR_pending_contexts == NULL) {
-        MR_fatal_error("empty runqueue!");
-    }
+    MR_Spark spark;
 
-    while (MR_runqueue_head == NULL) {
-        MR_check_pending_contexts(MR_TRUE); /* block */
-    }
+    #ifdef MR_THREADSCOPE
+    MR_threadscope_post_work_stealing();
+    #endif
 
-    MR_ENGINE(MR_eng_this_context) = MR_runqueue_head;
-    MR_runqueue_head = MR_runqueue_head->MR_ctxt_next;
-    if (MR_runqueue_head == NULL) {
-        MR_runqueue_tail = NULL;
+    /*
+    ** A context may be created to execute a spark, so only attempt to
+    ** steal sparks if doing so would not exceed the limit of outstanding
+    ** contexts.
+    */
+    if (!((MR_ENGINE(MR_eng_this_context) == NULL) &&
+         (MR_max_outstanding_contexts <= MR_num_outstanding_contexts))) {
+        /* Attempt to steal a spark */
+        if (MR_attempt_steal_spark(&spark)) {
+#ifdef MR_THREADSCOPE
+            MR_threadscope_post_steal_spark(spark.MR_spark_id);
+#endif
+            prepare_engine_for_spark(&spark, join_label);
+            return spark.MR_spark_resume;
+        }
     }
 
-    MR_load_context(MR_ENGINE(MR_eng_this_context));
-    MR_GOTO(MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume);
+    return NULL;
 }
-  #endif /* !MR_THREAD_SAFE */
 
-MR_END_MODULE
+static void
+save_dirty_context(MR_Code *join_label) {
+    MR_Context *this_context = MR_ENGINE(MR_eng_this_context);
+
+#ifdef MR_THREADSCOPE
+    MR_threadscope_post_stop_context(MR_TS_STOP_REASON_BLOCKED);
+#endif
+    this_context->MR_ctxt_resume_owner_engine = MR_ENGINE(MR_eng_id);
+    MR_save_context(this_context);
+    /*
+    ** Make sure the context gets saved before we set the join
+    ** label, use a memory barrier.
+    */
+    MR_CPU_SFENCE;
+    this_context->MR_ctxt_resume = join_label;
+    MR_ENGINE(MR_eng_this_context) = NULL;
+}
+
+static void
+advertise_engine_state_idle(void)
+{
+    engine_sleep_sync_data[MR_ENGINE(MR_eng_id)].d.es_state = ENGINE_STATE_IDLE;
+    MR_CPU_SFENCE;
+    MR_atomic_inc_int(&MR_num_idle_engines);
+}
+
+static void
+advertise_engine_state_working(void)
+{
+    MR_atomic_dec_int(&MR_num_idle_engines);
+    MR_CPU_SFENCE;
+    engine_sleep_sync_data[MR_ENGINE(MR_eng_id)].d.es_state = ENGINE_STATE_WORKING;
+}
+#endif /* MR_THREAD_SAFE */
 
 #endif /* !MR_HIGHLEVEL_CODE */
 
@@ -1376,21 +1865,36 @@ MR_do_join_and_continue(MR_SyncTerm *jnc
 
     jnc_last = MR_atomic_dec_and_is_zero_uint(&(jnc_st->MR_st_count));
 
-    if (jnc_last) {
         if (this_context == jnc_st->MR_st_orig_context) {
             /*
-            ** This context originated this parallel conjunction and all the
-            ** branches have finished so jump to the join label.
+        ** This context originated this parallel conjunction.
+        */
+        if (jnc_last) {
+            /*
+            ** All the conjuncts have finished so jump to the join label.
             */
             return join_label;
         } else {
-  #ifdef MR_THREADSCOPE
+            /*
+            ** This context is dirty, it is needed to complete the parallel
+            ** conjunction
+            */
+            MR_r1 = (MR_Word)join_label;
+            return MR_ENTRY(MR_do_idle_dirty_context);
+        }
+    } else {
+        /*
+        ** This context is now clean, it can be used to execute _any_ spark.
+        */
+        if (jnc_last) {
+#ifdef MR_THREADSCOPE
             MR_threadscope_post_stop_context(MR_TS_STOP_REASON_FINISHED);
-  #endif
+#endif
             /*
             ** This context didn't originate this parallel conjunction and
             ** we're the last branch to finish. The originating context should
-            ** be suspended waiting for us to finish, so wake it up.
+            ** be suspended waiting for us to finish, we should run it using
+            ** the current engine.
             **
             ** We could be racing with the original context, in which case we
             ** have to make sure that it is ready to be scheduled before we
@@ -1401,52 +1905,13 @@ MR_do_join_and_continue(MR_SyncTerm *jnc
                 /* XXX: Need to configure using sched_yeild or spin waiting */
                 MR_ATOMIC_PAUSE;
             }
-            MR_schedule_context(jnc_st->MR_st_orig_context);
-            return MR_ENTRY(MR_do_runnext);
-        }
-    } else {
-        volatile MR_Spark *spark;
-
-        /*
-        ** The parallel conjunction it is not yet finished. Try to work on a
-        ** spark from our local stack. The sparks on our stack are likely to
-        ** cause this conjunction to be complete.
-        */
-        spark = MR_wsdeque_pop_bottom(&this_context->MR_ctxt_spark_deque);
-        if (NULL != spark) {
 #ifdef MR_THREADSCOPE
-            MR_threadscope_post_run_spark(spark->MR_spark_id);
+            MR_threadscope_post_context_runnable(jnc_st->MR_st_orig_context);
 #endif
-            return spark->MR_spark_resume;
-        } else {
-            /*
-            ** If this context originated the parallel conjunction that we've
-            ** been executing, suspend this context so that it will be
-            ** resumed at the join label once the parallel conjunction is
-            ** completed.
-            **
-            ** Otherwise we can reuse this context for the next piece of work.
-            */
-            if (this_context == jnc_st->MR_st_orig_context) {
-  #ifdef MR_THREADSCOPE
-                MR_threadscope_post_stop_context(MR_TS_STOP_REASON_BLOCKED);
-  #endif
-                MR_save_context(this_context);
-                /*
-                ** Make sure the context gets saved before we set the join
-                ** label, use a memory barrier.
-                */
-                MR_CPU_SFENCE;
-                this_context->MR_ctxt_resume = (join_label);
-                MR_ENGINE(MR_eng_this_context) = NULL;
-            } else {
-  #ifdef MR_THREADSCOPE
-                MR_threadscope_post_stop_context(MR_TS_STOP_REASON_FINISHED);
-  #endif
-            }
-
-            return MR_ENTRY(MR_do_runnext);
+            prepare_engine_for_context(jnc_st->MR_st_orig_context);
+            return join_label;
         }
+        return MR_ENTRY(MR_do_idle_clean_context);
     }
 }
 #endif
@@ -1517,7 +1982,12 @@ void mercury_sys_init_scheduler_wrapper_
 void mercury_sys_init_scheduler_wrapper_init(void)
 {
 #ifndef MR_HIGHLEVEL_CODE
-    scheduler_module();
+    scheduler_module_idle();
+#ifdef MR_THREAD_SAFE
+    scheduler_module_idle_clean_context();
+    scheduler_module_idle_dirty_context();
+    scheduler_module_idle_sleep();
+#endif
 #endif
 }
 
Index: runtime/mercury_context.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.h,v
retrieving revision 1.65
diff -u -p -b -r1.65 mercury_context.h
--- runtime/mercury_context.h	13 Apr 2011 06:29:55 -0000	1.65
+++ runtime/mercury_context.h	13 Apr 2011 13:02:04 -0000
@@ -117,15 +117,22 @@
 **                  when this context is next scheduled.
 **                  (Accessed via MR_eng_this_context.)
 **
-** resume_owner_thread
+** resume_owner_engine
+**                  When resuming a context this is the engine that it
+**                  prefers to be resumed on, Doing so can avoid cache misses
+**                  as the engine's cache may already be warm.
+**                  (Accessed only when directly specifying the context.)
+**
+** resume_engine_required
 ** resume_c_depth
 **                  These fields are used to ensure that when we enter a
 **                  Mercury engine from C, we return to the same engine. If
-**                  resume_owner_thread is NULL then this context can be
-**                  executed by any engine. Otherwise the resume_owner_thread
-**                  and resume_c_depth must match the engine's owner_thread
-**                  and c_depth. See the comments in mercury_engine.h.
-**                  (Both accessed only when directly specifying the context.)
+**                  resume_engine_required is MR_FALSE then this context can be
+**                  executed by any engine and resume_owner_engine is simply a
+**                  preference. Otherwise the resume_owner_engine and
+**                  resume_c_depth must match the engine's id and c_depth. See
+**                  the comments in mercury_engine.h.  (Both accessed only when
+**                  directly specifying the context.)
 **
 ** saved_owners
 **                  A stack used to record the Mercury engines on which this
@@ -171,10 +178,6 @@
 ** parent_sp        The saved parent_sp for this context.
 **                  (Accessed via abstract machine register.)
 **
-** spark_deque      The sparks generated by this context.
-**                  (Accessed usually by explicitly specifying the context,
-**                  but also via MR_eng_this_context.)
-**
 ** trail_zone       The trail zone for this context.
 ** prev_trail_zones A list of any previous trail zones for this context.
 **                  (Accessed via MR_eng_context.)
@@ -224,7 +227,7 @@ typedef enum {
 typedef struct MR_SavedOwner_Struct     MR_SavedOwner;
 
 struct MR_SavedOwner_Struct {
-    MercuryThread       MR_saved_owner_thread;
+    MR_EngineId         MR_saved_owner_engine;
     MR_Unsigned         MR_saved_owner_c_depth;
     MR_SavedOwner       *MR_saved_owner_next;
 };
@@ -276,7 +279,8 @@ struct MR_Context_Struct {
     MR_Code             *MR_ctxt_resume;
 #endif
 #ifdef  MR_THREAD_SAFE
-    MercuryThread       MR_ctxt_resume_owner_thread;
+    MR_EngineId         MR_ctxt_resume_owner_engine;
+    MR_bool             MR_ctxt_resume_engine_required;
     MR_Unsigned         MR_ctxt_resume_c_depth;
     MR_SavedOwner       *MR_ctxt_saved_owners;
 #endif
@@ -310,7 +314,6 @@ struct MR_Context_Struct {
 
   #ifdef MR_LL_PARALLEL_CONJ
     MR_Word             *MR_ctxt_parent_sp;
-    MR_SparkDeque       MR_ctxt_spark_deque;
   #endif
 #endif /* !MR_HIGHLEVEL_CODE */
 
@@ -416,6 +419,12 @@ extern  MR_PendingContext   *MR_pending_
   ** resources.
   */
   extern volatile MR_Unsigned   MR_num_exited_engines;
+
+  /*
+  ** Spark deques for work stealing,  These are made visible so that they can
+  ** be initialised by code in mercury_thread.c.
+  */
+  extern MR_SparkDeque          **MR_spark_deques;
 #endif  /* !MR_LL_PARALLEL_CONJ */
 
 /*---------------------------------------------------------------------------*/
@@ -442,10 +451,10 @@ extern  MR_Context  *MR_create_context(c
 extern  void        MR_destroy_context(MR_Context *context);
 
 /*
-** MR_init_thread_stuff() initializes the lock structures for the runqueue,
+** MR_init_context_stuff() initializes the lock structures for the runqueue,
 ** and detects the number of threads to use on the LLC backend.
 */
-extern  void        MR_init_thread_stuff(void);
+extern  void        MR_init_context_stuff(void);
 
 /*
 ** MR_pin_thread() pins the current thread to the next available processor ID,
@@ -453,15 +462,29 @@ extern  void        MR_init_thread_stuff
 ** MR_pin_primordial_thread() is a special case for the primordial thread.  It
 ** should only be executed once, and only by the primordial thread _before_
 ** the other threads are started.
+**
+** Both functions return the CPU number that the thread is pinned to or would
+** be pinned to if pinning was both enabled and supported.  That is a valid
+** value is always returned even if the thread is not actually pinned.
+*/
+#if defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ)
+extern unsigned
+MR_pin_primordial_thread(void);
+extern unsigned
+MR_pin_thread(void);
+
+/*
+** Shutdown all the engines.
 */
-extern  void        MR_pin_primordial_thread(void);
-extern  void        MR_pin_thread(void);
+extern void
+MR_shutdown_all_engines(void);
+#endif
 
 /*
-** MR_finalize_thread_stuff() finalizes the lock structures for the runqueue
-** among other things setup by MR_init_thread_stuff().
+** MR_finalize_context_stuff() finalizes the lock structures for the runqueue
+** among other things setup by MR_init_context_stuff().
 */
-extern  void        MR_finalize_thread_stuff(void);
+extern  void        MR_finalize_context_stuff(void);
 
 /*
 ** MR_flounder() aborts with a runtime error message. It is called if
@@ -481,11 +504,29 @@ extern  void        MR_sched_yield(void)
 extern  void        MR_schedule_context(MR_Context *ctxt);
 
 #ifndef MR_HIGHLEVEL_CODE
-  MR_declare_entry(MR_do_runnext);
-  #define MR_runnext()                          \
+/*
+** MR_idle() should be called by an engine without a context that is looking
+** for more work.
+*/
+  MR_declare_entry(MR_do_idle);
+  #define MR_idle()                                     \
     do {                                        \
-        MR_GOTO(MR_ENTRY(MR_do_runnext));       \
+        MR_GOTO(MR_ENTRY(MR_do_idle));                  \
     } while (0)
+/*
+** MR_do_idle_clean_context should be used by an engine that is becoming idle
+** with a context that may be re-used.
+*/
+  MR_declare_entry(MR_do_idle_clean_context);
+/*
+** The same is true for MR_do_idle_dirty_context except that the context is
+** blocked on the end of a parallel conjunction, It may nnly be used to execute
+** sparks that contribute to that parallel conjunction.
+**
+** This takes one argument in MR_r1, the join label at the end of the parallel
+** conjunction.
+*/
+  MR_declare_entry(MR_do_idle_dirty_context);
 #endif
 
 #ifndef MR_CONSERVATIVE_GC
@@ -762,35 +803,35 @@ extern  void        MR_schedule_context(
   ** MR_parent_sp must already be set appropriately before this instruction
   ** is executed.
   */
-#ifdef MR_THREADSCOPE
-  #define MR_fork_new_child(sync_term, child)                                 \
-    do {                                                                      \
+#define MR_fork_new_child(sync_term, child)                                  \
+do {                                                                         \
         MR_Spark            fnc_spark;                                        \
         MR_SparkDeque       *fnc_deque;                                       \
+    MR_EngineId         engine_id = MR_ENGINE(MR_eng_id);                    \
+    MR_IF_THREADSCOPE(                                                       \
         MR_uint_least32_t   id;                                               \
+    )                                                                        \
+    union MR_engine_wake_action_data action_data;                            \
                                                                               \
         fnc_spark.MR_spark_sync_term = (MR_SyncTerm*) &(sync_term);           \
         fnc_spark.MR_spark_resume = (child);                                  \
         fnc_spark.MR_spark_thread_local_mutables = MR_THREAD_LOCAL_MUTABLES;  \
+    MR_IF_THREADSCOPE(                                                       \
         id = MR_ENGINE(MR_eng_next_spark_id)++;                               \
-        fnc_spark.MR_spark_id = (MR_ENGINE(MR_eng_id) << 24)|(id & 0xFFFFFF); \
-        fnc_deque = &MR_ENGINE(MR_eng_this_context)->MR_ctxt_spark_deque;     \
+        fnc_spark.MR_spark_id = (engine_id << 24)|(id & 0xFFFFFF);           \
+    )                                                                        \
+    fnc_deque = &MR_ENGINE(MR_eng_spark_deque);                              \
         MR_wsdeque_push_bottom(fnc_deque, &fnc_spark);                        \
+    MR_IF_THREADSCOPE(                                                       \
         MR_threadscope_post_sparking(&(sync_term), fnc_spark.MR_spark_id);    \
-    } while (0)
-#else
-  #define MR_fork_new_child(sync_term, child)                                 \
-    do {                                                                      \
-        MR_Spark fnc_spark;                                                   \
-        MR_SparkDeque   *fnc_deque;                                           \
-                                                                              \
-        fnc_spark.MR_spark_sync_term = (MR_SyncTerm*) &(sync_term);           \
-        fnc_spark.MR_spark_resume = (child);                                  \
-        fnc_spark.MR_spark_thread_local_mutables = MR_THREAD_LOCAL_MUTABLES;  \
-        fnc_deque = &MR_ENGINE(MR_eng_this_context)->MR_ctxt_spark_deque;     \
-        MR_wsdeque_push_bottom(fnc_deque, &fnc_spark);                        \
-    } while (0)
-#endif
+    )                                                                        \
+    action_data.MR_ewa_worksteal_engine = MR_ENGINE(MR_eng_id);              \
+    if (MR_num_idle_engines > 0) {                                           \
+        MR_try_wake_an_engine(MR_ENGINE(MR_eng_id),                          \
+            MR_ENGINE_ACTION_WORKSTEAL,                                      \
+            &action_data, NULL);                                             \
+    }                                                                        \
+} while (0)
 
   /*
   ** This macro may be used as conditions for runtime parallelism decisions.
@@ -810,7 +851,7 @@ extern  void        MR_schedule_context(
   ** the first-level cache.
   */
   #define MR_par_cond_local_wsdeque_length                                    \
-      (MR_wsdeque_length(&MR_ENGINE(MR_eng_this_context)->MR_ctxt_spark_deque) < \
+      (MR_wsdeque_length(&MR_ENGINE(MR_eng_spark_deque)) < \
         MR_granularity_wsdeque_length)
 
 extern MR_Code*
@@ -827,6 +868,49 @@ MR_do_join_and_continue(MR_SyncTerm *syn
   /* This needs to come after the definition of MR_SparkDeque_Struct. */
   #include "mercury_wsdeque.h"
 
+/*
+** This structure and function can be used to wake up a sleeping engine, it's
+** exported here for use by the MR_fork_new_child macro above.
+*/
+
+#define MR_ENGINE_ACTION_NONE 0
+#define MR_ENGINE_ACTION_CONTEXT 1
+#define MR_ENGINE_ACTION_WORKSTEAL 2
+#define MR_ENGINE_ACTION_SHUTDOWN 3
+
+union MR_engine_wake_action_data {
+    /*
+    ** This is provided for workstealing actions, to let the engine know
+    ** where to look for work to steal.
+    */
+    MR_EngineId     MR_ewa_worksteal_engine;
+
+    /*
+    ** This is provided for context actions.
+    */
+    MR_Context      *MR_ewa_context;
+};
+
+/*
+** Try to wake a sleeping engine.
+**
+** preferred_engine - The engine we'd like to wake up, a nearby engine will
+**                    often be chosen so it's okay to name the current engine
+**                    in this field.
+**
+** action           - The action to run, see the macros above.
+**
+** action_data      - Extra data for the action, if not applicable pass NULL.
+**
+** target_engine    - If the call succeeds and this parameter is non-null, the
+**                    ID of the engine that received this message is written to
+**                    this address.
+**
+** This returns MR_TRUE if successful MR_FALSE otherwise.
+*/
+MR_bool
+MR_try_wake_an_engine(MR_EngineId perferred_engine, int action,
+    union MR_engine_wake_action_data *action_data, MR_EngineId *target_engine);
 #ifdef MR_DEBUG_RUNTIME_GRANULARITY_CONTROL
 
   /*
Index: runtime/mercury_engine.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_engine.c,v
retrieving revision 1.64
diff -u -p -b -r1.64 mercury_engine.c
--- runtime/mercury_engine.c	5 Apr 2011 10:27:26 -0000	1.64
+++ runtime/mercury_engine.c	13 Apr 2011 12:03:38 -0000
@@ -20,8 +20,7 @@ ENDINIT
 #include    "mercury_engine.h"
 #include    "mercury_memory_zones.h"    /* for MR_create_zone() */
 #include    "mercury_memory_handlers.h" /* for MR_default_handler() */
-#include    "mercury_threadscope.h"     /* for MR_threadscope_setup_engine()
-                                           and event posting */
+#include    "mercury_threadscope.h"     /* for event posting */
 
 #include    "mercury_dummy.h"
 
@@ -79,14 +78,7 @@ MR_Debug_Flag_Info  MR_debug_flag_info[M
     { "detail",         MR_DETAILFLAG }
 };
 
-#ifdef MR_THREAD_SAFE
-  #ifndef MR_HIGHLEVEL_CODE
-/*
-** Writes to this array are protected by the init_engine_array_lock.
-*/
-MercuryEngine **MR_all_engine_bases = NULL;
-  #endif
-#else
+#ifndef MR_THREAD_SAFE
 MercuryEngine MR_engine_base;
 #endif
 
@@ -156,8 +148,9 @@ MR_init_engine(MercuryEngine *eng)
     eng->MR_eng_c_depth = 0;
 #endif
 
-#ifdef MR_THREADSCOPE
-    MR_threadscope_setup_engine(eng);
+#ifdef MR_LL_PARALLEL_CONJ
+    MR_init_wsdeque(&(eng->MR_eng_spark_deque),
+        MR_INITIAL_SPARK_DEQUE_SIZE);
 #endif
 
     /*
@@ -489,11 +482,11 @@ dummy_label:
 #ifdef  MR_THREAD_SAFE
     MR_ENGINE(MR_eng_c_depth)++;
 
-    if (MR_ENGINE(MR_eng_this_context)) {
+    if (MR_ENGINE(MR_eng_this_context) != NULL) {
         MR_SavedOwner *owner;
 
         owner = MR_GC_NEW(MR_SavedOwner);
-        owner->MR_saved_owner_thread = MR_ENGINE(MR_eng_owner_thread);
+        owner->MR_saved_owner_engine = MR_ENGINE(MR_eng_id);
         owner->MR_saved_owner_c_depth = MR_ENGINE(MR_eng_c_depth);
         owner->MR_saved_owner_next =
             MR_ENGINE(MR_eng_this_context)->MR_ctxt_saved_owners;
@@ -525,7 +518,7 @@ MR_define_label(engine_done);
         owner = this_ctxt->MR_ctxt_saved_owners;
         this_ctxt->MR_ctxt_saved_owners = owner->MR_saved_owner_next;
 
-        if (MR_thread_equal(owner->MR_saved_owner_thread, MR_ENGINE(MR_eng_owner_thread)) &&
+        if ((owner->MR_saved_owner_engine == MR_ENGINE(MR_eng_id)) &&
             owner->MR_saved_owner_c_depth == MR_ENGINE(MR_eng_c_depth))
         {
             MR_GC_free(owner);
@@ -537,13 +530,14 @@ MR_define_label(engine_done);
 #endif
         MR_save_context(this_ctxt);
         this_ctxt->MR_ctxt_resume = MR_LABEL(engine_done_2);
-        this_ctxt->MR_ctxt_resume_owner_thread = owner->MR_saved_owner_thread;
+        this_ctxt->MR_ctxt_resume_owner_engine = owner->MR_saved_owner_engine;
         this_ctxt->MR_ctxt_resume_c_depth = owner->MR_saved_owner_c_depth;
+        this_ctxt->MR_ctxt_resume_engine_required = MR_TRUE;
         MR_GC_free(owner);
         MR_schedule_context(this_ctxt);
 
         MR_ENGINE(MR_eng_this_context) = NULL;
-        MR_runnext();
+        MR_idle();
     }
 #endif
 
Index: runtime/mercury_engine.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_engine.h,v
retrieving revision 1.55
diff -u -p -b -r1.55 mercury_engine.h
--- runtime/mercury_engine.h	25 Mar 2011 03:13:41 -0000	1.55
+++ runtime/mercury_engine.h	13 Apr 2011 12:03:38 -0000
@@ -347,18 +347,23 @@ typedef struct {
 **              that engine is available. When the call into the Mercury code
 **              finishes, c_depth is decremented.
 **
-** MR_eng_cpu_clock_ticks_offset
+** cpu_clock_ticks_offset
 **              The offset to be added to the CPU's TSC to give a time relative to the start of the program.
 **
-** MR_eng_ts_buffer
+** ts_buffer
 **              The buffer object used by threadscope for this engine.
 **
-** MR_eng_id    The ID of this engine which is used by threadscope.
+** id           The ID of this engine which is used by threadscope.
 **
-** MR_eng_next_spark_id
+** next_spark_id
 **              In threadscope grades sparks are given IDs to help us track
 **              them.  This and MR_eng_id is used to allocate unique IDs.
 **
+** spark_deque  The sparks generated by contexts executing on this engine.
+**
+** victim_counter
+**              The engine id of this engines' next victim for work stealing.
+**
 ** jmp_buf      The jump buffer used by library/exception.m to return to the
 **              runtime system on otherwise unhandled exceptions.
 **
@@ -408,6 +413,7 @@ typedef struct MR_mercury_engine_struct 
     MR_Dlist            *MR_eng_free_contexts;  /* elements are MR_Context */
 #endif
 #ifdef  MR_THREAD_SAFE
+    MR_EngineId         MR_eng_id;
     MercuryThread       MR_eng_owner_thread;
     MR_Unsigned         MR_eng_c_depth;
   #ifdef MR_THREADSCOPE
@@ -418,9 +424,12 @@ typedef struct MR_mercury_engine_struct 
     */
     MR_int_least64_t                    MR_eng_cpu_clock_ticks_offset;
     struct MR_threadscope_event_buffer  *MR_eng_ts_buffer;
-    MR_uint_least16_t                   MR_eng_id;
     MR_uint_least32_t                   MR_eng_next_spark_id;
   #endif
+  #ifdef MR_LL_PARALLEL_CONJ
+    MR_SparkDeque       MR_eng_spark_deque;
+    MR_EngineId         MR_eng_victim_counter;
+  #endif
 #endif
     jmp_buf             *MR_eng_jmp_buf;
     MR_Word             *MR_eng_exception;
Index: runtime/mercury_grade.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_grade.h,v
retrieving revision 1.82
diff -u -p -b -r1.82 mercury_grade.h
--- runtime/mercury_grade.h	20 Mar 2010 10:15:51 -0000	1.82
+++ runtime/mercury_grade.h	13 Apr 2011 13:03:17 -0000
@@ -58,14 +58,16 @@
 ** Searching for MR_RTTI_VERSION__ should find all code related to the
 ** RTTI version number.
 **
-** The MR_GRADE_EXEC_TRACE_VERSION_NO and MR_GRADE_DEEP_PROF_VERSION_NO
-** macros should be incremented when a change breaks binary backwards
-** compatibility only in debugging and deep profiling grades respectively.
+** The MR_GRADE_EXEC_TRACE_VERSION_NO, MR_GRADE_DEEP_PROF_VERSION_NO and
+** MR_GRADE_LLC_PAR_VERSION_NO macros should be incremented when a change breaks
+** binary backwards compatibility only in debugging, deep profiling and
+** low-level C parallel grades respectively.
 */
 
 #define MR_GRADE_PART_0 v16_
 #define MR_GRADE_EXEC_TRACE_VERSION_NO  9
 #define MR_GRADE_DEEP_PROF_VERSION_NO   3
+#define MR_GRADE_LLC_PAR_VERSION_NO 1
 
 #ifdef MR_HIGHLEVEL_CODE
 
@@ -85,6 +87,17 @@
     #define MR_GRADE_OPT_PART_2         MR_GRADE_OPT_PART_1
   #endif
 
+  /*
+  ** This grade component is repeated below version information.
+  */
+  #ifdef MR_THREAD_SAFE
+    #define MR_GRADE_PART_3       MR_PASTE2(MR_GRADE_PART_2, _par)
+    #define MR_GRADE_OPT_PART_3   MR_GRADE_OPT_PART_2 ".par"
+  #else
+    #define MR_GRADE_PART_3       MR_GRADE_PART_2
+    #define MR_GRADE_OPT_PART_3   MR_GRADE_OPT_PART_2
+  #endif
+
 #else /* ! MR_HIGHLEVEL_CODE */
 
   #ifdef MR_USE_ASM_LABELS
@@ -113,15 +126,18 @@
     #endif
   #endif
 
-#endif /* ! MR_HIGHLEVEL_CODE */
-
-#ifdef MR_THREAD_SAFE
-  #define MR_GRADE_PART_3       MR_PASTE2(MR_GRADE_PART_2, _par)
+  /*
+  ** This grade component is repeated above without the version information.
+  */
+  #ifdef MR_THREAD_SAFE
+    #define MR_GRADE_PART_3       MR_PASTE3(MR_GRADE_PART_2, _par, MR_GRADE_LLC_PAR_VERSION_NO)
   #define MR_GRADE_OPT_PART_3   MR_GRADE_OPT_PART_2 ".par"
-#else
+  #else
   #define MR_GRADE_PART_3       MR_GRADE_PART_2
   #define MR_GRADE_OPT_PART_3   MR_GRADE_OPT_PART_2
-#endif
+  #endif
+
+#endif /* ! MR_HIGHLEVEL_CODE */
 
 #if defined(MR_MPS_GC)
   #define MR_GRADE_PART_4       MR_PASTE2(MR_GRADE_PART_3, _mps)
Index: runtime/mercury_memory_zones.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_memory_zones.c,v
retrieving revision 1.37
diff -u -p -b -r1.37 mercury_memory_zones.c
--- runtime/mercury_memory_zones.c	5 Apr 2011 10:27:26 -0000	1.37
+++ runtime/mercury_memory_zones.c	13 Apr 2011 12:43:15 -0000
@@ -586,6 +586,9 @@ MR_create_or_reuse_zone(const char *name
         zone = MR_create_new_zone(size, redzone_size);
     }
 
+#ifdef  MR_DEBUG_STACK_SEGMENTS
+    MR_debug_log_message("Configuring zone");
+#endif
     zone->MR_zone_name = name;
 #ifdef MR_CHECK_OVERFLOW_VIA_MPROTECT
     zone->MR_zone_handler = handler;
@@ -808,6 +811,10 @@ MR_setup_redzones(MR_MemoryZone *zone)
 
     assert(size > redsize);
 
+#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
+    MR_debug_log_message("Setting up redzone of size: 0x%x.", redsize);
+#endif
+
     /*
     ** setup the redzone
     */
@@ -1018,6 +1025,14 @@ MR_get_free_zone(size_t size)
     MR_MemoryZonesFree  *zones_list_prev;
 
     /*
+    ** Before using the lock below see if there is at least one zone on the
+    ** list.
+    */
+    if (!free_memory_zones) {
+        return NULL;
+    }
+
+    /*
     ** Unlink the first zone on the free-list, link it onto the used-list
     ** and return it.
     */
Index: runtime/mercury_mm_own_stacks.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_mm_own_stacks.c,v
retrieving revision 1.12
diff -u -p -b -r1.12 mercury_mm_own_stacks.c
--- runtime/mercury_mm_own_stacks.c	11 Oct 2007 11:45:22 -0000	1.12
+++ runtime/mercury_mm_own_stacks.c	13 Apr 2011 12:03:37 -0000
@@ -558,11 +558,6 @@ MR_get_context_for_gen(MR_GeneratorPtr g
             generator);
         MR_copy_eng_this_context_fields(ctxt, MR_ENGINE(MR_eng_this_context));
         ctxt->MR_ctxt_next = NULL;
-#ifdef MR_LL_PARALLEL_CONJ
-        ctxt->MR_ctxt_spark_deque.MR_sd_active_array = NULL;
-        MR_init_wsdeque(&ctxt->MR_ctxt_spark_deque,
-            MR_INITIAL_LOCAL_SPARK_DEQUE_SIZE);
-#endif
     }
 
     ctxt->MR_ctxt_owner_generator = generator;
Index: runtime/mercury_par_builtin.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_par_builtin.h,v
retrieving revision 1.3
diff -u -p -b -r1.3 mercury_par_builtin.h
--- runtime/mercury_par_builtin.h	2 Apr 2011 05:41:11 -0000	1.3
+++ runtime/mercury_par_builtin.h	13 Apr 2011 12:03:38 -0000
@@ -129,6 +129,7 @@ vim: ft=c ts=4 sw=4 et
                                                                             \
                 ctxt->MR_ctxt_resume =                                      \
                     MR_ENTRY(mercury__par_builtin__wait_resume);            \
+                ctxt->MR_ctxt_resume_owner_engine = MR_ENGINE(MR_eng_id);   \
                 ctxt->MR_ctxt_next = Future->MR_fut_suspended;              \
                 Future->MR_fut_suspended = ctxt;                            \
                                                                             \
@@ -137,7 +138,11 @@ vim: ft=c ts=4 sw=4 et
                                                                             \
                 MR_maybe_post_stop_context;                                 \
                 MR_ENGINE(MR_eng_this_context) = NULL;                      \
-                MR_runnext();                                               \
+                /*                                                          \
+                ** MR_idle will try to run a different context as that has  \
+                ** good chance of unblocking the future.                    \
+                */                                                          \
+                MR_idle();                                                  \
             }                                                               \
         } while (0)
 
Index: runtime/mercury_thread.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.c,v
retrieving revision 1.45
diff -u -p -b -r1.45 mercury_thread.c
--- runtime/mercury_thread.c	13 Dec 2010 05:59:42 -0000	1.45
+++ runtime/mercury_thread.c	13 Apr 2011 13:04:20 -0000
@@ -29,11 +29,16 @@
   #endif
   MercuryLock       MR_global_lock;
   #ifndef MR_HIGHLEVEL_CODE
-  MercuryLock       MR_init_engine_array_lock;
+  static MercuryLock      MR_next_engine_id_lock;
+  static MR_EngineId      MR_next_engine_id = 0;
+
+  /*
+  ** This array is indexed by engine id.  No locking is necessary.
+  */
+  MercuryEngine **MR_all_engine_bases = NULL;
   #endif
 #endif
 
-volatile MR_bool    MR_exit_now;
 MR_bool             MR_debug_threads = MR_FALSE;
 
 MR_Unsigned         MR_num_thread_local_mutables = 0;
@@ -51,12 +56,6 @@ MR_Integer          MR_thread_barrier_co
 
 #ifdef MR_THREAD_SAFE
 
-  #if defined(MR_PTHREADS_WIN32)
-    #define SELF_THREAD_ID ((long) pthread_self().p)
-  #else
-    #define SELF_THREAD_ID ((long) pthread_self())
-  #endif
-
 static void *
 MR_create_thread_2(void *goal);
 
@@ -102,7 +101,6 @@ MR_create_thread_2(void *goal0)
         (goal->func)(goal->arg);
         /* XXX: We should clean up the engine here */
     } else {
-        MR_pin_thread();
         MR_init_thread(MR_use_later);
     }
 
@@ -117,6 +115,10 @@ MR_init_thread(MR_when_to_use when_to_us
     MercuryEngine   *eng;
 
 #ifdef MR_THREAD_SAFE
+  #ifdef MR_LL_PARALLEL_CONJ
+    unsigned        cpu;
+  #endif
+
     /*
     ** Check to see whether there is already an engine that is initialized
     ** in this thread.  If so we just return, there's nothing for us to do.
@@ -124,6 +126,24 @@ MR_init_thread(MR_when_to_use when_to_us
     if (MR_thread_engine_base != NULL) {
         return MR_FALSE;
     }
+  #ifdef MR_LL_PARALLEL_CONJ
+    switch (when_to_use) {
+        case MR_use_later:
+            cpu = MR_pin_thread();
+            break;
+        case MR_use_now:
+            /*
+            ** Don't pin the primordial thread here, it's already been done.
+            */
+            cpu = MR_primordial_thread_cpu;
+            break;
+        /*
+        ** TODO: We may use the cpu value here to determine which CPUs which
+        ** engines are on.  This can help with some interesting work stealing
+        ** algorithms.
+        */
+    }
+  #endif
 #endif
     eng = MR_create_engine();
 
@@ -134,17 +154,17 @@ MR_init_thread(MR_when_to_use when_to_us
     MR_engine_base_word = (MR_Word) eng;
   #endif
   #ifndef MR_HIGHLEVEL_CODE
-    MR_LOCK(&MR_init_engine_array_lock, "MR_init_thread");
-    {
-        int i;
-        for (i = 0; i < MR_num_threads; i++) {
-            if (!MR_all_engine_bases[i]) {
-                MR_all_engine_bases[i] = eng;
-                break;
-            }
-        }
-    }
-    MR_UNLOCK(&MR_init_engine_array_lock, "MR_init_thread");
+    MR_LOCK(&MR_next_engine_id_lock, "MR_init_thread");
+    eng->MR_eng_id = MR_next_engine_id++;
+    MR_UNLOCK(&MR_next_engine_id_lock, "MR_init_thread");
+
+    eng->MR_eng_victim_counter = (eng->MR_eng_id + 1) % MR_num_threads;
+
+    MR_all_engine_bases[eng->MR_eng_id] = eng;
+    MR_spark_deques[eng->MR_eng_id] = &(eng->MR_eng_spark_deque);
+    #ifdef MR_THREADSCOPE
+        MR_threadscope_setup_engine(eng);
+    #endif
   #endif
 #else
     MR_memcpy(&MR_engine_base, eng, sizeof(MercuryEngine));
@@ -154,6 +174,7 @@ MR_init_thread(MR_when_to_use when_to_us
 
 #ifdef  MR_THREAD_SAFE
     MR_ENGINE(MR_eng_owner_thread) = pthread_self();
+  #ifdef MR_LL_PARALLEL_CONJ
   #ifdef MR_THREADSCOPE
     /*
     ** TSC Synchronization is not used, support is commented out.  See
@@ -164,6 +185,7 @@ MR_init_thread(MR_when_to_use when_to_us
     }
     */
   #endif
+  #endif
 #endif
 
     switch (when_to_use) {
@@ -172,10 +194,9 @@ MR_init_thread(MR_when_to_use when_to_us
             MR_fatal_error("Sorry, not implemented: "
                 "--high-level-code and multiple engines");
 #else
-            /* This call may never return */
-            (void) MR_call_engine(MR_ENTRY(MR_do_runnext), MR_FALSE);
+            /* This call never returns */
+            (void) MR_call_engine(MR_ENTRY(MR_do_idle), MR_FALSE);
 #endif
-            MR_destroy_engine(eng);
             return MR_FALSE;
 
         case MR_use_now :
@@ -244,8 +265,12 @@ MR_mutex_lock(MercuryLock *lock, const c
     int err;
 
     fprintf(stderr, "%ld locking on %p (%s)\n",
-        SELF_THREAD_ID, lock, from);
+        MR_SELF_THREAD_ID, lock, from);
+    fflush(stderr);
     err = pthread_mutex_lock(lock);
+    fprintf(stderr, "%ld lock returned %d\n",
+        MR_SELF_THREAD_ID, err);
+    fflush(stderr);
     assert(err == 0);
     return err;
 }
@@ -256,8 +281,12 @@ MR_mutex_unlock(MercuryLock *lock, const
     int err;
 
     fprintf(stderr, "%ld unlocking on %p (%s)\n",
-        SELF_THREAD_ID, lock, from);
+        MR_SELF_THREAD_ID, lock, from);
+    fflush(stderr);
     err = pthread_mutex_unlock(lock);
+    fprintf(stderr, "%ld unlock returned %d\n",
+        MR_SELF_THREAD_ID, err);
+    fflush(stderr);
     assert(err == 0);
     return err;
 }
@@ -268,8 +297,12 @@ MR_cond_signal(MercuryCond *cond, const 
     int err;
 
     fprintf(stderr, "%ld signaling %p (%s)\n", 
-        SELF_THREAD_ID, cond, from);
+        MR_SELF_THREAD_ID, cond, from);
+    fflush(stderr);
     err = pthread_cond_signal(cond);
+    fprintf(stderr, "%ld signal returned %d\n",
+        MR_SELF_THREAD_ID, err);
+    fflush(stderr);
     assert(err == 0);
     return err;
 }
@@ -280,8 +313,12 @@ MR_cond_broadcast(MercuryCond *cond, con
     int err;
 
     fprintf(stderr, "%ld broadcasting %p (%s)\n", 
-        SELF_THREAD_ID, cond, from);
+        MR_SELF_THREAD_ID, cond, from);
+    fflush(stderr);
     err = pthread_cond_broadcast(cond);
+    fprintf(stderr, "%ld broadcast returned %d\n",
+        MR_SELF_THREAD_ID, err);
+    fflush(stderr);
     assert(err == 0);
     return err;
 }
@@ -292,8 +329,12 @@ MR_cond_wait(MercuryCond *cond, MercuryL
     int err;
 
     fprintf(stderr, "%ld waiting on cond: %p lock: %p (%s)\n", 
-        SELF_THREAD_ID, cond, lock, from);
+        MR_SELF_THREAD_ID, cond, lock, from);
+    fflush(stderr);
     err = pthread_cond_wait(cond, lock);
+    fprintf(stderr, "%ld wait returned %d\n",
+        MR_SELF_THREAD_ID, err);
+    fflush(stderr);
     assert(err == 0);
     return err;
 }
@@ -305,10 +346,44 @@ MR_cond_timed_wait(MercuryCond *cond, Me
     int err;
     
     fprintf(stderr, "%ld timed-waiting on cond: %p lock: %p (%s)\n",
-        SELF_THREAD_ID, cond, lock, from);
+        MR_SELF_THREAD_ID, cond, lock, from);
+    fflush(stderr);
     err = pthread_cond_timedwait(cond, lock, abstime);
     fprintf(stderr, "%ld timed-wait returned %d\n",
-        SELF_THREAD_ID, err);
+        MR_SELF_THREAD_ID, err);
+    fflush(stderr);
+    return err;
+}
+
+int
+MR_sem_wait(MercurySem *sem, const char *from)
+{
+    int err;
+
+    fprintf(stderr, "%ld waiting on sem: %p (%s)\n",
+        MR_SELF_THREAD_ID, sem, from);
+    fflush(stderr);
+    err = sem_wait(sem);
+    fprintf(stderr, "%ld wait returned %d\n",
+        MR_SELF_THREAD_ID, err);
+    fflush(stderr);
+
+    return err;
+}
+
+int
+MR_sem_post(MercurySem *sem, const char *from)
+{
+    int err;
+
+    fprintf(stderr, "%ld posting to sem: %p (%s)\n",
+        MR_SELF_THREAD_ID, sem, from);
+    fflush(stderr);
+    err = sem_post(sem);
+    fprintf(stderr, "%ld post returned %d\n",
+        MR_SELF_THREAD_ID, err);
+    fflush(stderr);
+
     return err;
 }
 
@@ -366,3 +441,27 @@ MR_clone_thread_local_mutables(const MR_
 
     return new_muts;
 }
+
+#ifdef MR_THREAD_SAFE
+void
+MR_init_thread_stuff(void) {
+    int i;
+
+    pthread_mutex_init(&MR_next_engine_id_lock, MR_MUTEX_ATTR);
+    pthread_mutex_init(&MR_global_lock, MR_MUTEX_ATTR);
+  #ifndef MR_THREAD_LOCAL_STORAGE
+    MR_KEY_CREATE(&MR_engine_base_key, NULL);
+  #endif
+    MR_KEY_CREATE(&MR_exception_handler_key, NULL);
+    pthread_mutex_init(&MR_thread_barrier_lock, MR_MUTEX_ATTR);
+  #ifdef MR_HIGHLEVEL_CODE
+    pthread_cond_init(&MR_thread_barrier_cond, MR_COND_ATTR);
+  #endif
+
+    MR_all_engine_bases = MR_GC_malloc(sizeof(MercuryEngine*)*MR_num_threads);
+    for (i = 0; i < MR_num_threads; i++) {
+        MR_all_engine_bases[i] = NULL;
+    }
+}
+#endif
+
Index: runtime/mercury_thread.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.h,v
retrieving revision 1.33
diff -u -p -b -r1.33 mercury_thread.h
--- runtime/mercury_thread.h	13 Dec 2010 05:59:42 -0000	1.33
+++ runtime/mercury_thread.h	13 Apr 2011 12:03:39 -0000
@@ -15,6 +15,7 @@
 
   #include <signal.h>   /* for sigset_t on the SPARC */
   #include <pthread.h>
+  #include <semaphore.h> /* POSIX semaphores */
 
   #if defined(MR_DIGITAL_UNIX_PTHREADS)
     #define MR_MUTEX_ATTR       pthread_mutexattr_default
@@ -31,6 +32,8 @@
   typedef pthread_mutex_t   MercuryLock;
   typedef pthread_cond_t    MercuryCond;
 
+  typedef sem_t             MercurySem;
+
 extern int
 MR_mutex_lock(MercuryLock *lock, const char *from);
 extern int
@@ -45,6 +48,11 @@ extern int
 MR_cond_timed_wait(MercuryCond *cond, MercuryLock *lock, 
     const struct timespec *abstime, const char *from);
 
+extern int
+MR_sem_wait(MercurySem *sem, const char *from);
+extern int
+MR_sem_post(MercurySem *sem, const char *from);
+
    #if defined(MR_PTHREADS_WIN32)
 extern MercuryThread
 MR_null_thread(void);
@@ -54,6 +62,12 @@ MR_null_thread(void);
 
   #define MR_thread_equal(a, b)       pthread_equal((a), (b))
 
+  #if defined(MR_PTHREADS_WIN32)
+    #define MR_SELF_THREAD_ID ((long) pthread_self().p)
+  #else
+    #define MR_SELF_THREAD_ID ((long) pthread_self())
+  #endif
+
   extern MR_bool    MR_debug_threads;
 
   #ifndef MR_DEBUG_THREADS
@@ -70,6 +84,9 @@ MR_null_thread(void);
     #define MR_WAIT(cnd, mtx, from) pthread_cond_wait((cnd), (mtx))
     #define MR_TIMED_WAIT(cond, mtx, abstime, from)                         \
         pthread_cond_timedwait((cond), (mtx), (abstime))
+
+    #define MR_SEM_POST(sem, from)  sem_post((sem))
+    #define MR_SEM_WAIT(sem, from)  sem_wait((sem))
   #else
     #define MR_LOCK(lck, from)                          \
                 ( MR_debug_threads ?                    \
@@ -108,6 +125,21 @@ MR_null_thread(void);
         :                                                                   \
             pthread_cond_timedwait((cond), (mtx), (abstime))                \
         )
+
+    #define MR_SEM_WAIT(sem, from)                      \
+        ( MR_debug_threads ?                            \
+            MR_sem_wait((sem), (from))                  \
+        :                                               \
+            sem_wait((sem))                             \
+        )
+
+    #define MR_SEM_POST(sem, from)                      \
+        ( MR_debug_threads ?                            \
+            MR_sem_post((sem), (from))                  \
+        :                                               \
+            sem_post((sem))                             \
+        )
+
   #endif
 
     /*
@@ -147,7 +179,6 @@ MR_null_thread(void);
 
   extern MercuryThread      *MR_create_thread(MR_ThreadGoal *);
   extern void               MR_destroy_thread(void *eng);
-  extern volatile MR_bool   MR_exit_now;
 
   /*
   ** The primordial thread. Currently used for debugging.
@@ -180,6 +211,11 @@ MR_null_thread(void);
   */
   extern MercuryThreadKey   MR_exception_handler_key;
 
+  /*
+  ** The CPU that the primordial thread is running on.
+  */
+  extern MR_Unsigned        MR_primordial_thread_cpu;
+
 #else /* not MR_THREAD_SAFE */
 
   #define MR_LOCK(nothing, from)        do { } while (0)
@@ -317,4 +353,10 @@ MR_clone_thread_local_mutables(const MR_
         MR_UNLOCK(&tlm->MR_tlm_lock, "MR_set_thread_local_mutable");    \
     } while (0)
 
+/*
+** Initialise some static structures in mercury_thread.c
+*/
+void
+MR_init_thread_stuff(void);
+
 #endif  /* MERCURY_THREAD_H */
Index: runtime/mercury_threadscope.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_threadscope.c,v
retrieving revision 1.10
diff -u -p -b -r1.10 mercury_threadscope.c
--- runtime/mercury_threadscope.c	2 Apr 2011 05:41:11 -0000	1.10
+++ runtime/mercury_threadscope.c	13 Apr 2011 12:03:39 -0000
@@ -158,8 +158,6 @@ ENDINIT
 /*
 ** Duncan Coutts has reserved IDs 33-37 in a discussion via IRC.
 */
-#define MR_TS_EVENT_LOOKING_FOR_GLOBAL_WORK \
-                                        38 /* () */
 #define MR_TS_EVENT_STRING              39 /* (string, id) */
 #define MR_TS_EVENT_CALL_MAIN           40 /* () */
 
@@ -181,8 +179,10 @@ ENDINIT
 #define MR_TS_MER_EVENT_FUT_WAIT_NOSUSPEND  105 /* (fut id) */
 #define MR_TS_MER_EVENT_FUT_WAIT_SUSPENDED  106 /* (fut id) */
 #define MR_TS_MER_EVENT_FUT_SIGNAL          107 /* (fut id) */
-
-#define MR_TS_NUM_MER_EVENTS            8
+#define MR_TS_MER_EVENT_LOOKING_FOR_GLOBAL_CONTEXT \
+                                            108 /* () */
+#define MR_TS_MER_EVENT_WORK_STEALING       109 /* () */
+#define MR_TS_NUM_MER_EVENTS                10
 
 #if 0  /* DEPRECATED EVENTS: */
 #define EVENT_CREATE_SPARK        13 /* (cap, thread) */
@@ -219,7 +219,7 @@ ENDINIT
 /***************************************************************************/
 
 struct MR_threadscope_event_buffer {
-    MR_UnsignedChar     MR_tsbuffer_data[MR_TS_BUFFERSIZE];
+    unsigned char       MR_tsbuffer_data[MR_TS_BUFFERSIZE];
 
     /* The current writing position in the buffer. */
     MR_Unsigned         MR_tsbuffer_pos;
@@ -399,11 +399,6 @@ static EventTypeDesc event_type_descs[] 
         0
     },
     {
-        MR_TS_EVENT_LOOKING_FOR_GLOBAL_WORK,
-        "Engine begins looking for global work",
-        0
-    },
-    {
         MR_TS_EVENT_CAPSET_CREATE,
         "Create an engine set",
         SZ_CAPSET_ID + SZ_CAPSET_TYPE
@@ -492,6 +487,16 @@ static EventTypeDesc event_type_descs[] 
         SZ_FUTURE_ID
     },
     {
+        MR_TS_MER_EVENT_LOOKING_FOR_GLOBAL_CONTEXT,
+        "Engine begins looking for a context to execute",
+        0
+    },
+    {
+        MR_TS_MER_EVENT_WORK_STEALING,
+        "Engine begins attempting to steal work",
+        0
+    },
+    {
         /* Mark the end of this array. */
         MR_TS_NUM_EVENT_TAGS, NULL, 0
     }
@@ -516,9 +521,6 @@ static char* MR_threadscope_output_filen
 */
 static MR_uint_least64_t MR_primordial_first_tsc;
 
-static MercuryLock      MR_next_engine_id_lock;
-static MR_EngineId      MR_next_engine_id = 0;
-
 static Timedelta        MR_global_offset;
 
 static struct MR_threadscope_event_buffer global_buffer;
@@ -854,9 +856,6 @@ MR_setup_threadscope(void)
     /* This value is used later when setting up the primordial engine */
     MR_primordial_first_tsc = MR_read_cpu_tsc();
 
-    /* Setup locks. */
-    pthread_mutex_init(&MR_next_engine_id_lock, MR_MUTEX_ATTR);
-
     /*
     ** These variables are used for TSC synchronization which is not used.  See
     ** below.
@@ -919,10 +918,7 @@ MR_threadscope_setup_engine(MercuryEngin
     MR_DO_THREADSCOPE_DEBUG(
         fprintf(stderr, "In threadscope setup engine thread: 0x%lx\n", pthread_self())
     );
-    MR_LOCK(&MR_next_engine_id_lock, "MR_get_next_engine_id");
-    eng->MR_eng_id = MR_next_engine_id++;
     eng->MR_eng_next_spark_id = 0;
-    MR_UNLOCK(&MR_next_engine_id_lock, "MR_get_next_engine_id");
 
     if (eng->MR_eng_id == 0) {
         MR_global_offset = -MR_primordial_first_tsc;
@@ -1264,6 +1260,7 @@ MR_threadscope_post_run_spark(MR_SparkId
 {
     struct MR_threadscope_event_buffer  *buffer;
     MR_Context                          *context;
+    MR_ContextId                        context_id;
 
     buffer = MR_thread_engine_base->MR_eng_ts_buffer;
     context = MR_thread_engine_base->MR_eng_this_context;
@@ -1278,7 +1275,12 @@ MR_threadscope_post_run_spark(MR_SparkId
 
     put_event_header(buffer, MR_TS_EVENT_RUN_SPARK,
         get_current_time_nanosecs());
-    put_context_id(buffer, context->MR_ctxt_num_id);
+    if (context) {
+        context_id = context->MR_ctxt_num_id;
+    } else {
+        context_id = 0xFFFFFFFF;
+    }
+    put_context_id(buffer, context_id);
     put_spark_id(buffer, spark_id);
     MR_US_UNLOCK(&(buffer->MR_tsbuffer_lock));
 }
@@ -1288,6 +1290,7 @@ MR_threadscope_post_steal_spark(MR_Spark
 {
     struct MR_threadscope_event_buffer  *buffer;
     MR_Context                          *context;
+    MR_ContextId                        context_id;
     unsigned                            engine_id;
 
     buffer = MR_thread_engine_base->MR_eng_ts_buffer;
@@ -1303,7 +1306,13 @@ MR_threadscope_post_steal_spark(MR_Spark
 
     put_event_header(buffer, MR_TS_EVENT_STEAL_SPARK,
         get_current_time_nanosecs());
-    put_context_id(buffer, context->MR_ctxt_num_id);
+    if (context) {
+        context_id = context->MR_ctxt_num_id;
+    } else {
+        context_id = 0xFFFFFFFF;
+    }
+    put_context_id(buffer, context_id);
+
     /*
     ** The engine that created the spark (which may not be whom it was stolen
     ** from if different work-stealking algorithms are implemented) can be
@@ -1354,18 +1363,36 @@ MR_threadscope_post_calling_main(void) {
 }
 
 void
-MR_threadscope_post_looking_for_global_work(void) {
+MR_threadscope_post_looking_for_global_context(void) {
+    struct MR_threadscope_event_buffer *buffer = MR_ENGINE(MR_eng_ts_buffer);
+
+    MR_US_SPIN_LOCK(&(buffer->MR_tsbuffer_lock));
+    if (!enough_room_for_event(buffer,
+            MR_TS_MER_EVENT_LOOKING_FOR_GLOBAL_CONTEXT)) {
+        flush_event_buffer(buffer);
+        open_block(buffer, MR_ENGINE(MR_eng_id));
+    } else if (!block_is_open(buffer)) {
+        open_block(buffer, MR_ENGINE(MR_eng_id));
+    }
+
+    put_event_header(buffer, MR_TS_MER_EVENT_LOOKING_FOR_GLOBAL_CONTEXT,
+        get_current_time_nanosecs());
+    MR_US_UNLOCK(&(buffer->MR_tsbuffer_lock));
+}
+
+void
+MR_threadscope_post_work_stealing(void) {
     struct MR_threadscope_event_buffer *buffer = MR_ENGINE(MR_eng_ts_buffer);
 
     MR_US_SPIN_LOCK(&(buffer->MR_tsbuffer_lock));
-    if (!enough_room_for_event(buffer, MR_TS_EVENT_LOOKING_FOR_GLOBAL_WORK)) {
+    if (!enough_room_for_event(buffer, MR_TS_MER_EVENT_WORK_STEALING)) {
         flush_event_buffer(buffer);
         open_block(buffer, MR_ENGINE(MR_eng_id));
     } else if (!block_is_open(buffer)) {
         open_block(buffer, MR_ENGINE(MR_eng_id));
     }
 
-    put_event_header(buffer, MR_TS_EVENT_LOOKING_FOR_GLOBAL_WORK,
+    put_event_header(buffer, MR_TS_MER_EVENT_WORK_STEALING,
         get_current_time_nanosecs());
     MR_US_UNLOCK(&(buffer->MR_tsbuffer_lock));
 }
Index: runtime/mercury_threadscope.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_threadscope.h,v
retrieving revision 1.7
diff -u -p -b -r1.7 mercury_threadscope.h
--- runtime/mercury_threadscope.h	2 Apr 2011 05:41:11 -0000	1.7
+++ runtime/mercury_threadscope.h	13 Apr 2011 12:03:39 -0000
@@ -36,7 +36,6 @@
 
 typedef struct MR_threadscope_event_buffer MR_threadscope_event_buffer_t;
 
-typedef MR_uint_least16_t   MR_EngineId;
 typedef MR_uint_least16_t   MR_ContextStopReason;
 typedef MR_Integer          MR_ContextId;
 typedef MR_uint_least32_t   MR_TS_StringId;
@@ -158,10 +157,16 @@ extern void
 MR_threadscope_post_calling_main(void);
 
 /*
-** Post this message when a thread begins looking for work in MR_do_runnext
+** Post this message when a thread begins looking for a context to run.
 */
 extern void
-MR_threadscope_post_looking_for_global_work(void);
+MR_threadscope_post_looking_for_global_context(void);
+
+/*
+** Post this message when a thread is about to attempt work stealing.
+*/
+extern void
+MR_threadscope_post_work_stealing(void);
 
 /*
 ** Post this message before a parallel conjunction starts.
Index: runtime/mercury_types.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_types.h,v
retrieving revision 1.60
diff -u -p -b -r1.60 mercury_types.h
--- runtime/mercury_types.h	4 Apr 2011 07:10:40 -0000	1.60
+++ runtime/mercury_types.h	13 Apr 2011 12:03:38 -0000
@@ -315,4 +315,6 @@ typedef struct MR_RegionProfUnit_Struct 
 
 typedef struct MR_Future_Struct                 MR_Future;
 
+typedef MR_uint_least16_t   MR_EngineId;
+
 #endif /* not MERCURY_TYPES_H */
Index: runtime/mercury_wrapper.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wrapper.c,v
retrieving revision 1.219
diff -u -p -b -r1.219 mercury_wrapper.c
--- runtime/mercury_wrapper.c	13 Apr 2011 06:29:57 -0000	1.219
+++ runtime/mercury_wrapper.c	13 Apr 2011 12:03:38 -0000
@@ -216,17 +216,6 @@ MR_Unsigned MR_max_contexts_per_thread =
 #endif
 MR_Unsigned MR_max_outstanding_contexts;
 
-#ifdef MR_LL_PARALLEL_CONJ
-/*
-** In grades that support parallel conjunctions, an idle engine can steal
-** parallel work from Mercury contexts. The following variables control the
-** maximum number of contexts that an idle engine will try to steal from
-** before resting, and how long to rest before attempting another steal.
-*/
-MR_Unsigned MR_worksteal_max_attempts = 24;
-MR_Unsigned MR_worksteal_sleep_msecs = 2;
-#endif
-
 /* file names for mdb's debugger I/O streams */
 const char  *MR_mdb_in_filename = NULL;
 const char  *MR_mdb_out_filename = NULL;
@@ -629,7 +618,11 @@ mercury_runtime_init(int argc, char **ar
     (*MR_address_of_mercury_init_io)();
 
 #ifdef MR_THREAD_SAFE
-    /* MR_init_thread_stuff() must be called prior to MR_init_memory() */
+    /*
+    ** MR_init_context_stuff() and MR_init_thread_stuff() must be called prior
+    ** to MR_init_memory()
+    */
+    MR_init_context_stuff();
     MR_init_thread_stuff();
     MR_max_outstanding_contexts = MR_max_contexts_per_thread * MR_num_threads;
 #ifdef MR_LL_PARALLEL_CONJ
@@ -677,13 +670,6 @@ mercury_runtime_init(int argc, char **ar
     (*MR_address_of_init_modules_threadscope_string_table)();
   #endif
 
-    MR_all_engine_bases = MR_GC_malloc(sizeof(MercuryEngine*)*MR_num_threads);
-    {
-        int i;
-        for (i = 0; i < MR_num_threads; i++) {
-            MR_all_engine_bases[i] = NULL;
-        }
-    }
 #endif
 
     /*
@@ -698,8 +684,6 @@ mercury_runtime_init(int argc, char **ar
     {
         int i;
 
-        MR_exit_now = MR_FALSE;
-
         for (i = 1; i < MR_num_threads; i++) {
             MR_create_thread(NULL);
         }
@@ -1409,8 +1393,6 @@ struct MR_option MR_long_opts[] = {
     { "max-contexts-per-thread",        1, 0, MR_MAX_CONTEXTS_PER_THREAD },
     { "runtime-granularity-wsdeque-length-factor", 1, 0,
         MR_RUNTIME_GRANULAITY_WSDEQUE_LENGTH_FACTOR },
-    { "worksteal-max-attempts",         1, 0, MR_WORKSTEAL_MAX_ATTEMPTS },
-    { "worksteal-sleep-msecs",          1, 0, MR_WORKSTEAL_SLEEP_MSECS },
     { "no-thread-pinning",              0, 0, MR_THREAD_PINNING },
     { "profile-parallel-execution",     0, 0, MR_PROFILE_PARALLEL_EXECUTION },
     { "mdb-tty",                        1, 0, MR_MDB_TTY },
@@ -1846,26 +1828,6 @@ MR_process_options(int argc, char **argv
 #endif
                 break;
 
-            case MR_WORKSTEAL_MAX_ATTEMPTS:
-#ifdef MR_LL_PARALLEL_CONJ
-                if (sscanf(MR_optarg, "%"MR_INTEGER_LENGTH_MODIFIER"u",
-			&MR_worksteal_max_attempts) != 1)
-		{
-                    MR_usage();
-                }
-#endif
-                break;
-
-            case MR_WORKSTEAL_SLEEP_MSECS:
-#ifdef MR_LL_PARALLEL_CONJ
-                if (sscanf(MR_optarg, "%"MR_INTEGER_LENGTH_MODIFIER"u",
-			&MR_worksteal_sleep_msecs) != 1)
-		{
-                    MR_usage();
-                }
-#endif
-                break;
-
             case MR_THREAD_PINNING:
 #if defined(MR_LL_PARALLEL_CONJ) && defined(MR_HAVE_SCHED_SETAFFINITY)
                 MR_thread_pinning_configured = MR_FALSE;
@@ -2896,7 +2858,7 @@ MR_define_label(global_success);
         MR_UNLOCK(&MR_thread_barrier_lock, "global_success");
 
         MR_ENGINE(MR_eng_this_context) = NULL;
-        MR_runnext();
+        MR_idle();
     }
 
 MR_define_label(global_success_2);
@@ -3063,14 +3025,7 @@ mercury_runtime_terminate(void)
     }
 
 #if !defined(MR_HIGHLEVEL_CODE) && defined(MR_THREAD_SAFE)
-    MR_LOCK(&MR_runqueue_lock, "exit_now");
-    MR_exit_now = MR_TRUE;
-    pthread_cond_broadcast(&MR_runqueue_cond);
-    MR_UNLOCK(&MR_runqueue_lock, "exit_now");
-
-    while (MR_num_exited_engines < MR_num_threads - 1) {
-        MR_ATOMIC_PAUSE;
-    }
+    MR_shutdown_all_engines();
 
 #ifdef MR_THREADSCOPE
     if (MR_ENGINE(MR_eng_ts_buffer)) {
@@ -3082,7 +3037,7 @@ mercury_runtime_terminate(void)
     assert(MR_thread_equal(MR_primordial_thread, pthread_self()));
     MR_primordial_thread = MR_null_thread();
 
-    MR_finalize_thread_stuff();
+    MR_finalize_context_stuff();
 #endif
 
 #ifdef MR_HAVE_SYS_STAT_H
Index: runtime/mercury_wrapper.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wrapper.h,v
retrieving revision 1.86
diff -u -p -b -r1.86 mercury_wrapper.h
--- runtime/mercury_wrapper.h	5 Apr 2011 10:27:26 -0000	1.86
+++ runtime/mercury_wrapper.h	13 Apr 2011 12:03:38 -0000
@@ -259,10 +259,6 @@ extern	MR_Unsigned	MR_contexts_per_threa
 */
 extern	MR_Unsigned	MR_max_outstanding_contexts;
 
-/* work-stealing tunables (documented in mercury_wrapper.c) */
-extern	MR_Unsigned MR_worksteal_max_attempts;
-extern	MR_Unsigned MR_worksteal_sleep_msecs;
-
 extern  MR_Unsigned MR_num_threads;
 
 #if defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ)
Index: runtime/mercury_wsdeque.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wsdeque.c,v
retrieving revision 1.3
diff -u -p -b -r1.3 mercury_wsdeque.c
--- runtime/mercury_wsdeque.c	25 Mar 2011 03:13:42 -0000	1.3
+++ runtime/mercury_wsdeque.c	13 Apr 2011 12:03:37 -0000
@@ -37,10 +37,7 @@ MR_init_wsdeque(MR_SparkDeque *dq, MR_In
 {
     dq->MR_sd_bottom = 0;
     dq->MR_sd_top = 0;
-    if (dq->MR_sd_active_array == NULL) {
-        /* The context might already have a deque if it is being recycled. */
         dq->MR_sd_active_array = MR_alloc_spark_array(size);
-    }
 }
 
 MR_bool
Index: runtime/mercury_wsdeque.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wsdeque.h,v
retrieving revision 1.5
diff -u -p -b -r1.5 mercury_wsdeque.h
--- runtime/mercury_wsdeque.h	25 Mar 2011 03:13:42 -0000	1.5
+++ runtime/mercury_wsdeque.h	13 Apr 2011 13:05:41 -0000
@@ -14,9 +14,8 @@
 
 #include "mercury_atomic_ops.h"
 
-/* XXX should experiment with these */
-#define MR_INITIAL_GLOBAL_SPARK_QUEUE_SIZE  4
-#define MR_INITIAL_LOCAL_SPARK_DEQUE_SIZE   8
+/* XXX should experiment with this, perhaps it should be configurable. */
+#define MR_INITIAL_SPARK_DEQUE_SIZE   8
 
 /*---------------------------------------------------------------------------*/
 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 489 bytes
Desc: Digital signature
URL: <http://lists.mercurylang.org/archives/reviews/attachments/20110413/0082baca/attachment.sig>


More information about the reviews mailing list