[m-rev.] for review: work stealing for parallel conjunctions

Peter Wang novalazy at gmail.com
Fri Nov 7 17:56:41 AEDT 2008


Benchmark results will follow at a later date.

Branches: main

Implement work stealing for parallel conjunctions.  This builds on an older
patch which introduced work-stealing deques to the runtime but didn't
perform work stealing.

Previously, when we came across a parallel conjunct, we would place a spark
into either the _global spark queue_ or the _local spark stack_ of the
Mercury context.  A spark on the global spark queue may be picked up for
parallel execution by an idle Mercury engine, whereas a spark on a local
spark stack is confined to execution in the context that originated it.

The problem is that we have to decide, ahead of time, where to put a spark.
Ideally, we should have just enough sparks in the global queue to keep the
available Mercury engines busy, and leave the rest of the sparks to execute
in their original contexts, since that is more efficient.  But we can't
predict the future, so we have to make do with guesses based on simple
heuristics.
A bad decision, once made, cannot be reversed.  An engine may sit idle due
to an empty global spark queue, even while there are sparks available in
some local spark stacks.

In the work-stealing scheme, sparks are always placed into each context's
_local spark deque_.  Idle engines actively try to steal sparks from random
spark deques.  We don't need to make irreversible and potentially suboptimal
decisions about where to put sparks.  Making a spark available for parallel
execution is cheap and happens by default because of the work-stealing
deques; putting a spark on a global queue implies synchronisation with other
threads.  The downside is that idle engines must expend more time and
effort to find work, looking in multiple places instead of just one.
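
For reviewers unfamiliar with the work-stealing deques, here is a
minimal sketch of the owner/thief asymmetry.  Names and types are
simplified, and buffer growth and the race on the last element (both
handled by the real code in runtime/mercury_wsdeque.h) are omitted:

    typedef struct {
        void            **buf;  /* array of sparks; growth omitted */
        volatile long   top;    /* next slot a thief steals from */
        volatile long   bot;    /* next slot the owner pushes to */
    } Deque;

    /* Owner only: plain stores suffice on the common path. */
    static void
    push_bottom(Deque *dq, void *spark)
    {
        dq->buf[dq->bot] = spark;
        dq->bot++;
    }

    /*
    ** Thieves race with the owner and with each other, so a thief
    ** must claim the top element with a compare-and-swap.
    */
    static int
    steal_top(Deque *dq, void **spark)
    {
        long    t = dq->top;

        if (dq->bot <= t) {
            return 0;                       /* deque looks empty */
        }
        *spark = dq->buf[t];
        return __sync_bool_compare_and_swap(&dq->top, t, t + 1);
    }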

In practice, the new scheme seems to perform about as well as the old
scheme, except that the old scheme often required
`--max-contexts-per-thread' to be set "correctly" to get good results.

Only tested on x86-64, which has a relatively strong (i.e. constrained)
memory model, so missing memory barriers may not show up in testing.


configure.in:
runtime/mercury_conf.h.in:
      Check if sched_yield() is available.

      Account for the changed size of sync terms.

runtime/mercury_atomic_ops.c:
runtime/mercury_atomic_ops.h:
      Add an atomic decrement operation.
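
      MR_sub1_and_fetch_int(&x) atomically decrements x and returns
      the new value; with gcc 4.1 and above it is simply
      __sync_sub_and_fetch(&x, 1).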

runtime/mercury_context.c:
      Keep pointers to all spark deques in a flat array, so we have access
      to them for stealing.

      Add code to steal sparks in MR_do_runnext.  Clean up MR_do_runnext.

      Add MR_sched_yield().

      Delete references to the global spark queue, MR_num_idle_engines,
      and MR_num_outstanding_contexts_and_sparks.

runtime/mercury_context.h:
      Make sparks point to their parent sync terms so the MR_st_is_shared
      field can be modified when a spark is stolen.

      Move the parent_sp field from sparks to sync terms as it is the same
      for all sparks from the same parallel conjunction.

      Change MR_SyncTerm, MR_fork_new_child(), MR_join_and_continue() for
      work-stealing.  Under the old scheme, we could rely on the fact that a
      sync term which was "unshared" (placed on a local spark stack) would
      remain unshared, and so we could avoid costly locks or atomic
      instructions when decrementing the "count" field of a sync term.  This
      no longer holds, as a sync term may become shared at any time by
      having one of its sparks stolen.  Fortunately, with a little extra
      work, we can still avoid costly operations in the common case.
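
      In pseudo-C, the new decrement logic is roughly as follows
      (condensed from the MR_join_and_continue change in the diff below;
      `is_originator' stands for the MR_eng_this_context ==
      MR_st_orig_context test):

          static int
          decrement_sync_term(MR_SyncTerm *st, int is_originator)
          {
              int last;

              if (st->MR_st_count == 1) {
                  /* We are the only outstanding conjunct: no race. */
                  last = 1;
              } else if (is_originator) {
                  /*
                  ** Warn thieves not to start a freshly stolen sibling
                  ** spark while we may decrement non-atomically.
                  */
                  st->MR_st_attempt_cheap_join = 1;
                  if (st->MR_st_is_shared) {
                      last = (MR_sub1_and_fetch_int(&st->MR_st_count) == 0);
                  } else {
                      /* The common, cheap, non-atomic case. */
                      last = (--st->MR_st_count == 0);
                  }
                  st->MR_st_attempt_cheap_join = 0;
              } else {
                  /* Stolen conjuncts always decrement atomically. */
                  last = (MR_sub1_and_fetch_int(&st->MR_st_count) == 0);
              }
              return last;
          }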

      Replace MR_choose_parallel_over_sequential_cond() by a dummy
      definition, as the choice no longer exists.

runtime/mercury_thread.c:
      Use the new atomic decrement operation to count the number of
      engines we are waiting for at startup.

runtime/mercury_thread.h:
      Add MR_TIMED_WAIT wrapper for pthread_cond_wait().

runtime/mercury_wrapper.c:
runtime/mercury_wrapper.h:
      Add MERCURY_OPTIONS `--worksteal-max-attempts' and
      `--worksteal-sleep-msecs' and associated globals.

      Delete `--max-contexts-per-thread' option.

      Count the number of engines we are waiting for at startup.

runtime/mercury_wsdeque.h:
      Make MR_wsdeque_pop_bottom() return only the resume code address after
      successfully popping a spark, as that is the only part of the spark
      needed now.
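
      Judging by the new call site in MR_jnc_nonlast_conjunct(), the
      prototype is now effectively (parameter names illustrative):

          extern MR_bool  MR_wsdeque_pop_bottom(MR_SparkDeque *dq,
                              MR_Code **ret_resume);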

doc/user_guide.texi:
      Add commented-out documentation for two new tunable parameters,
      `--worksteal-max-attempts' and `--worksteal-sleep-msecs'.
      Implementors may want to experiment with different values, but end
      users shouldn't need to know about them.
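
      For example, to experiment with the tunables one might run
      (the values here are arbitrary):

          MERCURY_OPTIONS="--worksteal-max-attempts 8 --worksteal-sleep-msecs 2" ./myprogram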

diff --git a/configure.in b/configure.in
index e02ff6b..1f7a42d 100644
--- a/configure.in
+++ b/configure.in
@@ -1086,7 +1086,7 @@ mercury_check_for_functions \
         getpid setpgid fork execlp wait kill \
         grantpt unlockpt ptsname tcgetattr tcsetattr ioctl \
         access sleep opendir readdir closedir mkdir symlink readlink \
-        gettimeofday setenv putenv _putenv posix_spawn
+        gettimeofday setenv putenv _putenv posix_spawn sched_yield
 
 #-----------------------------------------------------------------------------#
 
@@ -1860,8 +1860,10 @@ AC_CACHE_VAL(mercury_cv_sync_term_size,
     int main() {
         struct {
             void        *orig_context;
-            int     count;
-            int         is_shared;
+            void        *parent_sp;
+            int         count;
+            short       is_shared;
+            short       attempt_cheap_join;
         } x;
         FILE *fp;
 
diff --git a/doc/user_guide.texi b/doc/user_guide.texi
index b661daf..0e14b87 100644
--- a/doc/user_guide.texi
+++ b/doc/user_guide.texi
@@ -9781,6 +9781,18 @@ multiplied by the word size in bytes.
 @c Sets the size of the redzone on the trail to @var{size} kilobytes
 @c multiplied by the word size in bytes.
 
+@c @sp 1
+@c @item --worksteal-max-attempts @var{attempts}
+@c @findex --worksteal-max-attempts (runtime option)
+@c Tells idle Mercury engines to attempt to steal parallel conjuncts
+@c up to a maximum of @var{attempts} times before sleeping.
+
+@c @sp 1
+@c @item --worksteal-sleep-msecs @var{milliseconds}
+@c @findex --worksteal-sleep-msecs (runtime option)
+@c Sets the amount of time that an idle Mercury engine should sleep before
+@c making further work-stealing attempts.
+
 @sp 1
 @item -i @var{filename}
 @itemx --mdb-in @var{filename}
diff --git a/runtime/mercury_atomic_ops.c b/runtime/mercury_atomic_ops.c
index f7213ed..5d4fcea 100644
--- a/runtime/mercury_atomic_ops.c
+++ b/runtime/mercury_atomic_ops.c
@@ -22,12 +22,21 @@
 */
 
 MR_OUTLINE_DEFN(
+    int
+    MR_sub1_and_fetch_int(volatile int *addr)
+,
+    {
+        MR_SUB1_AND_FETCH_INT_BODY;
+    }
+)
+
+MR_OUTLINE_DEFN(
     MR_bool
     MR_compare_and_swap_word(volatile MR_Integer *addr, MR_Integer old,
-	    MR_Integer new_val)
+        MR_Integer new_val)
 ,
     {
-	MR_COMPARE_AND_SWAP_WORD_BODY;
+        MR_COMPARE_AND_SWAP_WORD_BODY;
     }
 )
 
diff --git a/runtime/mercury_atomic_ops.h b/runtime/mercury_atomic_ops.h
index 4cd2cc0..5b71e77 100644
--- a/runtime/mercury_atomic_ops.h
+++ b/runtime/mercury_atomic_ops.h
@@ -18,6 +18,10 @@
 
 #if defined(MR_LL_PARALLEL_CONJ)
 
+/* Atomic decrement. */
+MR_EXTERN_INLINE int
+MR_sub1_and_fetch_int(volatile int *addr);
+
 /*
 ** If the value at addr is equal to old, assign new to addr and return true.
 ** Otherwise return false.
@@ -30,46 +34,78 @@ MR_compare_and_swap_word(volatile MR_Integer *addr, MR_Integer old,
 
 #if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
 
-    /*
-    ** gcc 4.1 and above have builtin atomic operations.
-    */
-    #define MR_COMPARE_AND_SWAP_WORD_BODY                                   \
+    /* gcc 4.1 and above have builtin atomic operations. */
+
+    #define MR_SUB1_AND_FETCH_INT_BODY                                      \
         do {                                                                \
-            return __sync_bool_compare_and_swap(addr, old, new_val);        \
+            return __sync_sub_and_fetch(addr, 1);                           \
         } while (0)
 
-#elif defined(__GNUC__) && defined(__x86_64__)
-
     #define MR_COMPARE_AND_SWAP_WORD_BODY                                   \
         do {                                                                \
-            char result;                                                    \
-                                                                            \
-            __asm__ __volatile__(                                           \
-                "lock; cmpxchgq %3, %0; setz %1"                            \
-                : "=m"(*addr), "=q"(result)                                 \
-                : "m"(*addr), "r" (new_val), "a"(old)                       \
-                : "memory"                                                  \
-            );                                                              \
-            return (int) result;                                            \
+            return __sync_bool_compare_and_swap(addr, old, new_val);        \
         } while (0)
 
-#elif defined(__GNUC__) && defined(__i386__)
+#elif defined(__GNUC__)
 
-    /* Really 486 or better. */
-    #define MR_COMPARE_AND_SWAP_WORD_BODY                                   \
-        do {                                                                \
-            char result;                                                    \
+    /* In the following, __i386__ really means 486 or above. */
+
+    #if defined(__x86_64__) || defined(__i386__)
+        #define MR_SUB1_AND_FETCH_INT_BODY                                  \
+            do {                                                            \
+                int result;                                                 \
                                                                             \
-            __asm__ __volatile__(                                           \
-                "lock; cmpxchgl %3, %0; setz %1"                            \
-                : "=m"(*addr), "=q"(result)                                 \
-                : "m"(*addr), "r" (new_val), "a"(old)                       \
-                : "memory");                                                \
-            return (int) result;                                            \
-        } while (0)
+                __asm__ __volatile__(                                       \
+                    "lock; xaddl %0, %1"                                    \
+                    : "=r"(result), "=m"(*addr)                             \
+                    : "0"(-1), "m"(*addr)                                   \
+                    : "memory"                                              \
+                );                                                          \
+                return result - 1;                                          \
+            } while (0)
+    #endif
+
+    #if defined(__x86_64__)
+
+        #define MR_COMPARE_AND_SWAP_WORD_BODY                               \
+            do {                                                            \
+                char result;                                                \
+                                                                            \
+                __asm__ __volatile__(                                       \
+                    "lock; cmpxchgq %3, %0; setz %1"                        \
+                    : "=m"(*addr), "=q"(result)                             \
+                    : "m"(*addr), "r" (new_val), "a"(old)                   \
+                    : "memory"                                              \
+                );                                                          \
+                return (int) result;                                        \
+            } while (0)
+
+    #elif defined(__i386__)
+
+        #define MR_COMPARE_AND_SWAP_WORD_BODY                               \
+            do {                                                            \
+                char result;                                                \
+                                                                            \
+                __asm__ __volatile__(                                       \
+                    "lock; cmpxchgl %3, %0; setz %1"                        \
+                    : "=m"(*addr), "=q"(result)                             \
+                    : "m"(*addr), "r" (new_val), "a"(old)                   \
+                    : "memory");                                            \
+                return (int) result;                                        \
+            } while (0)
+
+    #endif
 
 #endif
 
+#ifdef MR_SUB1_AND_FETCH_INT_BODY
+    MR_EXTERN_INLINE int
+    MR_sub1_and_fetch_int(volatile int *addr)
+    {
+        MR_SUB1_AND_FETCH_INT_BODY;
+    }
+#endif
+
 #ifdef MR_COMPARE_AND_SWAP_WORD_BODY
     MR_EXTERN_INLINE MR_bool
     MR_compare_and_swap_word(volatile MR_Integer *addr, MR_Integer old,
diff --git a/runtime/mercury_conf.h.in b/runtime/mercury_conf.h.in
index f987a07..5c424ec 100644
--- a/runtime/mercury_conf.h.in
+++ b/runtime/mercury_conf.h.in
@@ -259,6 +259,7 @@
 **	MR_HAVE__PUTENV		we have the _putenv() function.
 **	MR_HAVE_POSIX_SPAWN	we have the posix_spawn() function.
 **	MR_HAVE_FESETROUND	we have the fesetround() function.
+**	MR_HAVE_SCHED_YIELD	we have the sched_yield() function.
 */
 #undef	MR_HAVE_GETPID
 #undef	MR_HAVE_SETPGID
@@ -320,6 +321,7 @@
 #undef	MR_HAVE__PUTENV
 #undef	MR_HAVE_POSIX_SPAWN
 #undef	MR_HAVE_FESETROUND
+#undef	MR_HAVE_SCHED_YIELD
 
 /*
 ** We use mprotect() and signals to catch stack and heap overflows.
diff --git a/runtime/mercury_context.c b/runtime/mercury_context.c
index d329c50..44c3ea1 100644
--- a/runtime/mercury_context.c
+++ b/runtime/mercury_context.c
@@ -27,28 +27,31 @@ ENDINIT
 	#include <unistd.h>	/* for select() on OS X */
   #endif	
 #endif
+#ifdef MR_WIN32
+  #include <sys/timeb.h>    /* for _ftime() */
+#endif
+#ifdef MR_HAVE_SCHED_YIELD
+  #include <sched.h>        /* for sched_yield() */
+#endif
 
 #include "mercury_memory_handlers.h"
 #include "mercury_context.h"
 #include "mercury_engine.h"             /* for `MR_memdebug' */
 #include "mercury_reg_workarounds.h"    /* for `MR_fd*' stuff */
 
-static void
-MR_init_context_maybe_generator(MR_Context *c, const char *id,
-    MR_GeneratorPtr gen);
+static  void    MR_init_context_maybe_generator(MR_Context *c, const char *id,
+                    MR_GeneratorPtr gen);
+
+#ifdef  MR_LL_PARALLEL_CONJ
+static  void    MR_add_spark_deque(MR_SparkDeque *sd);
+static  void    MR_delete_spark_deque(const MR_SparkDeque *sd);
+static  void    MR_sleep_runqueue(unsigned int msecs);
+static  void    MR_milliseconds_from_now(struct timespec *timeout,
+                    unsigned int msecs);
+#endif
 
 /*---------------------------------------------------------------------------*/
 
 /*
-** The run queue and spark queue are protected and signalled with the
-** same lock and condition variable.
-**
-** The single sync term lock is used to prevent races in MR_join_and_continue.
-** The holder of the sync term lock may acquire the runqueue lock but not vice
-** versa.  (We could also have one sync term lock per context, and make
-** MR_join_and_continue acquire the sync term lock of the context that
-** originated the parallel conjunction, but contention for the single lock
-** doesn't seem to be an issue.)
+** The run queue is protected by MR_runqueue_lock and signalled via
+** MR_runqueue_cond.
 */
 MR_Context              *MR_runqueue_head;
 MR_Context              *MR_runqueue_tail;
@@ -56,10 +59,6 @@ MR_Context              *MR_runqueue_tail;
   MercuryLock           MR_runqueue_lock;
   MercuryCond           MR_runqueue_cond;
 #endif
-#ifdef  MR_LL_PARALLEL_CONJ
-  MR_SparkDeque         MR_spark_queue;
-  MercuryLock           MR_sync_term_lock;
-#endif
 
 MR_PendingContext       *MR_pending_contexts;
 #ifdef  MR_THREAD_SAFE
@@ -79,8 +78,10 @@ static MR_Context       *free_small_context_list = NULL;
 #endif
 
 #ifdef  MR_LL_PARALLEL_CONJ
-int MR_num_idle_engines = 0;
-int MR_num_outstanding_contexts_and_sparks = 0;
+static MercuryLock      spark_deques_lock;
+static MR_SparkDeque    **MR_spark_deques = NULL;
+static int              MR_max_spark_deques = 0;
+static int              MR_victim_counter = 0;
 #endif
 
 /*---------------------------------------------------------------------------*/
@@ -96,8 +97,7 @@ MR_init_thread_stuff(void)
     pthread_mutex_init(&MR_global_lock, MR_MUTEX_ATTR);
     pthread_mutex_init(&MR_pending_contexts_lock, MR_MUTEX_ATTR);
   #ifdef MR_LL_PARALLEL_CONJ
-    MR_init_wsdeque(&MR_spark_queue, MR_INITIAL_GLOBAL_SPARK_QUEUE_SIZE);
-    pthread_mutex_init(&MR_sync_term_lock, MR_MUTEX_ATTR);
+    pthread_mutex_init(&spark_deques_lock, MR_MUTEX_ATTR);
   #endif
     pthread_mutex_init(&MR_STM_lock, MR_MUTEX_ATTR);
   #ifndef MR_THREAD_LOCAL_STORAGE
@@ -128,7 +128,7 @@ MR_finalize_runqueue(void)
     pthread_mutex_destroy(&free_context_list_lock);
 #endif
 #ifdef  MR_LL_PARALLEL_CONJ
-    pthread_mutex_destroy(&MR_sync_term_lock);
+    pthread_mutex_destroy(&spark_deques_lock);
 #endif
 }
 
@@ -264,6 +264,7 @@ MR_init_context_maybe_generator(MR_Context *c, const char *id,
     c->MR_ctxt_parent_sp = NULL;
     MR_init_wsdeque(&c->MR_ctxt_spark_deque,
         MR_INITIAL_LOCAL_SPARK_DEQUE_SIZE);
+    MR_add_spark_deque(&c->MR_ctxt_spark_deque);
   #endif /* MR_LL_PARALLEL_CONJ */
 
 #endif /* !MR_HIGHLEVEL_CODE */
@@ -315,10 +316,6 @@ MR_create_context(const char *id, MR_ContextSize ctxt_size, MR_Generator *gen)
 
     MR_LOCK(&free_context_list_lock, "create_context");
 
-#ifdef MR_LL_PARALLEL_CONJ
-    MR_num_outstanding_contexts_and_sparks++;
-#endif
-
     /*
     ** Regular contexts have stacks at least as big as small contexts,
     ** so we can return a regular context in place of a small context
@@ -373,11 +370,11 @@ MR_destroy_context(MR_Context *c)
         c->MR_ctxt_nondetstack_zone->MR_zone_min);
 #endif /* defined(MR_CONSERVATIVE_GC) && !defined(MR_HIGHLEVEL_CODE) */
 
-    MR_LOCK(&free_context_list_lock, "destroy_context");
-#ifdef MR_LL_PARALLEL_CONJ
-    MR_num_outstanding_contexts_and_sparks--;
+#ifdef  MR_LL_PARALLEL_CONJ
+    MR_delete_spark_deque(&c->MR_ctxt_spark_deque);
 #endif
 
+    MR_LOCK(&free_context_list_lock, "destroy_context");
     switch (c->MR_ctxt_size) {
         case MR_CONTEXT_SIZE_REGULAR:
             c->MR_ctxt_next = free_context_list;
@@ -391,12 +388,216 @@ MR_destroy_context(MR_Context *c)
     MR_UNLOCK(&free_context_list_lock, "destroy_context");
 }
 
+#ifdef MR_LL_PARALLEL_CONJ
+
+static void
+MR_add_spark_deque(MR_SparkDeque *sd)
+{
+    int             slot;
+
+    MR_LOCK(&spark_deques_lock, "create_spark_deque");
+
+    /* Search for a free spark deque slot. */
+    for (slot = 0; slot < MR_max_spark_deques; slot++) {
+        if (MR_spark_deques[slot] == NULL) {
+            break;
+        }
+    }
+
+    if (slot == MR_max_spark_deques) {
+        if (MR_max_spark_deques == 0) {
+            MR_max_spark_deques = 1;
+        } else if (MR_max_spark_deques < 32) {
+            MR_max_spark_deques *= 2;
+        } else {
+            MR_max_spark_deques += 16;
+        }
+        MR_spark_deques = MR_GC_RESIZE_ARRAY(MR_spark_deques,
+            MR_SparkDeque *, MR_max_spark_deques);
+    }
+
+    MR_spark_deques[slot] = sd;
+
+    MR_UNLOCK(&spark_deques_lock, "create_spark_deque");
+}
+
+static void
+MR_delete_spark_deque(const MR_SparkDeque *sd)
+{
+    int i;
+
+    MR_LOCK(&spark_deques_lock, "delete_spark_deque");
+
+    for (i = 0; i < MR_max_spark_deques; i++) {
+        if (MR_spark_deques[i] == sd) {
+            MR_spark_deques[i] = NULL;
+            break;
+        }
+    }
+
+    MR_UNLOCK(&spark_deques_lock, "delete_spark_deque");
+}
+
+/* Search for a ready context which we can handle. */
+static MR_Context *
+MR_find_ready_context(MercuryThread thd, MR_Unsigned depth)
+{
+    MR_Context  *cur;
+    MR_Context  *prev;
+
+    cur = MR_runqueue_head;
+    /* XXX check pending io */
+    prev = NULL;
+    while (cur != NULL) {
+        if (cur->MR_ctxt_resume_owner_thread == thd &&
+            cur->MR_ctxt_resume_c_depth == depth)
+        {
+            cur->MR_ctxt_resume_owner_thread = (MercuryThread) NULL;
+            cur->MR_ctxt_resume_c_depth = 0;
+            break;
+        }
+
+        if (cur->MR_ctxt_resume_owner_thread == (MercuryThread) NULL) {
+            break;
+        }
+
+        prev = cur;
+        cur = cur->MR_ctxt_next;
+    }
+
+    if (cur != NULL) {
+        if (prev != NULL) {
+            prev->MR_ctxt_next = cur->MR_ctxt_next;
+        } else {
+            MR_runqueue_head = cur->MR_ctxt_next;
+        }
+        if (MR_runqueue_tail == cur) {
+            MR_runqueue_tail = prev;
+        }
+    }
+
+    return cur;
+}
+
+static MR_bool
+MR_attempt_steal_spark(MR_Spark *spark)
+{
+    int             max_attempts;
+    int             attempt;
+    MR_SparkDeque   *victim;
+    int             steal_top;
+
+    do {
+        /*
+        ** Protect against concurrent updates of MR_spark_deques and
+        ** MR_max_spark_deques.  This allows only one thread to try to steal
+        ** work at any time, which is mainly a good thing as it limits the
+        ** amount of wasted effort.
+        */
+        MR_LOCK(&spark_deques_lock, "attempt_steal_spark");
+
+        if (MR_max_spark_deques < MR_worksteal_max_attempts) {
+            max_attempts = MR_max_spark_deques;
+        } else {
+            max_attempts = MR_worksteal_max_attempts;
+        }
+
+        for (attempt = 0; attempt < max_attempts; attempt++) {
+            MR_victim_counter++;
+            victim = MR_spark_deques[MR_victim_counter % MR_max_spark_deques];
+            if (victim != NULL) {
+                steal_top = MR_wsdeque_steal_top(victim, spark);
+                if (steal_top == 1) {
+                    /* Steal successful. */
+                    MR_UNLOCK(&spark_deques_lock, "attempt_steal_spark");
+                    return MR_TRUE;
+                }
+            }
+        }
+
+        MR_UNLOCK(&spark_deques_lock, "attempt_steal_spark");
+
+        MR_sleep_runqueue(MR_worksteal_sleep_msecs);
+    } while (MR_runqueue_head == NULL && !MR_exit_now);
+
+    /* Steal unsuccessful. */
+    return MR_FALSE;
+}
+
+/*
+** Snooze for a while, but wake up if the runqueue becomes non-empty.
+*/
+static void
+MR_sleep_runqueue(unsigned int msecs)
+{
+    struct timespec timeout;
+
+    MR_milliseconds_from_now(&timeout, msecs);
+    MR_LOCK(&MR_runqueue_lock, "sleep_runqueue");
+    MR_TIMED_WAIT(&MR_runqueue_cond, &MR_runqueue_lock, &timeout);
+    MR_UNLOCK(&MR_runqueue_lock, "sleep_runqueue");
+}
+
+static void
+MR_milliseconds_from_now(struct timespec *timeout, unsigned int msecs)
+{
+#if defined(MR_HAVE_GETTIMEOFDAY)
+
+    const long          NANOSEC_PER_SEC = 1000000000L;
+    struct timeval      now;
+    MR_int_least64_t    nanosecs;
+
+    gettimeofday(&now, NULL);
+    timeout->tv_sec = now.tv_sec;
+    nanosecs = ((MR_int_least64_t) (now.tv_usec + (msecs * 1000))) * 1000L;
+    if (nanosecs >= NANOSEC_PER_SEC) {
+        timeout->tv_sec++;
+        nanosecs %= NANOSEC_PER_SEC;
+    }
+    timeout->tv_nsec = (long) nanosecs;
+
+#elif defined(MR_WIN32)
+
+    const long          NANOSEC_PER_SEC = 1000000000L;
+    const long          NANOSEC_PER_MILLISEC = 1000000L;
+    struct _timeb       now;
+    MR_int_least64_t    nanosecs;
+
+    _ftime(&now);
+    timeout->tv_sec = now.time;
+    nanosecs = ((MR_int_least64_t) (msecs + now.millitm)) * NANOSEC_PER_MILLISEC;
+    if (nanosecs >= NANOSEC_PER_SEC) {
+        timeout->tv_sec++;
+        nanosecs %= NANOSEC_PER_SEC;
+    }
+    timeout->tv_nsec = (long) nanosecs;
+
+#else
+
+    #error Missing definition of MR_milliseconds_from_now.
+
+#endif
+}
+
+#endif  /* MR_LL_PARALLEL_CONJ */
+
 void 
 MR_flounder(void)
 {
     MR_fatal_error("computation floundered");
 }
 
+void
+MR_sched_yield(void)
+{
+#if defined(MR_HAVE_SCHED_YIELD)
+    sched_yield();
+#elif defined(MR_CAN_DO_PENDING_IO)
+    struct timeval timeout = {0, 1};
+    select(0, NULL, NULL, NULL, &timeout);
+#endif
+}
+
 /*
 ** Check to see if any contexts that blocked on IO have become
 ** runnable. Return the number of contexts that are still blocked.
@@ -527,18 +728,6 @@ MR_schedule_context(MR_Context *ctxt)
     MR_UNLOCK(&MR_runqueue_lock, "schedule_context");
 }
 
-#ifdef MR_LL_PARALLEL_CONJ
-void
-MR_schedule_spark_globally(const MR_Spark *proto_spark)
-{
-    MR_LOCK(&MR_runqueue_lock, "schedule_spark_globally");
-    MR_wsdeque_push_bottom(&MR_spark_queue, proto_spark);
-    MR_num_outstanding_contexts_and_sparks++;
-    MR_SIGNAL(&MR_runqueue_cond);
-    MR_UNLOCK(&MR_runqueue_lock, "schedule_spark_globally");
-}
-#endif /* !MR_LL_PARALLEL_CONJ */
-
 
 #ifndef MR_HIGHLEVEL_CODE
 
@@ -551,10 +740,10 @@ MR_BEGIN_CODE
 MR_define_entry(MR_do_runnext);
 #ifdef MR_THREAD_SAFE
 {
-    MR_Context      *tmp;
-    MR_Context      *prev;
+    MR_Context      *ready_context;
+    MR_Code         *resume_point;
     MR_Spark        spark;
-    unsigned        depth;
+    MR_Unsigned     depth;
     MercuryThread   thd;
 
     /*
@@ -562,8 +751,7 @@ MR_define_entry(MR_do_runnext);
     ** in the middle of running some code.
     */
     MR_assert(
-        MR_ENGINE(MR_eng_this_context) == NULL
-    ||
+        MR_ENGINE(MR_eng_this_context) == NULL ||
         MR_wsdeque_is_empty(
             &MR_ENGINE(MR_eng_this_context)->MR_ctxt_spark_deque)
     );
@@ -571,10 +759,6 @@ MR_define_entry(MR_do_runnext);
     depth = MR_ENGINE(MR_eng_c_depth);
     thd = MR_ENGINE(MR_eng_owner_thread);
 
-    MR_LOCK(&MR_runqueue_lock, "MR_do_runnext (i)");
-
-    MR_num_idle_engines++;
-
     while (1) {
         if (MR_exit_now) {
             /*
@@ -582,68 +766,66 @@ MR_define_entry(MR_do_runnext);
             ** up the Mercury runtime.  It cannot exit by this route.
             */
             assert(thd != MR_primordial_thread);
-            MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (ii)");
             MR_destroy_thread(MR_cur_engine());
         }
 
-        /* Search for a ready context which we can handle. */
-        tmp = MR_runqueue_head;
-        /* XXX check pending io */
-        prev = NULL;
-        while (tmp != NULL) {
-            if (tmp->MR_ctxt_resume_owner_thread == thd && 
-                tmp->MR_ctxt_resume_c_depth == depth)
-            {
-                tmp->MR_ctxt_resume_owner_thread = (MercuryThread) NULL;
-                tmp->MR_ctxt_resume_c_depth = 0;
-                MR_num_idle_engines--;
-                goto ReadyContext;
-            }
-
-            if (tmp->MR_ctxt_resume_owner_thread == (MercuryThread) NULL) {
-                MR_num_idle_engines--;
-                goto ReadyContext;
-            }
-
-            prev = tmp;
-            tmp = tmp->MR_ctxt_next;
+        MR_LOCK(&MR_runqueue_lock, "do_runnext");
+        ready_context = MR_find_ready_context(thd, depth);
+        MR_UNLOCK(&MR_runqueue_lock, "do_runnext");
+        if (ready_context != NULL) {
+            goto ReadyContext;
         }
 
-        /* Check if the global spark queue is nonempty. */
-        if (MR_wsdeque_take_top(&MR_spark_queue, &spark)) {
-            MR_num_idle_engines--;
-            MR_num_outstanding_contexts_and_sparks--;
+        /*
+        ** No suitable ready contexts, so try to steal a parallel conjunct
+        ** instead.
+        */
+        if (MR_attempt_steal_spark(&spark)) {
             goto ReadySpark;
         }
-
-        /* Nothing to do, go back to sleep. */
-        while (MR_WAIT(&MR_runqueue_cond, &MR_runqueue_lock) != 0) {
-        }
     }
 
-  ReadyContext:
+    /* Unreachable. */
+    abort();
 
-    if (prev != NULL) {
-        prev->MR_ctxt_next = tmp->MR_ctxt_next;
-    } else {
-        MR_runqueue_head = tmp->MR_ctxt_next;
-    }
-    if (MR_runqueue_tail == tmp) {
-        MR_runqueue_tail = prev;
-    }
-    MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iii)");
+  ReadyContext:
 
-    /* Discard whatever unused context we may have and switch to tmp. */
+    /* Discard any unused context we may have and switch to ready_context. */
     if (MR_ENGINE(MR_eng_this_context) != NULL) {
         MR_destroy_context(MR_ENGINE(MR_eng_this_context));
     }
-    MR_ENGINE(MR_eng_this_context) = tmp;
-    MR_load_context(tmp);
-    MR_GOTO(tmp->MR_ctxt_resume);
+    MR_ENGINE(MR_eng_this_context) = ready_context;
+    MR_load_context(ready_context);
+
+    resume_point = ready_context->MR_ctxt_resume;
+    ready_context->MR_ctxt_resume = NULL;
+    MR_GOTO(resume_point);
 
   ReadySpark:
 
-    MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iii)");
+    if (! spark.MR_spark_sync_term->MR_st_is_shared) {
+        spark.MR_spark_sync_term->MR_st_is_shared = MR_TRUE;
+        /*
+        ** If we allow the stolen spark (New) to execute immediately
+        ** there could be a race with a sibling conjunct (Old) which is
+        ** currently executing, e.g.
+        **
+        ** 1. Old enters MR_join_and_continue(), loads old value of
+        **    MR_st_count;
+        ** 2. New begins executing;
+        ** 3. New enters MR_join_and_continue(), decrements MR_st_count
+        **    atomically;
+        ** 4. Old decrements MR_st_count *non-atomically* based on the
+        **    old value of MR_st_count.
+        **
+        ** Therefore this loop delays the new spark from executing
+        ** while there is another conjunct in MR_join_and_continue()
+        ** which might decrement MR_st_count non-atomically.
+        */
+        while (spark.MR_spark_sync_term->MR_st_attempt_cheap_join) {
+            MR_sched_yield();
+        }
+    }
 
     /* Grab a new context if we haven't got one then begin execution. */
     if (MR_ENGINE(MR_eng_this_context) == NULL) {
@@ -651,9 +833,13 @@ MR_define_entry(MR_do_runnext);
             MR_CONTEXT_SIZE_SMALL, NULL);
         MR_load_context(MR_ENGINE(MR_eng_this_context));
     }
-    MR_parent_sp = spark.MR_spark_parent_sp;
-    MR_assert(MR_parent_sp != MR_sp);
+    MR_parent_sp = spark.MR_spark_sync_term->MR_st_parent_sp;
     MR_SET_THREAD_LOCAL_MUTABLES(spark.MR_spark_thread_local_mutables);
+
+    assert(MR_parent_sp);
+    assert(MR_parent_sp != MR_sp);
+    assert(spark.MR_spark_sync_term->MR_st_count > 0);
+
     MR_GOTO(spark.MR_spark_resume);
 }
 #else /* !MR_THREAD_SAFE */
@@ -675,7 +861,7 @@ MR_define_entry(MR_do_runnext);
     MR_load_context(MR_ENGINE(MR_eng_this_context));
     MR_GOTO(MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume);
 }
-#endif
+#endif /* !MR_THREAD_SAFE */
 
 MR_END_MODULE
 
diff --git a/runtime/mercury_context.h b/runtime/mercury_context.h
index e12c48f..3f98c13 100644
--- a/runtime/mercury_context.h
+++ b/runtime/mercury_context.h
@@ -201,22 +201,6 @@
 **                  (Accessed via MR_eng_this_context.)
 */
 
-/*
-** A spark contains just enough information to begin execution of a parallel
-** conjunct.  Sparks exist on a context's local spark deque or in the global
-** spark queue.  In the former case, a spark will eventually be executed in the
-** same context (same detstack, etc.) as the code that generated the spark. In
-** the latter case the spark can be picked up and executed by any idle engine
-** in a different context.
-**
-** In the current implementation a spark is put on the global spark queue if,
-** at the time a fork instruction is reached, we think the spark has a chance
-** of being picked up for execution by an idle engine.  Otherwise the spark
-** goes on the context's spark stack. At the moment this is an irrevocable
-** decision. A future possibility is to allow idle engines to steal work
-** from the cold end of some context's spark deque.
-*/
-
 typedef struct MR_Context_Struct        MR_Context;
 
 typedef enum {
@@ -236,13 +220,21 @@ struct MR_SavedOwner_Struct {
 
 
 #ifdef MR_LL_PARALLEL_CONJ
+typedef struct MR_SyncTerm_Struct       MR_SyncTerm;
 typedef struct MR_Spark_Struct          MR_Spark;
 typedef struct MR_SparkDeque_Struct     MR_SparkDeque;
 typedef struct MR_SparkArray_Struct     MR_SparkArray;
 
+/*
+** A spark contains just enough information to begin execution of a parallel
+** conjunct.  A spark will either be executed in the same context (same
+** detstack, etc.) as the code that generated the spark, or it may be stolen
+** from its deque and executed by any idle engine in a different context.
+*/
+
 struct MR_Spark_Struct {
+    MR_SyncTerm             *MR_spark_sync_term;
     MR_Code                 *MR_spark_resume;
-    MR_Word                 *MR_spark_parent_sp;
     MR_ThreadLocalMuts      *MR_spark_thread_local_mutables;
 };
 
@@ -328,10 +320,6 @@ struct MR_Context_Struct {
 
 /*
 ** The runqueue is a linked list of contexts that are runnable.
-** The spark_queue is an array of sparks that are runnable.
-** We keep them separate to prioritise contexts (which are mainly
-** computations which have already started) over sparks (which are
-** computations which have not begun).
 */
 
 extern      MR_Context  *MR_runqueue_head;
@@ -340,10 +328,6 @@ extern      MR_Context  *MR_runqueue_tail;
   extern    MercuryLock MR_runqueue_lock;
   extern    MercuryCond MR_runqueue_cond;
 #endif
-#ifdef  MR_LL_PARALLEL_CONJ
-  extern    MR_SparkDeque   MR_spark_queue;
-  extern    MercuryLock     MR_sync_term_lock;
-#endif
 
 /*
 ** As well as the runqueue, we maintain a linked list of contexts
@@ -380,35 +364,6 @@ extern  MR_PendingContext   *MR_pending_contexts;
   extern    MercuryLock     MR_pending_contexts_lock;
 #endif
 
-#ifdef  MR_LL_PARALLEL_CONJ
-  /*
-  ** The number of engines waiting for work.
-  ** We don't protect it with a separate lock, but updates to it are made while
-  ** holding the MR_runqueue_lock.  Reads are made without the lock.
-  ** XXX We may need to use atomic instructions or memory fences on some
-  ** architectures.
-  */
-  extern    int     MR_num_idle_engines;
-
-  /*
-  ** The number of contexts that are not in the free list (i.e. are executing
-  ** or suspended) plus the number of sparks in the spark queue.  We count
-  ** those sparks as they can quickly accumulate on the spark queue before any
-  ** of them are taken up for execution.  Once they do get taken up, many
-  ** contexts would need to be allocated to execute them.  Sparks not on the
-  ** spark queue are currently guaranteed to be executed on their originating
-  ** context so won't cause allocation of more contexts.
-  **
-  ** What we are mainly interested in here is preventing too many contexts from
-  ** being allocated, as each context is quite large and we can quickly run out
-  ** of memory.  Another problem is due to the context free list and
-  ** conservative garbage collection: every context ever allocated will be
-  ** scanned.  (Getting the garbage collector not to scan contexts on the free
-  ** list should be possible though.)
-  */
-  extern    int     MR_num_outstanding_contexts_and_sparks;
-#endif  /* !MR_LL_PARALLEL_CONJ */
-
 /*---------------------------------------------------------------------------*/
 
 /*
@@ -450,17 +405,15 @@ extern  void        MR_finalize_runqueue(void);
 extern  void        MR_flounder(void);
 
 /*
+** Relinquish the processor voluntarily without blocking.
+*/
+extern  void        MR_sched_yield(void);
+
+/*
 ** Append the given context onto the end of the run queue.
 */
 extern  void        MR_schedule_context(MR_Context *ctxt);
 
-#ifdef MR_LL_PARALLEL_CONJ
-  /*
-  ** Append the given spark onto the end of the spark queue.
-  */
-  extern    void    MR_schedule_spark_globally(const MR_Spark *spark);
-#endif /* !MR_LL_PARALLEL_CONJ */
-
 #ifndef MR_HIGHLEVEL_CODE
   MR_declare_entry(MR_do_runnext);
   #define MR_runnext()                          \
@@ -684,7 +637,7 @@ extern  void        MR_schedule_context(MR_Context *ctxt);
         /* it wouldn't be appropriate to copy the resume field */             \
         to_cptr->MR_ctxt_thread_local_mutables =                              \
             from_cptr->MR_ctxt_thread_local_mutables;                         \
-        /* it wouldn't be appropriate to copy the spark_queue field */        \
+        /* it wouldn't be appropriate to copy the spark_deque field */        \
         /* it wouldn't be appropriate to copy the saved_owners field */       \
     } while (0)
 
@@ -695,182 +648,148 @@ extern  void        MR_schedule_context(MR_Context *ctxt);
   /*
   ** If you change MR_SyncTerm_Struct you need to update configure.in.
   **
-  ** MR_st_count is `int' so that on a 64-bit machine the total size of the
-  ** sync term is two words, not three words (assuming `int' is 32 bits).
+  ** MR_st_attempt_cheap_join is read-write by the context which originated
+  ** the sync term and read-only to all other contexts.
   **
-  ** XXX we should remove that assumption but it's a little tricky because
-  ** configure needs to understand the types as well
+  ** MR_InitSyncTerm_Struct allows us to initialise MR_st_is_shared
+  ** and MR_st_attempt_cheap_join together, which is a bit quicker.
   */
 
-  typedef struct MR_SyncTerm_Struct MR_SyncTerm;
-
   struct MR_SyncTerm_Struct {
     MR_Context      *MR_st_orig_context;
+    MR_Word         *MR_st_parent_sp;
     volatile int    MR_st_count;
-    volatile int    MR_st_is_shared;
+    volatile short  MR_st_is_shared;
+    volatile short  MR_st_attempt_cheap_join;
+  };
+
+  typedef struct MR_InitSyncTerm_Struct MR_InitSyncTerm;
+  struct MR_InitSyncTerm_Struct {
+    MR_Context      *MR_st_orig_context;
+    MR_Word         *MR_st_parent_sp;
+    int             MR_st_count;
+    int             MR_st_rest;
   };
 
   #define MR_init_sync_term(sync_term, nbranches)                             \
     do {                                                                      \
-        MR_SyncTerm *init_st = (MR_SyncTerm *) &(sync_term);                  \
+        MR_InitSyncTerm *init_st = (MR_InitSyncTerm *) &(sync_term);          \
                                                                               \
         init_st->MR_st_orig_context = MR_ENGINE(MR_eng_this_context);         \
+        init_st->MR_st_parent_sp = MR_parent_sp;                              \
         init_st->MR_st_count = (nbranches);                                   \
-        init_st->MR_st_is_shared = MR_FALSE;                                  \
+        init_st->MR_st_rest = 0;                                              \
     } while (0)
 
   /*
   ** fork_new_child(MR_SyncTerm st, MR_Code *child):
   **
   ** Create a new spark to execute the code at `child'.  The new spark is put
-  ** on the global spark queue or the context-local spark deque.  The current
-  ** context resumes at `parent'.  MR_parent_sp must already be set
-  ** appropriately before this instruction is executed.
-  **
-  ** If the spark ends up on the global spark queue then we set
-  ** `MR_st_is_shared' to true as branches of this parallel conjunction could
-  ** be executed in parallel.
+  ** on the context's spark deque.  The current context resumes at `parent'.
+  ** MR_parent_sp must already be set appropriately before this instruction
+  ** is executed.
   */
   #define MR_fork_new_child(sync_term, child)                                 \
     do {                                                                      \
-        MR_Spark fnc_spark;                                                   \
+        MR_Spark        fnc_spark;                                            \
+        MR_SparkDeque   *fnc_deque;                                           \
                                                                               \
+        fnc_spark.MR_spark_sync_term = (MR_SyncTerm *) &(sync_term);          \
         fnc_spark.MR_spark_resume = (child);                                  \
-        fnc_spark.MR_spark_parent_sp = MR_parent_sp;                          \
         fnc_spark.MR_spark_thread_local_mutables = MR_THREAD_LOCAL_MUTABLES;  \
-        if (MR_fork_globally_criteria) {                                      \
-            MR_SyncTerm *fnc_st = (MR_SyncTerm *) &(sync_term);               \
-            fnc_st->MR_st_is_shared = MR_TRUE;                                \
-            MR_schedule_spark_globally(&fnc_spark);                           \
-        } else {                                                              \
-            MR_schedule_spark_locally(&fnc_spark);                            \
-        }                                                                     \
-    } while (0)
-
-  #define MR_fork_globally_criteria                                           \
-    (MR_num_idle_engines != 0 &&                                              \
-    MR_num_outstanding_contexts_and_sparks < MR_max_outstanding_contexts)
-
-  #define MR_choose_parallel_over_sequential_cond(target_cpus)                \
-      (MR_num_outstanding_contexts_and_sparks < target_cpus)
-
-  #define MR_schedule_spark_locally(spark)                                    \
-    do {                                                                      \
-        MR_Context  *ssl_ctxt;                                                \
                                                                               \
-        /*                                                                    \
-        ** Only the engine running the context is allowed to access           \
-        ** the context's spark stack, so no locking is required here.         \
-        */                                                                    \
-        ssl_ctxt = MR_ENGINE(MR_eng_this_context);                            \
-        MR_wsdeque_push_bottom(&ssl_ctxt->MR_ctxt_spark_deque, (spark));      \
+        fnc_deque = &MR_ENGINE(MR_eng_this_context)->MR_ctxt_spark_deque;     \
+        MR_wsdeque_push_bottom(fnc_deque, &fnc_spark);                        \
     } while (0)
 
   #define MR_join_and_continue(sync_term, join_label)                         \
     do {                                                                      \
-        MR_SyncTerm *jnc_st = (MR_SyncTerm *) &sync_term;                     \
+        MR_SyncTerm *jnc_st = (MR_SyncTerm *) &(sync_term);                   \
+        MR_bool     jnc_last;                                                 \
                                                                               \
-        if (!jnc_st->MR_st_is_shared) {                                       \
-            /* This parallel conjunction has only executed sequentially. */   \
-            if (--jnc_st->MR_st_count == 0) {                                 \
-                MR_GOTO(join_label);                                          \
+        if (jnc_st->MR_st_count == 1) {                                       \
+            jnc_last = MR_TRUE;                                               \
+        } else if (MR_ENGINE(MR_eng_this_context)                             \
+            == jnc_st->MR_st_orig_context)                                    \
+        {                                                                     \
+            jnc_st->MR_st_attempt_cheap_join = MR_TRUE;                       \
+            if (jnc_st->MR_st_is_shared) {                                    \
+                jnc_last = MR_sub1_and_fetch_int(&jnc_st->MR_st_count) == 0;  \
             } else {                                                          \
-                MR_join_and_continue_1();                                     \
+                jnc_last = (--jnc_st->MR_st_count == 0);                      \
             }                                                                 \
+            jnc_st->MR_st_attempt_cheap_join = MR_FALSE;                      \
         } else {                                                              \
-            /* This parallel conjunction may be executing in parallel. */     \
-            MR_LOCK(&MR_sync_term_lock, "continue");                          \
-            if (--jnc_st->MR_st_count == 0) {                                 \
-                if (MR_ENGINE(MR_eng_this_context)                            \
-                    == jnc_st->MR_st_orig_context)                            \
-                {                                                             \
-                    /*                                                        \
-                    ** This context originated this parallel conjunction and  \
-                    ** all the branches have finished so jump to the join     \
-                    ** label.                                                 \
-                    */                                                        \
-                    MR_UNLOCK(&MR_sync_term_lock, "continue i");              \
-                    MR_GOTO(join_label);                                      \
-                } else {                                                      \
-                    /*                                                        \
-                    ** This context didn't originate this parallel            \
-                    ** conjunction and we're the last branch to finish.  The  \
-                    ** originating context should be suspended waiting for us \
-                    ** to finish, so wake it up.                              \
-                    */                                                        \
-                    jnc_st->MR_st_orig_context->MR_ctxt_resume = join_label;  \
-                    MR_schedule_context(jnc_st->MR_st_orig_context);          \
-                    MR_UNLOCK(&MR_sync_term_lock, "continue ii");             \
-                    MR_runnext();                                             \
-                }                                                             \
-            } else {                                                          \
-                MR_join_and_continue_2();                                     \
-            }                                                                 \
+            jnc_last = MR_sub1_and_fetch_int(&jnc_st->MR_st_count) == 0;      \
+        }                                                                     \
+        if (jnc_last) {                                                       \
+            MR_jnc_last_conjunct(jnc_st, join_label);                         \
+        } else {                                                              \
+            MR_jnc_nonlast_conjunct(jnc_st, join_label);                      \
         }                                                                     \
     } while (0)
 
-  #define MR_join_and_continue_1()                                            \
+  #define MR_jnc_last_conjunct(jnc_st, join_label)                            \
     do {                                                                      \
-        MR_Context  *jnc_ctxt;                                                \
-        MR_bool     jnc_popped;                                               \
-        MR_Spark    jnc_spark;                                                \
-                                                                              \
-        jnc_ctxt = MR_ENGINE(MR_eng_this_context);                            \
-        jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,    \
-            &jnc_spark);                                                      \
-        if (jnc_popped) {                                                     \
-            MR_GOTO(jnc_spark.MR_spark_resume);                               \
+        MR_Context  *jnc_orig_context = jnc_st->MR_st_orig_context;           \
+        /*                                                                    \
+        ** If this context originated this parallel conjunction and this is   \
+        ** the last conjunct to finish then we can jump to the join_label.    \
+        */                                                                    \
+        if (MR_ENGINE(MR_eng_this_context) == jnc_orig_context) {             \
+            MR_GOTO(join_label);                                              \
         } else {                                                              \
+            /*                                                                \
+            ** This context didn't originate this parallel conjunction        \
+            ** but we're the last conjunct to finish.                         \
+            **                                                                \
+            ** Only the originating context (OC) can jump to the join_label.  \
+            ** As the last conjunct to finish we have the responsibility of   \
+            ** scheduling OC when we are ready.  It's possible however that   \
+            ** both we and OC executed MR_join_and_continue() concurrently so \
+            ** we can't do that until we know that OC is ready to be          \
+            ** scheduled.  OC will set its resume point to join_label to      \
+            ** indicate that it is ready.                                     \
+            */                                                                \
+            while (jnc_orig_context->MR_ctxt_resume != (join_label)) {        \
+                MR_sched_yield();                                             \
+            }                                                                 \
+            MR_schedule_context(jnc_orig_context);                            \
             MR_runnext();                                                     \
         }                                                                     \
     } while (0)
 
-  #define MR_join_and_continue_2()                                            \
+  #define MR_jnc_nonlast_conjunct(jnc_st, join_label)                         \
     do {                                                                      \
-        MR_Context  *jnc_ctxt;                                                \
+        MR_Context  *jnc_this_ctxt;                                           \
         MR_bool     jnc_popped;                                               \
-        MR_Spark    jnc_spark;                                                \
+        MR_Code     *jnc_spark_resume;                                        \
                                                                               \
-        jnc_ctxt = MR_ENGINE(MR_eng_this_context);                            \
-        jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,    \
-            &jnc_spark);                                                      \
-        if (jnc_popped && (jnc_spark.MR_spark_parent_sp == MR_parent_sp)) {   \
-            /*                                                                \
-            ** The spark at the top of the stack is due to the same parallel  \
-            ** conjunction that we've just been executing. We can immediately \
-            ** execute the next branch of the same parallel conjunction in    \
-            ** the current context.                                           \
-            */                                                                \
-            MR_UNLOCK(&MR_sync_term_lock, "continue_2 i");                    \
-            MR_GOTO(jnc_spark.MR_spark_resume);                               \
+        jnc_this_ctxt = MR_ENGINE(MR_eng_this_context);                       \
+        jnc_popped = MR_wsdeque_pop_bottom(                                   \
+            &jnc_this_ctxt->MR_ctxt_spark_deque, &jnc_spark_resume);          \
+        if (jnc_popped) {                                                     \
+            MR_GOTO(jnc_spark_resume);                                        \
         } else {                                                              \
             /*                                                                \
-            ** The spark stack is empty or the next spark is from a different \
-            ** parallel conjunction to the one we've been executing.  Either  \
-            ** way, there's nothing more we can do with this context right    \
-            ** now.  Put back the spark we won't be using.                    \
-            */                                                                \
-            if (jnc_popped) {                                                 \
-                MR_wsdeque_putback_bottom(&jnc_ctxt->MR_ctxt_spark_deque,     \
-                    &jnc_spark);                                              \
-            }                                                                 \
-            /*                                                                \
             ** If this context originated the parallel conjunction we've been \
-            ** executing, the rest of the parallel conjunction must have been \
-            ** put on the global spark queue to be executed in other          \
-            ** contexts.  This context will need to be resumed once the       \
-            ** parallel conjunction is completed, so suspend the context.     \
+            ** executing, suspend this context such that it will be resumed   \
+            ** at the join label once the parallel conjunction is completed.  \
+            **                                                                \
+            ** Otherwise we can reuse this context for the next piece of work.\
             */                                                                \
-            if (jnc_ctxt == jnc_st->MR_st_orig_context) {                     \
-                MR_save_context(jnc_ctxt);                                    \
+            if (jnc_this_ctxt == jnc_st->MR_st_orig_context) {                \
+                MR_save_context(jnc_this_ctxt);                               \
+                jnc_this_ctxt->MR_ctxt_resume = (join_label);                 \
                 MR_ENGINE(MR_eng_this_context) = NULL;                        \
             }                                                                 \
-            /* Finally look for other work. */                                \
-            MR_UNLOCK(&MR_sync_term_lock, "continue_2 ii");                   \
             MR_runnext();                                                     \
         }                                                                     \
     } while (0)
 
+  /* XXX this is meaningless now */
+  #define MR_choose_parallel_over_sequential_cond(target_cpus) (MR_TRUE)
+
   /* This needs to come after the definition of MR_SparkDeque_Struct. */
   #include "mercury_wsdeque.h"
 
diff --git a/runtime/mercury_thread.c b/runtime/mercury_thread.c
index 440b817..b170b27 100644
--- a/runtime/mercury_thread.c
+++ b/runtime/mercury_thread.c
@@ -30,6 +30,8 @@
 #endif
 
 MR_bool             MR_exit_now;
+int                 MR_engines_to_start;
+
 MR_bool             MR_debug_threads = MR_FALSE;
 
 MR_Unsigned         MR_num_thread_local_mutables = 0;
@@ -136,6 +138,7 @@ MR_init_thread(MR_when_to_use when_to_use)
             MR_fatal_error("Sorry, not implemented: "
                 "--high-level-code and multiple engines");
 #else
+            MR_sub1_and_fetch_int(&MR_engines_to_start);
             (void) MR_call_engine(MR_ENTRY(MR_do_runnext), MR_FALSE);
 #endif
             MR_destroy_engine(eng);
diff --git a/runtime/mercury_thread.h b/runtime/mercury_thread.h
index 197f9ac..a7cc05f 100644
--- a/runtime/mercury_thread.h
+++ b/runtime/mercury_thread.h
@@ -45,44 +45,52 @@
     ** stable, since the alternative versions do the
     ** same thing, but with debugging support.
     */
-    #define MR_LOCK(lck, from)  pthread_mutex_lock((lck))
+    #define MR_LOCK(lck, from)      pthread_mutex_lock((lck))
     #define MR_UNLOCK(lck, from)    pthread_mutex_unlock((lck))
 
-    #define MR_SIGNAL(cnd)      pthread_cond_signal((cnd))
-    #define MR_BROADCAST(cnd)   pthread_cond_broadcast((cnd))
-    #define MR_WAIT(cnd, mtx)   pthread_cond_wait((cnd), (mtx))
+    #define MR_SIGNAL(cnd)          pthread_cond_signal((cnd))
+    #define MR_BROADCAST(cnd)       pthread_cond_broadcast((cnd))
+    #define MR_WAIT(cnd, mtx)       pthread_cond_wait((cnd), (mtx))
+    #define MR_TIMED_WAIT(cnd, mtx, abstime)                            \
+        pthread_cond_timedwait((cnd), (mtx), (abstime))
   #else
-    #define MR_LOCK(lck, from)                  \
-                    ( MR_debug_threads ?            \
-                    MR_mutex_lock((lck), (from))    \
-                :                   \
-                    pthread_mutex_lock((lck))   \
-                )
-    #define MR_UNLOCK(lck, from)                    \
-                    ( MR_debug_threads ?            \
-                        MR_mutex_unlock((lck), (from))  \
-                :                   \
-                    pthread_mutex_unlock((lck)) \
-                )
-
-    #define MR_SIGNAL(cnd)                      \
-                    ( MR_debug_threads ?            \
-                    MR_cond_signal((cnd))       \
-                :                   \
-                    pthread_cond_signal((cnd))  \
-                )
-    #define MR_BROADCAST(cnd)                   \
-                    ( MR_debug_threads ?            \
-                    MR_cond_broadcast((cnd))    \
-                :                   \
-                    pthread_cond_broadcast((cnd))   \
-                )
-    #define MR_WAIT(cnd, mtx)                   \
-                    ( MR_debug_threads ?            \
-                    MR_cond_wait((cnd), (mtx))  \
-                :                   \
-                    pthread_cond_wait((cnd), (mtx)) \
-                )
+    #define MR_LOCK(lck, from)                                          \
+        ( MR_debug_threads ?                                            \
+            MR_mutex_lock((lck), (from))                                \
+        :                                                               \
+            pthread_mutex_lock((lck))                                   \
+        )
+    #define MR_UNLOCK(lck, from)                                        \
+        ( MR_debug_threads ?                                            \
+            MR_mutex_unlock((lck), (from))                              \
+        :                                                               \
+            pthread_mutex_unlock((lck))                                 \
+        )
+
+    #define MR_SIGNAL(cnd)                                              \
+        ( MR_debug_threads ?                                            \
+            MR_cond_signal((cnd))                                       \
+        :                                                               \
+            pthread_cond_signal((cnd))                                  \
+        )
+    #define MR_BROADCAST(cnd)                                           \
+        ( MR_debug_threads ?                                            \
+            MR_cond_broadcast((cnd))                                    \
+        :                                                               \
+            pthread_cond_broadcast((cnd))                               \
+        )
+    #define MR_WAIT(cnd, mtx)                                           \
+        ( MR_debug_threads ?                                            \
+            MR_cond_wait((cnd), (mtx))                                  \
+        :                                                               \
+            pthread_cond_wait((cnd), (mtx))                             \
+        )
+    #define MR_TIMED_WAIT(cnd, mtx, abstime)                            \
+        ( MR_debug_threads ?                                            \
+            /* XXX falls back to an untimed wait when debugging */      \
+            MR_cond_wait((cnd), (mtx))                                  \
+        :                                                               \
+            pthread_cond_timedwait((cnd), (mtx), (abstime))             \
+        )
 
   #endif
 
@@ -95,10 +103,10 @@
   #define MR_RELEASE_GLOBAL_LOCK(where) MR_UNLOCK(&MR_global_lock, (where))
 
   #if defined(MR_DIGITAL_UNIX_PTHREADS)
-    #define MR_GETSPECIFIC(key)     ({      \
-        pthread_addr_t gstmp;           \
-        pthread_getspecific((key), &gstmp); \
-        (void *) gstmp;             \
+    #define MR_GETSPECIFIC(key)     ({                                  \
+        pthread_addr_t gstmp;                                           \
+        pthread_getspecific((key), &gstmp);                             \
+        (void *) gstmp;                                                 \
     })
     #define MR_KEY_CREATE   pthread_keycreate
   #else
@@ -123,6 +131,7 @@
   MercuryThread     *MR_create_thread(MR_ThreadGoal *);
   void              MR_destroy_thread(void *eng);
   extern MR_bool    MR_exit_now;
+  extern int        MR_engines_to_start;
 
   /*
   ** The primordial thread. Currently used for debugging.
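
MR_TIMED_WAIT exists so that an idle engine can rest for
MR_worksteal_sleep_msecs between rounds of steal attempts without
sleeping forever if nothing signals the condition variable.
pthread_cond_timedwait() takes an absolute deadline rather than a
relative timeout, so callers must construct one.  A hypothetical
caller (the helper name is invented for illustration):

    #include <pthread.h>
    #include <sys/time.h>
    #include <time.h>

    /* Compute an absolute deadline `msecs' milliseconds from now, in */
    /* the form that pthread_cond_timedwait() expects.                */
    static void
    abstime_in_msecs(struct timespec *abstime, long msecs)
    {
        struct timeval now;

        gettimeofday(&now, NULL);
        abstime->tv_sec = now.tv_sec + msecs / 1000;
        abstime->tv_nsec = now.tv_usec * 1000 + (msecs % 1000) * 1000000;
        if (abstime->tv_nsec >= 1000000000) {
            abstime->tv_sec++;
            abstime->tv_nsec -= 1000000000;
        }
    }

    /* With `lock' held, an idle engine could then rest like this:    */
    /*                                                                */
    /*     struct timespec abstime;                                   */
    /*     abstime_in_msecs(&abstime, MR_worksteal_sleep_msecs);      */
    /*     MR_TIMED_WAIT(&cond, &lock, &abstime);                     */
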
diff --git a/runtime/mercury_wrapper.c b/runtime/mercury_wrapper.c
index d2d836c..8d2dbec 100644
--- a/runtime/mercury_wrapper.c
+++ b/runtime/mercury_wrapper.c
@@ -195,9 +195,14 @@ size_t      MR_stack_margin_size = 128;
 /* primary cache size to optimize for, in bytes */
 size_t      MR_pcache_size = 8192;
 
-/* soft limits on the number of contexts we can create */
-MR_Unsigned MR_max_contexts_per_thread = 2;
-MR_Unsigned MR_max_outstanding_contexts;
+/*
+** In grades that support parallel conjunctions, an idle engine can steal
+** parallel work from Mercury contexts. The following variables control the
+** maximum number of contexts that an idle engine will try to steal from
+** before resting, and how long to rest before attempting another steal.
+*/
+MR_Unsigned MR_worksteal_max_attempts = 24;
+MR_Unsigned MR_worksteal_sleep_msecs = 2;
 
 /* file names for mdb's debugger I/O streams */
 const char  *MR_mdb_in_filename = NULL;
@@ -610,12 +615,14 @@ mercury_runtime_init(int argc, char **argv)
         int i;
 
         MR_exit_now = MR_FALSE;
-        for (i = 1 ; i < MR_num_threads ; i++) {
+        MR_engines_to_start = MR_num_threads - 1;
+        for (i = 1; i < MR_num_threads; i++) {
             MR_create_thread(NULL);
         }
 
-        while (MR_num_idle_engines < MR_num_threads-1) {
-            /* busy wait until the worker threads are ready */
+        while (MR_engines_to_start > 0) {
+            /* wait until the worker threads are ready */
+            MR_sched_yield();
         }
     }
   #endif /* ! MR_LL_PARALLEL_CONJ */
@@ -1197,7 +1204,8 @@ enum MR_long_option {
     MR_GEN_DETSTACK_REDZONE_SIZE_KWORDS,
     MR_GEN_NONDETSTACK_REDZONE_SIZE,
     MR_GEN_NONDETSTACK_REDZONE_SIZE_KWORDS,
-    MR_MAX_CONTEXTS_PER_THREAD,
+    MR_WORKSTEAL_MAX_ATTEMPTS,
+    MR_WORKSTEAL_SLEEP_MSECS,
     MR_MDB_TTY,
     MR_MDB_IN,
     MR_MDB_OUT,
@@ -1293,7 +1301,8 @@ struct MR_option MR_long_opts[] = {
         1, 0, MR_GEN_NONDETSTACK_REDZONE_SIZE },
     { "gen-nondetstack-zone-size-kwords",
         1, 0, MR_GEN_NONDETSTACK_REDZONE_SIZE_KWORDS },
-    { "max-contexts-per-thread",        1, 0, MR_MAX_CONTEXTS_PER_THREAD },
+    { "worksteal-max-attempts",         1, 0, MR_WORKSTEAL_MAX_ATTEMPTS },
+    { "worksteal-sleep-msecs",          1, 0, MR_WORKSTEAL_SLEEP_MSECS },
     { "mdb-tty",                        1, 0, MR_MDB_TTY },
     { "mdb-in",                         1, 0, MR_MDB_IN },
     { "mdb-out",                        1, 0, MR_MDB_OUT },
@@ -1698,12 +1707,17 @@ MR_process_options(int argc, char **argv)
                 MR_gen_nondetstack_zone_size = size * sizeof(MR_Word);
                 break;
 
-            case MR_MAX_CONTEXTS_PER_THREAD:
-                if (sscanf(MR_optarg, "%lu", &size) != 1) {
+            case MR_WORKSTEAL_MAX_ATTEMPTS:
+                if (sscanf(MR_optarg, "%lu", &MR_worksteal_max_attempts) != 1)
+                {
                     MR_usage();
                 }
+                break;
 
-                MR_max_contexts_per_thread = size;
+            case MR_WORKSTEAL_SLEEP_MSECS:
+                if (sscanf(MR_optarg, "%lu", &MR_worksteal_sleep_msecs) != 1) {
+                    MR_usage();
+                }
                 break;
 
             case 'i':
@@ -2183,8 +2197,6 @@ MR_process_options(int argc, char **argv)
         }
     }
 
-    MR_max_outstanding_contexts = MR_max_contexts_per_thread * MR_num_threads;
-
     if (MR_lld_print_min > 0 || MR_lld_start_name != NULL) {
         MR_lld_print_enabled = 0;
     }
@@ -2747,9 +2759,6 @@ MR_define_label(global_fail);
 
 MR_define_label(all_done);
     assert(MR_runqueue_head == NULL);
-#ifdef MR_LL_PARALLEL_CONJ
-    assert(MR_wsdeque_is_empty(&MR_spark_queue));
-#endif
 
 #ifdef  MR_MPROF_PROFILE_TIME
     if (MR_profiling) {
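
The new tunables can be set the same way as the other runtime options,
via MERCURY_OPTIONS; for example (values purely illustrative):

    MERCURY_OPTIONS="--worksteal-max-attempts 48 --worksteal-sleep-msecs 1" \
        ./myprog

Raising --worksteal-max-attempts makes an idle engine probe more deques
before resting; raising --worksteal-sleep-msecs makes it rest longer
between rounds.
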
diff --git a/runtime/mercury_wrapper.h b/runtime/mercury_wrapper.h
index 592810c..fb90b49 100644
--- a/runtime/mercury_wrapper.h
+++ b/runtime/mercury_wrapper.h
@@ -247,14 +247,9 @@ extern  double		MR_heap_expansion_factor;
 /* margin for the stack segment test (documented in mercury_wrapper.c) */
 extern	size_t		MR_stack_margin_size;
 
-/* number of outstanding contexts we can create per thread (soft limit) */
-extern	MR_Unsigned	MR_contexts_per_thread;
-
-/*
-** number of outstanding contexts we can create 
-** (MR_contexts_per_thread * MR_num_threads)
-*/
-extern	MR_Unsigned	MR_max_outstanding_contexts;
+/* work-stealing tunables */
+extern	MR_Unsigned 	MR_worksteal_max_attempts;
+extern	MR_Unsigned 	MR_worksteal_sleep_msecs;
 
 /* file names for the mdb debugging streams */
 extern	const char	*MR_mdb_in_filename;
diff --git a/runtime/mercury_wsdeque.h b/runtime/mercury_wsdeque.h
index eb3a95b..4b1acab 100644
--- a/runtime/mercury_wsdeque.h
+++ b/runtime/mercury_wsdeque.h
@@ -14,8 +14,7 @@
 
 #include "mercury_atomic_ops.h"
 
-/* XXX should experiment with these */
-#define MR_INITIAL_GLOBAL_SPARK_QUEUE_SIZE  4
+/* XXX should experiment with this */
 #define MR_INITIAL_LOCAL_SPARK_DEQUE_SIZE   8
 
 /*---------------------------------------------------------------------------*/
@@ -72,7 +71,8 @@ extern  void    MR_wsdeque_putback_bottom(MR_SparkDeque *dq,
 ** the owner of the deque.  Returns true if successful.
 */
 MR_INLINE
-MR_bool         MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Spark *ret_spark);
+MR_bool         MR_wsdeque_pop_bottom(MR_SparkDeque *dq,
+                    MR_Code **ret_spark_resume);
 
 /*
 ** Attempt to steal a spark from the top of the deque.
@@ -122,7 +122,7 @@ MR_wsdeque_push_bottom(MR_SparkDeque *dq, const MR_Spark *spark)
 }
 
 MR_INLINE MR_bool
-MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Spark *ret_spark)
+MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Code **ret_spark_resume)
 {
     MR_Integer              bot;
     MR_Integer              top;
@@ -143,7 +143,7 @@ MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Spark *ret_spark)
         return MR_FALSE;
     }
 
-    *ret_spark = MR_sa_element(arr, bot);
+    (*ret_spark_resume) = MR_sa_element(arr, bot).MR_spark_resume;
     if (size > 0) {
         return MR_TRUE;
     }
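
Putting the pieces together, the stealing part of MR_do_runnext has
roughly the following shape.  This is a sketch, not the code in the
patch: choose_victim_deque(), run_stolen_spark() and rest_a_while()
are invented names, the treatment of MR_wsdeque_steal_top()'s return
value is simplified, and the real loop also checks the run queue and
the shutdown flag.

    MR_Spark    spark;
    MR_Unsigned attempt;

    for (attempt = 0; attempt < MR_worksteal_max_attempts; attempt++) {
        /* Pick a random victim from the flat array of spark deques. */
        MR_SparkDeque *victim = choose_victim_deque();

        if (victim != NULL && MR_wsdeque_steal_top(victim, &spark)) {
            /*
            ** The stolen spark's parent sync term must now be marked
            ** shared (MR_st_is_shared) so that joins use atomic
            ** decrements; then set up a context using the sync term's
            ** parent_sp and jump to the spark's resume code.
            */
            run_stolen_spark(&spark);
            return;
        }
    }
    /* No work found: rest before the next round of attempts. */
    rest_a_while(MR_worksteal_sleep_msecs);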

