[m-rev.] Implement work-stealing

Paul Bone pbone at csse.unimelb.edu.au
Wed Dec 9 10:58:16 AEDT 2009


Implement work stealing.

This patch is heavily based on earlier, uncommitted work by Peter Wang.  It
has been updated so that it applies against the current version of the source.
A number of other changes have been made.  Peter's original ChangeLog
follows:

	Implement work stealing for parallel conjunctions.  This builds on an
	older patch which introduced work-stealing deques to the runtime but
	didn't perform work stealing.

	Previously when we came across a parallel conjunct, we would place a spark
	into either the _global spark queue_ or the _local spark stack_ of the
	Mercury context.  A spark on the global spark queue may be picked up for
	parallel execution by an idle Mercury engine, whereas a spark on a local
	spark stack is confined to execution in the context that originated it.

	The problem is that we have to decide, ahead of time, where to put a
	spark.  Ideally, we should have just enough sparks in the global queue to
	keep the available Mercury engines busy, and leave the rest of the sparks
	to execute in their original contexts since that is more efficient.  But
	we can't predict the future, so we have to make do with guesses based
	on simple heuristics.  A bad decision, once made, cannot be reversed.
	An engine may sit idle because the global spark queue is empty, even
	while there are sparks available in some local spark stacks.

	In the work stealing scheme, sparks are always placed into each context's
	_local spark deque_.  Idle engines actively try to steal sparks from
	random spark deques.  We don't need to make irreversible and potentially
	suboptimal decisions about where to put sparks.  Making a spark available
	for parallel execution is cheap and happens by default because of the
	work-stealing deques; putting a spark on a global queue implies
	synchronisation with other threads.  The downside is that idle engines
	must expend more time and effort to find work, looking in multiple
	places instead of just one (see the sketch following this ChangeLog).

	In practice, the new scheme seems to perform about as well as the old
	scheme, except that the old scheme often required
	`--max-contexts-per-thread' to be set "correctly" to get good results.

	Only tested on x86-64, which has a relatively strong memory model.
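
To illustrate the deque discipline described above, here is a toy,
mutex-based model (illustrative only: the runtime's MR_SparkDeque is a
lock-free work-stealing deque, and none of the names below exist in the
runtime).  The owner of a deque pushes and pops sparks at the bottom;
idle engines steal from the top, so every spark is stealable by default:

    #include <pthread.h>

    typedef struct {
        void            *items[64];     /* sparks; fixed size for sketch */
        int             top;            /* thieves steal from here */
        int             bottom;         /* the owner pushes and pops here */
        pthread_mutex_t lock;
    } toy_deque;

    /* Owner only: make a new spark available. */
    static void
    toy_push_bottom(toy_deque *dq, void *spark)
    {
        pthread_mutex_lock(&dq->lock);
        dq->items[dq->bottom++] = spark;
        pthread_mutex_unlock(&dq->lock);
    }

    /* Owner only: take back the most recently pushed spark, if any. */
    static void *
    toy_pop_bottom(toy_deque *dq)
    {
        void *spark = NULL;

        pthread_mutex_lock(&dq->lock);
        if (dq->bottom > dq->top) {
            spark = dq->items[--dq->bottom];
        }
        pthread_mutex_unlock(&dq->lock);
        return spark;
    }

    /* Any idle engine: steal the oldest spark, if any. */
    static void *
    toy_steal_top(toy_deque *dq)
    {
        void *spark = NULL;

        pthread_mutex_lock(&dq->lock);
        if (dq->bottom > dq->top) {
            spark = dq->items[dq->top++];
        }
        pthread_mutex_unlock(&dq->lock);
        return spark;
    }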

My modifications include:

	The distinction between 'shared' and 'private' synchronisation terms
	has been removed.  All sync terms are assumed to be shared, and
	thread-safe operations are used everywhere.  This allows us to remove
	the complicated code that was used when a private synchronisation term
	became shared.  It may also change performance: in particular, parallel
	conjunctions may become slower because, with all sync terms assumed to
	be shared, atomic operations must always be used when decrementing
	their count field.

	I have refactored MR_do_join_and_continue.  It is now much simpler:
	the conditional code enumerates the possible cases clearly, as
	outlined below.
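
	In outline, the new control flow is (my summary, not the code itself):

	    MR_do_join_and_continue(st, join_label):
	        if atomically decrementing st->MR_st_count yields zero:
	            if this context originated the conjunction:
	                jump to join_label
	            else:
	                wake the suspended originating context, then
	                look for other work (MR_do_runnext)
	        else:
	            if a spark can be popped off our own deque:
	                execute that spark
	            else if this context originated the conjunction:
	                suspend, to be resumed at join_label, then
	                look for other work
	            else:
	                look for other work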

This change bootchecks and successfully runs the test suite in the
asm_fast.gc, asm_fast.gc.par, hlc.gc and hlc.par grades; no other grades
were tested.  I have not yet tested performance.

runtime/mercury_context.c:
runtime/mercury_context.h:
	Keep pointers to all spark deques in a flat array, so we have access
	to them for stealing.
	
	Added functions to manage the global array of spark deques.

	Modified MR_do_runnext; it now attempts to steal work from other
	contexts' spark deques.  Threads sleeping on the condition variable in
	MR_do_runnext now use a timed wait so that they can wake up and try to
	steal sparks.

	Re-factored MR_do_join_and_continue.
	
	MR_num_idle_engines is accessed by atomic operations; it has been made
	an MR_Integer so that its size matches the expectations of the atomic
	operations we have defined.

	Modified the MR_SyncTerm and MR_Spark structures.  Sparks now point to
	their sync terms.  The parent stack pointer has been moved into the
	MR_SyncTerm structure.  The MR_st_is_shared field in the MR_SyncTerm
	structure has been removed.

runtime/mercury_atomic_ops.c:
runtime/mercury_atomic_ops.h:
	Implement a new atomic operation: decrement an integer and test whether
	the result is zero.  On x86/x86_64 one cannot atomically decrement an
	integer and fetch the result in a single instruction; that requires a
	loop with a 'compare and exchange' instruction.  However, since we only
	want to test whether the value has become zero after the decrement, we
	can use the processor's flags instead.  This can be done in two
	instructions; more importantly, no loop is required and only one of the
	two instructions must be atomic.
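
	For readers unfamiliar with the trick, here is a standalone sketch of
	the same technique (illustrative only; the names below are made up and
	this is not the runtime's code):

	    #include <stdio.h>

	    /* Decrement *addr atomically; return nonzero iff the result
	    ** is zero. */
	    static int
	    dec_and_is_zero(volatile long *addr)
	    {
	    #if defined(__GNUC__) && defined(__x86_64__)
	        char is_zero;

	        /* "lock sub" decrements the memory operand atomically and
	        ** sets the CPU's zero flag; "setz" copies that flag into a
	        ** byte register.  No compare-and-exchange loop is needed. */
	        __asm__ __volatile__(
	            "lock; subq $1, %0; setz %1"
	            : "+m"(*addr), "=q"(is_zero));
	        return is_zero;
	    #else
	        /* Portable fallback: the GCC builtin fetches the new value,
	        ** which is more work but gives the same answer. */
	        return __sync_sub_and_fetch(addr, 1) == 0;
	    #endif
	    }

	    int
	    main(void)
	    {
	        volatile long count = 2;

	        printf("%d\n", dec_and_is_zero(&count));   /* 0; count is 1 */
	        printf("%d\n", dec_and_is_zero(&count));   /* 1; count is 0 */
	        return 0;
	    }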

runtime/mercury_wrapper.c:
runtime/mercury_wrapper.h:
	Added runtime tunable options for work stealing.  These control the number
	of attempts an idle engine will make when looking for work, and the
	duration to sleep after failing to find any work.
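
	For example, assuming these are passed like other runtime options via
	the MERCURY_OPTIONS environment variable:

	    MERCURY_OPTIONS='--worksteal-max-attempts 8 --worksteal-sleep-msecs 4' ./prog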

runtime/mercury_thread.c:
runtime/mercury_thread.h:
	Added MR_TIMED_WAIT and MR_cond_timed_wait, which wait on a condition
	variable like MR_WAIT and MR_cond_wait except that they may time out.
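
	A minimal sketch of the pattern using plain pthreads (illustrative
	only; in the patch the deadline is computed by MR_milliseconds_from_now
	and the wait happens behind the MR_TIMED_WAIT macro):

	    #include <pthread.h>
	    #include <sys/time.h>
	    #include <time.h>

	    /* Compute an absolute deadline msecs milliseconds from now. */
	    static void
	    millisecs_from_now(struct timespec *timeout, unsigned msecs)
	    {
	        struct timeval now;

	        gettimeofday(&now, NULL);
	        timeout->tv_sec = now.tv_sec + msecs / 1000;
	        timeout->tv_nsec = now.tv_usec * 1000L
	            + (msecs % 1000) * 1000000L;
	        if (timeout->tv_nsec >= 1000000000L) {
	            timeout->tv_sec++;
	            timeout->tv_nsec -= 1000000000L;
	        }
	    }

	    /* Sleep on the condition variable, but wake after the timeout
	    ** so that an idle engine can attempt more steals. */
	    static void
	    idle_wait(pthread_cond_t *cond, pthread_mutex_t *mutex,
	        unsigned msecs)
	    {
	        struct timespec timeout;

	        millisecs_from_now(&timeout, msecs);
	        pthread_cond_timedwait(cond, mutex, &timeout);
	    }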

runtime/mercury_wsdeque.h:
runtime/mercury_wsdeque.c:
	MR_wsdeque_pop_bottom now uses its second argument to return the code
	address to jump to, rather than the whole spark.

runtime/mercury_conf.h.in:
configure.in:
	Test for sched_yield().

	Change the synchronisation term structure.

doc/user_guide.texi:
	Add commented-out documentation for the two new tunable parameters,
	`--worksteal-max-attempts' and `--worksteal-sleep-msecs'.
	Implementors may want to experiment with different values but end
	users shouldn't need to know about them.

Index: configure.in
===================================================================
RCS file: /home/mercury1/repository/mercury/configure.in,v
retrieving revision 1.556
diff -u -p -b -r1.556 configure.in
--- configure.in	3 Dec 2009 05:27:59 -0000	1.556
+++ configure.in	7 Dec 2009 00:04:40 -0000
@@ -1154,7 +1154,7 @@ mercury_check_for_functions \
         grantpt unlockpt ptsname tcgetattr tcsetattr ioctl \
         access sleep opendir readdir closedir mkdir symlink readlink \
         gettimeofday setenv putenv _putenv posix_spawn sched_setaffinity \
-        sched_getcpu
+        sched_getcpu sched_yield
 
 #-----------------------------------------------------------------------------#
 
@@ -1935,8 +1935,8 @@ AC_CACHE_VAL(mercury_cv_sync_term_size,
     int main() {
         struct {
             void        *orig_context;
-            int     count;
-            int         is_shared;
+            void                    *parent_sp;
+            $mercury_cv_word_type   count;
         } x;
         FILE *fp;
 
Index: doc/user_guide.texi
===================================================================
RCS file: /home/mercury1/repository/mercury/doc/user_guide.texi,v
retrieving revision 1.598
diff -u -p -b -r1.598 user_guide.texi
--- doc/user_guide.texi	5 Nov 2009 05:47:40 -0000	1.598
+++ doc/user_guide.texi	2 Dec 2009 10:22:45 -0000
@@ -9972,6 +9972,18 @@ multiplied by the word size in bytes.
 @c Sets the size of the redzone on the trail to @var{size} kilobytes
 @c multiplied by the word size in bytes.
 
+@c @sp 1
+@c @item --worksteal-max-attempts @var{attempts}
+@c @findex --worksteal-max-attempts (runtime option)
+@c Tells idle Mercury engines to attempt to steal parallel conjuncts
+@c up to a maximum of @var{attempts} times before sleeping.
+
+@c @sp 1
+@c @item --worksteal-sleep-msecs @var{milliseconds}
+@c @findex --worksteal-sleep-msecs (runtime option)
+@c Sets the amount of time that an idle Mercury engine should sleep
+@c before making another round of work-stealing attempts.
+
 @sp 1
 @item -i @var{filename}
 @itemx --mdb-in @var{filename}
Index: runtime/mercury_atomic_ops.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_atomic_ops.c,v
retrieving revision 1.7
diff -u -p -b -r1.7 mercury_atomic_ops.c
--- runtime/mercury_atomic_ops.c	3 Dec 2009 05:28:00 -0000	1.7
+++ runtime/mercury_atomic_ops.c	6 Dec 2009 23:12:52 -0000
@@ -68,6 +68,15 @@ MR_OUTLINE_DEFN(
     }
 )
 
+MR_OUTLINE_DEFN(
+    MR_bool
+    MR_atomic_dec_int_and_is_zero(volatile MR_Integer *addr)
+,
+    {
+        MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY;
+    }
+)
+
 #endif /* MR_LL_PARALLEL_CONJ */
 
 /*---------------------------------------------------------------------------*/
Index: runtime/mercury_atomic_ops.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_atomic_ops.h,v
retrieving revision 1.8
diff -u -p -b -r1.8 mercury_atomic_ops.h
--- runtime/mercury_atomic_ops.h	3 Dec 2009 05:28:00 -0000	1.8
+++ runtime/mercury_atomic_ops.h	8 Dec 2009 22:11:45 -0000
@@ -269,6 +269,61 @@ MR_atomic_sub_int(volatile MR_Integer *a
 #endif
 
 /*
+ * Decrement the integer at the pointed-to address and return MR_TRUE iff it
+ * is zero after the decrement.  Fetching the decremented value would be more
+ * general, but on x86(_64) that requires a compare and exchange loop.
+ */
+MR_EXTERN_INLINE MR_bool 
+MR_atomic_dec_int_and_is_zero(volatile MR_Integer *addr);
+
+/*
+ * Note that on x86(_64) we have to use the sub instruction rather than the
+ * dec instruction because we need it to set the CPU flags.
+ */
+#if defined(__GNUC__) && defined(__x86_64__)
+
+    #define MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY                              \
+        do {                                                                \
+            char is_zero;                                                   \
+            __asm__(                                                        \
+                "lock; subq $1, %0; setz %1"                                \
+                : "=m"(*addr), "=q"(is_zero)                                \
+                : "m"(*addr)                                                \
+                );                                                          \
+            return (MR_bool)is_zero;                                        \
+        } while (0)
+
+#elif defined(__GNUC__) && defined(__i386__)
+    
+    #define MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY                              \
+        do {                                                                \
+            char is_zero;                                                   \
+            __asm__(                                                        \
+                "lock: subl $1, %0; setz %1"                                \
+                : "=m"(*addr), "=q"(is_zero)                                \
+                : "m"(*addr)                                                \
+                );                                                          \
+            return (MR_bool)is_zero;                                        \
+        } while (0)
+
+#elif __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
+
+    #define MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY                              \
+        do {                                                                \
+            return __sync_sub_and_fetch(addr, 1) == 0;                      \
+        } while (0)
+
+#endif
+
+#ifdef MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY
+    MR_EXTERN_INLINE MR_bool 
+    MR_atomic_dec_int_and_is_zero(volatile MR_Integer *addr)
+    {
+        MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY;
+    }
+#endif
+
+/*
  * Intel and AMD support a pause instruction that is roughly equivalent
  * to a no-op.  Intel recommend that it is used in spin-loops to improve
  * performance.  Without a pause instruction multiple simultaneous
Index: runtime/mercury_conf.h.in
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_conf.h.in,v
retrieving revision 1.67
diff -u -p -b -r1.67 mercury_conf.h.in
--- runtime/mercury_conf.h.in	3 Dec 2009 05:28:00 -0000	1.67
+++ runtime/mercury_conf.h.in	8 Dec 2009 22:17:52 -0000
@@ -272,6 +272,7 @@
 **	MR_HAVE_FESETROUND	we have the fesetround() function.
 **	MR_HAVE_SCHED_SETAFFINITY we have the sched_setaffinity() function.
 **	MR_HAVE_SCHED_GETCPU	we have the sched_getcpu() function (glibc specific).
+**	MR_HAVE_SCHED_YIELD	we have the sched_yield() function.
 */
 #undef	MR_HAVE_GETPID
 #undef	MR_HAVE_SETPGID
@@ -335,6 +336,7 @@
 #undef	MR_HAVE_FESETROUND
 #undef	MR_HAVE_SCHED_SETAFFINITY
 #undef	MR_HAVE_SCHED_GETCPU
+#undef	MR_HAVE_SCHED_YIELD
 
 /*
 ** We use mprotect() and signals to catch stack and heap overflows.
Index: runtime/mercury_context.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.c,v
retrieving revision 1.72
diff -u -p -b -r1.72 mercury_context.c
--- runtime/mercury_context.c	3 Dec 2009 05:28:00 -0000	1.72
+++ runtime/mercury_context.c	8 Dec 2009 22:36:52 -0000
@@ -35,6 +35,10 @@ ENDINIT
 #include <sched.h>
 #endif
 
+#ifdef MR_WIN32
+  #include <sys/timeb.h>    /* for _ftime() */
+#endif
+
 #include "mercury_memory_handlers.h"
 #include "mercury_context.h"
 #include "mercury_engine.h"             /* for `MR_memdebug' */
@@ -47,16 +51,24 @@ ENDINIT
 
 /*---------------------------------------------------------------------------*/
 
+static void
+MR_init_context_maybe_generator(MR_Context *c, const char *id,
+    MR_GeneratorPtr gen);
+
+/*---------------------------------------------------------------------------*/
+
+#ifdef  MR_LL_PARALLEL_CONJ
+static void
+MR_add_spark_deque(MR_SparkDeque *sd);
+static void
+MR_delete_spark_deque(const MR_SparkDeque *sd);
+static void
+MR_milliseconds_from_now(struct timespec *timeout, unsigned int msecs);
+#endif
+
 /*
-** The run queue and spark queue are protected and signalled with the
-** same lock and condition variable.
-**
-** The single sync term lock is used to prevent races in MR_join_and_continue.
-** The holder of the sync term lock may acquire the runqueue lock but not vice
-** versa.  (We could also have one sync term lock per context, and make
-** MR_join_and_continue acquire the sync term lock of the context that
-** originated the parallel conjunction, but contention for the single lock
-** doesn't seem to be an issue.)
+** The run queue is protected with MR_runqueue_lock and signalled with
+** MR_runqueue_cond.
 */
 MR_Context              *MR_runqueue_head;
 MR_Context              *MR_runqueue_tail;
@@ -64,10 +76,6 @@ MR_Context              *MR_runqueue_tai
   MercuryLock           MR_runqueue_lock;
   MercuryCond           MR_runqueue_cond;
 #endif
-#ifdef  MR_LL_PARALLEL_CONJ
-  MR_SparkDeque         MR_spark_queue;
-  MercuryLock           MR_sync_term_lock;
-#endif
 
 MR_PendingContext       *MR_pending_contexts;
 #ifdef  MR_THREAD_SAFE
@@ -130,13 +138,17 @@ static MR_Context       *free_small_cont
 #endif
 
 #ifdef  MR_LL_PARALLEL_CONJ
-int volatile MR_num_idle_engines = 0;
+MR_Integer volatile     MR_num_idle_engines = 0;
+MR_Unsigned volatile    MR_num_exited_engines = 0;
 int volatile MR_num_outstanding_contexts_and_global_sparks = 0;
 MR_Integer volatile MR_num_outstanding_contexts_and_all_sparks = 0;
 
-MR_Unsigned volatile MR_num_exited_engines = 0;
-
 static MercuryLock MR_par_cond_stats_lock;
+static MercuryLock      spark_deques_lock;
+static MR_SparkDeque    **MR_spark_deques = NULL;
+static MR_Integer       MR_max_spark_deques = 0;
+static MR_Integer       MR_victim_counter = 0;
+
 #endif
 
 /*---------------------------------------------------------------------------*/
@@ -170,8 +182,7 @@ MR_init_thread_stuff(void)
     pthread_mutex_init(&MR_global_lock, MR_MUTEX_ATTR);
     pthread_mutex_init(&MR_pending_contexts_lock, MR_MUTEX_ATTR);
   #ifdef MR_LL_PARALLEL_CONJ
-    MR_init_wsdeque(&MR_spark_queue, MR_INITIAL_GLOBAL_SPARK_QUEUE_SIZE);
-    pthread_mutex_init(&MR_sync_term_lock, MR_MUTEX_ATTR);
+    pthread_mutex_init(&spark_deques_lock, MR_MUTEX_ATTR);
     pthread_mutex_init(&MR_next_cpu_lock, MR_MUTEX_ATTR);
     #ifdef MR_DEBUG_RUNTIME_GRANULARITY_CONTROL
     pthread_mutex_init(&MR_par_cond_stats_lock, MR_MUTEX_ATTR);
@@ -320,7 +331,7 @@ MR_finalize_thread_stuff(void)
 #endif
 
 #ifdef  MR_LL_PARALLEL_CONJ
-    pthread_mutex_destroy(&MR_sync_term_lock);
+    pthread_mutex_destroy(&spark_deques_lock);
 #endif
 
 #if defined(MR_THREAD_SAFE) && defined(MR_PROFILE_PARALLEL_EXECUTION_SUPPORT)
@@ -587,6 +598,7 @@ MR_init_context_maybe_generator(MR_Conte
     c->MR_ctxt_parent_sp = NULL;
     MR_init_wsdeque(&c->MR_ctxt_spark_deque,
         MR_INITIAL_LOCAL_SPARK_DEQUE_SIZE);
+    MR_add_spark_deque(&c->MR_ctxt_spark_deque);
   #endif /* MR_LL_PARALLEL_CONJ */
 
 #endif /* !MR_HIGHLEVEL_CODE */
@@ -715,12 +727,13 @@ MR_destroy_context(MR_Context *c)
         c->MR_ctxt_nondetstack_zone->MR_zone_min);
 #endif /* defined(MR_CONSERVATIVE_GC) && !defined(MR_HIGHLEVEL_CODE) */
 
-    MR_LOCK(&free_context_list_lock, "destroy_context");
 #ifdef MR_LL_PARALLEL_CONJ
     MR_num_outstanding_contexts_and_global_sparks--;
     MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
+    MR_delete_spark_deque(&c->MR_ctxt_spark_deque);
 #endif
 
+    MR_LOCK(&free_context_list_lock, "destroy_context");
     switch (c->MR_ctxt_size) {
         case MR_CONTEXT_SIZE_REGULAR:
             c->MR_ctxt_next = free_context_list;
@@ -744,12 +757,198 @@ MR_destroy_context(MR_Context *c)
     MR_UNLOCK(&free_context_list_lock, "destroy_context");
 }
 
+#ifdef MR_LL_PARALLEL_CONJ
+
+static void
+MR_add_spark_deque(MR_SparkDeque *sd)
+{
+    int slot;
+
+    MR_LOCK(&spark_deques_lock, "create_spark_deque");
+
+    for (slot = 0; slot < MR_max_spark_deques; slot++) {
+        if (MR_spark_deques[slot] == NULL) {
+            break;
+        }
+    }
+
+    if (slot == MR_max_spark_deques) {
+        if (MR_max_spark_deques == 0) {
+            MR_max_spark_deques = 1;
+        } else if (MR_max_spark_deques < 32) {
+            MR_max_spark_deques *= 2;
+        } else {
+            MR_max_spark_deques += 16;
+        }
+        MR_spark_deques = MR_GC_RESIZE_ARRAY(MR_spark_deques,
+            MR_SparkDeque *, MR_max_spark_deques);
+    }
+
+    MR_spark_deques[slot] = sd;
+
+    MR_UNLOCK(&spark_deques_lock, "create_spark_deque");
+}
+
+static void
+MR_delete_spark_deque(const MR_SparkDeque *sd)
+{
+    int i;
+
+    MR_LOCK(&spark_deques_lock, "delete_spark_deque");
+
+    for (i = 0; i < MR_max_spark_deques; i++) {
+        if (MR_spark_deques[i] == sd) {
+            MR_spark_deques[i] = NULL;
+            break;
+        }
+    }
+
+    MR_UNLOCK(&spark_deques_lock, "delete_spark_deque");
+}
+
+/* Search for a ready context which we can handle. */
+static MR_Context *
+MR_find_ready_context(MercuryThread thd, MR_Unsigned depth)
+{
+    MR_Context  *cur;
+    MR_Context  *prev;
+
+    cur = MR_runqueue_head;
+    /* XXX check pending io */
+    prev = NULL;
+    while (cur != NULL) {
+        if (cur->MR_ctxt_resume_owner_thread == thd &&
+            cur->MR_ctxt_resume_c_depth == depth)
+        {
+            cur->MR_ctxt_resume_owner_thread = (MercuryThread) NULL;
+            cur->MR_ctxt_resume_c_depth = 0;
+            break;
+        }
+
+        if (cur->MR_ctxt_resume_owner_thread == (MercuryThread) NULL) {
+            break;
+        }
+
+        prev = cur;
+        cur = cur->MR_ctxt_next;
+    }
+
+    if (cur != NULL) {
+        if (prev != NULL) {
+            prev->MR_ctxt_next = cur->MR_ctxt_next;
+        } else {
+            MR_runqueue_head = cur->MR_ctxt_next;
+        }
+        if (MR_runqueue_tail == cur) {
+            MR_runqueue_tail = prev;
+        }
+    }
+
+    return cur;
+}
+
+static MR_bool
+MR_attempt_steal_spark(MR_Spark *spark)
+{
+    int             max_attempts;
+    int             attempt;
+    MR_SparkDeque   *victim;
+    int             steal_top;
+
+    /*
+    ** Protect against concurrent updates of MR_spark_deques and
+    ** MR_max_spark_deques.  This allows only one thread to try to steal
+    ** work at any time, which may be a good thing as it limits the
+    ** amount of wasted effort.
+    */
+    MR_LOCK(&spark_deques_lock, "attempt_steal_spark");
+
+    if (MR_max_spark_deques < MR_worksteal_max_attempts) {
+        max_attempts = MR_max_spark_deques;
+    } else {
+        max_attempts = MR_worksteal_max_attempts;
+    }
+
+    for (attempt = 0; attempt < max_attempts; attempt++) {
+        MR_victim_counter++;
+        victim = MR_spark_deques[MR_victim_counter % MR_max_spark_deques];
+        if (victim != NULL) {
+            steal_top = MR_wsdeque_steal_top(victim, spark);
+            if (steal_top == 1) {
+                /* Steal successful. */
+                MR_UNLOCK(&spark_deques_lock, "attempt_steal_spark");
+                return MR_TRUE;
+            }
+        }
+    }
+
+    MR_UNLOCK(&spark_deques_lock, "attempt_steal_spark");
+
+    /* Steal unsuccessful. */
+    return MR_FALSE;
+}
+
+static void
+MR_milliseconds_from_now(struct timespec *timeout, unsigned int msecs)
+{
+#if defined(MR_HAVE_GETTIMEOFDAY)
+
+    const long          NANOSEC_PER_SEC = 1000000000L;
+    struct timeval      now;
+    MR_int_least64_t    nanosecs;
+
+    gettimeofday(&now, NULL);
+    timeout->tv_sec = now.tv_sec;
+    nanosecs = ((MR_int_least64_t) (now.tv_usec + (msecs * 1000))) * 1000L;
+    if (nanosecs >= NANOSEC_PER_SEC) {
+        timeout->tv_sec++;
+        nanosecs %= NANOSEC_PER_SEC;
+    }
+    timeout->tv_nsec = (long) nanosecs;
+
+#elif defined(MR_WIN32)
+
+    const long          NANOSEC_PER_SEC = 1000000000L;
+    const long          NANOSEC_PER_MILLISEC = 1000000L;
+    struct _timeb       now;
+    MR_int_least64_t    nanosecs;
+
+    _ftime(&now);
+    timeout->tv_sec = now.time;
+    nanosecs = ((MR_int_least64_t) (msecs + now.millitm))
+        * NANOSEC_PER_MILLISEC;
+    if (nanosecs >= NANOSEC_PER_SEC) {
+        timeout->tv_sec++;
+        nanosecs %= NANOSEC_PER_SEC;
+    }
+    timeout->tv_nsec = (long) nanosecs;
+
+#else
+
+    #error Missing definition of MR_milliseconds_from_now.
+
+#endif
+}
+
+#endif  /* MR_LL_PARALLEL_CONJ */
+
 void 
 MR_flounder(void)
 {
     MR_fatal_error("computation floundered");
 }
 
+void
+MR_sched_yield(void)
+{
+#if defined(MR_HAVE_SCHED_YIELD)
+    sched_yield();
+#elif defined(MR_CAN_DO_PENDING_IO)
+    struct timeval timeout = {0, 1};
+    select(0, NULL, NULL, NULL, &timeout);
+#endif
+}
+
 /*
 ** Check to see if any contexts that blocked on IO have become
 ** runnable. Return the number of contexts that are still blocked.
@@ -883,20 +1082,6 @@ MR_schedule_context(MR_Context *ctxt)
     MR_UNLOCK(&MR_runqueue_lock, "schedule_context");
 }
 
-#ifdef MR_LL_PARALLEL_CONJ
-void
-MR_schedule_spark_globally(const MR_Spark *proto_spark)
-{
-    MR_LOCK(&MR_runqueue_lock, "schedule_spark_globally");
-    MR_wsdeque_push_bottom(&MR_spark_queue, proto_spark);
-    MR_num_outstanding_contexts_and_global_sparks++;
-    MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks);
-    MR_SIGNAL(&MR_runqueue_cond, "schedule_spark_globally");
-    MR_UNLOCK(&MR_runqueue_lock, "schedule_spark_globally");
-}
-#endif /* !MR_LL_PARALLEL_CONJ */
-
-
 #ifndef MR_HIGHLEVEL_CODE
 
 MR_define_extern_entry(MR_do_runnext);
@@ -908,12 +1093,12 @@ MR_BEGIN_CODE
 MR_define_entry(MR_do_runnext);
 #ifdef MR_THREAD_SAFE
 {
-    MR_Context      *tmp;
-    MR_Context      *prev;
+    MR_Context      *ready_context;
+    MR_Code         *resume_point;
     MR_Spark        spark;
-    unsigned        depth;
+    MR_Unsigned     depth;
     MercuryThread   thd;
-    int             wait_result;
+    struct timespec timeout;
 
 #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
     MR_Timer        runnext_timer;
@@ -932,9 +1117,9 @@ MR_define_entry(MR_do_runnext);
     depth = MR_ENGINE(MR_eng_c_depth);
     thd = MR_ENGINE(MR_eng_owner_thread);
 
-    MR_LOCK(&MR_runqueue_lock, "MR_do_runnext (i)");
+    MR_atomic_inc_int(&MR_num_idle_engines);
 
-    MR_num_idle_engines++;
+    MR_LOCK(&MR_runqueue_lock, "MR_do_runnext (i)");
 
     while (1) {
 
@@ -953,37 +1138,22 @@ MR_define_entry(MR_do_runnext);
             MR_destroy_thread(MR_cur_engine());
             MR_num_exited_engines++;
             MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (ii)");
+            MR_atomic_dec_int(&MR_num_idle_engines);
             pthread_exit(0);
         }
 
-        /* Search for a ready context which we can handle. */
-        tmp = MR_runqueue_head;
-        /* XXX check pending io */
-        prev = NULL;
-        while (tmp != NULL) {
-            if (tmp->MR_ctxt_resume_owner_thread == thd && 
-                tmp->MR_ctxt_resume_c_depth == depth)
-            {
-                tmp->MR_ctxt_resume_owner_thread = (MercuryThread) NULL;
-                tmp->MR_ctxt_resume_c_depth = 0;
-                MR_num_idle_engines--;
-                goto ReadyContext;
-            }
-
-            if (tmp->MR_ctxt_resume_owner_thread == (MercuryThread) NULL) {
-                MR_num_idle_engines--;
+        ready_context = MR_find_ready_context(thd, depth);
+        if (ready_context != NULL) {
+            MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iii)");
+            MR_atomic_dec_int(&MR_num_idle_engines);
                 goto ReadyContext;
             }
-
-            prev = tmp;
-            tmp = tmp->MR_ctxt_next;
-        }
-
-        /* Check if the global spark queue is nonempty. */
-        if (MR_wsdeque_take_top(&MR_spark_queue, &spark)) {
-            MR_num_idle_engines--;
-            MR_num_outstanding_contexts_and_global_sparks--;
-            MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
+        /*
+        ** No suitable ready contexts, so try to steal a spark instead.
+        */
+        if (MR_attempt_steal_spark(&spark)) {
+            MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iv)");
+            MR_atomic_dec_int(&MR_num_idle_engines);
             goto ReadySpark;
         }
 
@@ -994,24 +1164,16 @@ MR_define_entry(MR_do_runnext);
                     &MR_profile_parallel_executed_nothing);
         }
 #endif
-        do {
-            wait_result = 
-                MR_WAIT(&MR_runqueue_cond, &MR_runqueue_lock, "do_runnext");
-        } while (wait_result != 0);
+        
+        MR_milliseconds_from_now(&timeout, MR_worksteal_sleep_msecs);
+        MR_TIMED_WAIT(&MR_runqueue_cond, &MR_runqueue_lock, &timeout,
+            "do_runnext");
     }
+    /* unreachable */
+    abort();
 
   ReadyContext:
 
-    if (prev != NULL) {
-        prev->MR_ctxt_next = tmp->MR_ctxt_next;
-    } else {
-        MR_runqueue_head = tmp->MR_ctxt_next;
-    }
-    if (MR_runqueue_tail == tmp) {
-        MR_runqueue_tail = prev;
-    }
-    MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iii)");
-
     /* Discard whatever unused context we may have and switch to tmp. */
     if (MR_ENGINE(MR_eng_this_context) != NULL) {
         MR_destroy_context(MR_ENGINE(MR_eng_this_context));
@@ -1022,13 +1184,40 @@ MR_define_entry(MR_do_runnext);
                 &MR_profile_parallel_executed_contexts);
     }
 #endif
-    MR_ENGINE(MR_eng_this_context) = tmp;
-    MR_load_context(tmp);
-    MR_GOTO(tmp->MR_ctxt_resume);
+    MR_ENGINE(MR_eng_this_context) = ready_context;
+    MR_load_context(ready_context);
+
+    resume_point = (MR_Code*)(ready_context->MR_ctxt_resume);
+    ready_context->MR_ctxt_resume = NULL;
+    MR_GOTO(resume_point);
 
   ReadySpark:
 
-    MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iii)");
+#if 0 /* This is a complicated optimisation that may not be worthwhile. */
+    if (!spark.MR_spark_sync_term->MR_st_is_shared) {
+        spark.MR_spark_sync_term_is_shared = MR_TRUE;
+        /*
+        ** If we allow the stolen spark (New) to execute immediately
+        ** there could be a race with a sibling conjunct (Old) which is
+        ** currently executing, e.g.
+        **
+        ** 1. Old enters MR_join_and_continue(), loads old value of
+        **    MR_st_count;
+        ** 2. New begins executing;
+        ** 3. New enters MR_join_and_continue(), decrements MR_st_count
+        **    atomically;
+        ** 4. Old decrements MR_st_count *non-atomically* based on the
+        **    old value of MR_st_count.
+        **
+        ** Therefore this loop delays the new spark from executing
+        ** while there is another conjunct in MR_join_and_continue()
+        ** which might decrement MR_st_count non-atomically.
+        */
+        while (spark.MR_spark_sync_term->MR_st_attempt_cheap_join) {
+            MR_sched_yield();
+        }
+    }
+#endif
 
     /* Grab a new context if we haven't got one then begin execution. */
     if (MR_ENGINE(MR_eng_this_context) == NULL) {
@@ -1047,9 +1236,13 @@ MR_define_entry(MR_do_runnext);
         MR_threadscope_post_run_context();
 #endif
     }
-    MR_parent_sp = spark.MR_spark_parent_sp;
-    MR_assert(MR_parent_sp != MR_sp);
+    MR_parent_sp = spark.MR_spark_sync_term->MR_st_parent_sp;
     MR_SET_THREAD_LOCAL_MUTABLES(spark.MR_spark_thread_local_mutables);
+   
+    MR_assert(MR_parent_sp);
+    MR_assert(MR_parent_sp != MR_sp);
+    MR_assert(spark.MR_spark_sync_term->MR_st_count > 0);
+
 #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
     if (MR_profile_parallel_execution) {
         MR_profiling_stop_timer(&runnext_timer, 
@@ -1077,7 +1270,7 @@ MR_define_entry(MR_do_runnext);
     MR_load_context(MR_ENGINE(MR_eng_this_context));
     MR_GOTO(MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume);
 }
-#endif
+#endif /* !MR_THREAD_SAFE */
 
 MR_END_MODULE
 
@@ -1087,135 +1280,88 @@ MR_END_MODULE
 MR_Code*
 MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label) 
 {
+    MR_bool     jnc_last;
+    MR_Context  *this_context = MR_ENGINE(MR_eng_this_context);
+
+    /*
+     * Atomically decrement the number of conjuncts yet to complete.  If we
+     * are the last conjunct to complete (the parallel conjunction has
+     * finished) then jnc_last will be true.
+     */
     /*
      * XXX: We should take the current TSC time here and use it to post the
      * various 'context stopped' threadscope events.  This profile will be more
      * accurate. 
      */
-    if (!jnc_st->MR_st_is_shared) {
-        /* This parallel conjunction has only executed sequentially. */
-        if (--jnc_st->MR_st_count == 0) {
-            /*
-            ** It has finished executing, continue execution from the join
-            ** label.
-            */
-            return join_label;
-        } else {
-            /*
-            ** It has not finished executing.  Try to finish it by executing
-            ** our sparks.
-            ** This code was formerly MR_join_and_continue_1() 
-            */
-            MR_Context  *jnc_ctxt;
-            MR_bool     jnc_popped;
-            MR_Spark    jnc_spark;
-
-            jnc_ctxt = MR_ENGINE(MR_eng_this_context);
-            jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
-                &jnc_spark);
-            if (jnc_popped) {
-                MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
-                return jnc_spark.MR_spark_resume;
-            } else {
-                /*
-                ** Someone's stolen our sparks, we should try to execute
-                ** something that's been scheduled globally.  XXX: We don't yet
-                ** have work stealing so how can this happen!?
-                */
-                fprintf(stderr, "My sparks have been stolen!! %lp", pthread_self());
-                return MR_ENTRY(MR_do_runnext);
-            }
-        }
-    } else {
-        /* This parallel conjunction may be executing in parallel. */
-        MR_LOCK(&MR_sync_term_lock, "continue");
-        if (--jnc_st->MR_st_count == 0) {
-            /* This parallel conjunction has finished. */
-            if (MR_ENGINE(MR_eng_this_context) == jnc_st->MR_st_orig_context) {
+
+    jnc_last = MR_atomic_dec_int_and_is_zero(&(jnc_st->MR_st_count));
+
+    if (jnc_last) {
+        if (this_context == jnc_st->MR_st_orig_context) {
                 /*
-                ** This context originated this parallel conjunction and all
-                ** the branches have finished so jump to the join label.
+            ** It has finished executing and we're the originating context.  Jump
+            ** to the join label.
                 */
-                MR_UNLOCK(&MR_sync_term_lock, "continue i");
                 return join_label;
             } else {
-                /*
-                ** This context didn't originate this parallel conjunction and
-                ** we're the last branch to finish.  The originating context
-                ** should be suspended waiting for us to finish, so wake it up.
-                */
-                jnc_st->MR_st_orig_context->MR_ctxt_resume = join_label;
-                MR_schedule_context(jnc_st->MR_st_orig_context);
-                MR_UNLOCK(&MR_sync_term_lock, "continue ii");
 #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
                 MR_threadscope_post_stop_context(MR_TS_STOP_REASON_FINISHED);
 #endif
+            /*
+            ** This context didn't originate this parallel conjunction and
+            ** we're the last branch to finish.  The originating context should
+            ** be suspended waiting for us to finish, so wake it up.
+            **
+            ** We could be racing with the original context, in which case we
+            ** have to make sure that it is ready to be scheduled before we
+            ** schedule it.  It will set its resume point to join_label to
+            ** indicate that it is ready.
+            */
+            while (jnc_st->MR_st_orig_context->MR_ctxt_resume != join_label) {
+                /* XXX: Need to configure using sched_yield or spin waiting */
+                MR_ATOMIC_PAUSE;
+            }
+            MR_schedule_context(jnc_st->MR_st_orig_context);
                 return MR_ENTRY(MR_do_runnext);
             }
         } else {
+        MR_bool     popped;
+        MR_Code     *spark_resume;
+        
             /*
-            ** The parallel conjunction is being executed in parallel but it is
-            ** not yet finished.  This code was Formerly
-            ** MR_join_and_continue_2() 
-            */
-            MR_Context  *jnc_ctxt;
-            MR_bool     jnc_popped;
-            MR_Spark    jnc_spark;
-
-            jnc_ctxt = MR_ENGINE(MR_eng_this_context);
-            jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
-                &jnc_spark);
-            if (jnc_popped && (jnc_spark.MR_spark_parent_sp == MR_parent_sp)) {
-                /*
-                ** The spark at the top of the stack is from to the same parallel
-                ** conjunction that we've just been executing. We can immediately
-                ** execute the next branch of the same parallel conjunction in
-                ** the current context.
+         * The parallel conjunction is not yet finished.  Try to work on a
+         * spark from our local deque.  The sparks on our deque are likely
+         * to cause this conjunction to be completed.
                 */
-                MR_UNLOCK(&MR_sync_term_lock, "continue_2 i");
+        popped = MR_wsdeque_pop_bottom(&this_context->MR_ctxt_spark_deque, &spark_resume);
+        if (popped) {
                 MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
-                return jnc_spark.MR_spark_resume;
+            return spark_resume;
             } else {
                 /*
-                ** The spark stack is empty or the next spark is from a different
-                ** parallel conjunction to the one we've been executing.  Either
-                ** way, there's nothing more we can do with this context right
-                ** now.  Put back the spark we won't be using.
-                */
-                if (jnc_popped) {
-                    MR_wsdeque_putback_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
-                        &jnc_spark);
-                }
-                /*
                 ** If this context originated the parallel conjunction we've been
-                ** executing, the rest of the parallel conjunction must have been
-                ** put on the global spark queue to be executed in other
-                ** contexts.  This context will need to be resumed once the
-                ** parallel conjunction is completed, so suspend the context.
+            ** executing, suspend this context such that it will be resumed   
+            ** at the join label once the parallel conjunction is completed.  
                 **
-                ** What if the other conjuncts where put on the global queue
-                ** but haven't yet been taken by other threads?  Then this step
-                ** is redundant.  It's not worth fixing, this problem will go
-                ** away once we enable work-stealing. - pbone. 
+            ** Otherwise we can reuse this context for the next piece of work.
                 */
-                if (jnc_ctxt == jnc_st->MR_st_orig_context) {
+            if (this_context == jnc_st->MR_st_orig_context) {
 #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
                     MR_threadscope_post_stop_context(MR_TS_STOP_REASON_BLOCKED);
 #endif
-                    MR_save_context(jnc_ctxt);
+                MR_save_context(this_context);
+                /* XXX: Make sure the context gets saved before we set the
+                ** join label; this may require a memory barrier. */
+                this_context->MR_ctxt_resume = (join_label);
                     MR_ENGINE(MR_eng_this_context) = NULL;
                 } else {
 #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
                     MR_threadscope_post_stop_context(MR_TS_STOP_REASON_FINISHED);
 #endif
                 }
-
-                /* Finally look for other work. */
-                MR_UNLOCK(&MR_sync_term_lock, "continue_2 ii");
                 return MR_ENTRY(MR_do_runnext);
             }
         }
-    }
 }
 #endif
 
Index: runtime/mercury_context.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.h,v
retrieving revision 1.57
diff -u -p -b -r1.57 mercury_context.h
--- runtime/mercury_context.h	3 Dec 2009 05:28:00 -0000	1.57
+++ runtime/mercury_context.h	8 Dec 2009 23:12:22 -0000
@@ -202,22 +202,6 @@
 **                  (Accessed via MR_eng_this_context.)
 */
 
-/*
-** A spark contains just enough information to begin execution of a parallel
-** conjunct.  Sparks exist on a context's local spark deque or in the global
-** spark queue.  In the former case, a spark will eventually be executed in the
-** same context (same detstack, etc.) as the code that generated the spark. In
-** the latter case the spark can be picked up and executed by any idle engine
-** in a different context.
-**
-** In the current implementation a spark is put on the global spark queue if,
-** at the time a fork instruction is reached, we think the spark has a chance
-** of being picked up for execution by an idle engine.  Otherwise the spark
-** goes on the context's spark stack. At the moment this is an irrevocable
-** decision. A future possibility is to allow idle engines to steal work
-** from the cold end of some context's spark deque.
-*/
-
 typedef struct MR_Context_Struct        MR_Context;
 
 typedef enum {
@@ -237,13 +221,21 @@ struct MR_SavedOwner_Struct {
 
 
 #ifdef MR_LL_PARALLEL_CONJ
+typedef struct MR_SyncTerm_Struct       MR_SyncTerm;
 typedef struct MR_Spark_Struct          MR_Spark;
 typedef struct MR_SparkDeque_Struct     MR_SparkDeque;
 typedef struct MR_SparkArray_Struct     MR_SparkArray;
 
+/*
+** A spark contains just enough information to begin execution of a parallel
+** conjunct.  A spark will either be executed in the same context (same
+** detstack, etc.) as the code that generated the spark, or it may be stolen
+** from its deque and executed by any idle engine in a different context.
+*/
+
 struct MR_Spark_Struct {
+    MR_SyncTerm             *MR_spark_sync_term;
     MR_Code                 *MR_spark_resume;
-    MR_Word                 *MR_spark_parent_sp;
     MR_ThreadLocalMuts      *MR_spark_thread_local_mutables;
 };
 
@@ -261,7 +253,14 @@ struct MR_Context_Struct {
 #endif
     MR_ContextSize      MR_ctxt_size;
     MR_Context          *MR_ctxt_next;
+#ifdef  MR_LL_PARALLEL_CONJ
+    /*
+    ** The value of this field is used for synchronization.
+    */
+    MR_Code * volatile  MR_ctxt_resume;
+#else
     MR_Code             *MR_ctxt_resume;
+#endif
 #ifdef  MR_THREAD_SAFE
     MercuryThread       MR_ctxt_resume_owner_thread;
     MR_Unsigned         MR_ctxt_resume_c_depth;
@@ -332,10 +331,6 @@ struct MR_Context_Struct {
 
 /*
 ** The runqueue is a linked list of contexts that are runnable.
-** The spark_queue is an array of sparks that are runnable.
-** We keep them separate to prioritise contexts (which are mainly
-** computations which have already started) over sparks (which are
-** computations which have not begun).
 */
 
 extern      MR_Context  *MR_runqueue_head;
@@ -345,8 +340,6 @@ extern      MR_Context  *MR_runqueue_tai
   extern    MercuryCond MR_runqueue_cond;
 #endif
 #ifdef  MR_LL_PARALLEL_CONJ
-  extern    MR_SparkDeque   MR_spark_queue;
-  extern    MercuryLock     MR_sync_term_lock;
   extern    MR_bool         MR_thread_pinning;
 #endif
 
@@ -400,7 +393,7 @@ extern  MR_PendingContext   *MR_pending_
   ** XXX We may need to use atomic instructions or memory fences on some
   ** architectures.
   */
-  extern volatile int   MR_num_idle_engines;
+  extern volatile MR_Integer MR_num_idle_engines;
 
   /*
   ** The number of contexts that are not in the free list (i.e. are executing
@@ -489,17 +482,15 @@ extern  void        MR_finalize_thread_s
 extern  void        MR_flounder(void);
 
 /*
+** Relinquish the processor voluntarily without blocking.
+*/
+extern  void        MR_sched_yield(void);
+
+/*
 ** Append the given context onto the end of the run queue.
 */
 extern  void        MR_schedule_context(MR_Context *ctxt);
 
-#ifdef MR_LL_PARALLEL_CONJ
-  /*
-  ** Append the given spark onto the end of the spark queue.
-  */
-  extern    void    MR_schedule_spark_globally(const MR_Spark *spark);
-#endif /* !MR_LL_PARALLEL_CONJ */
-
 #ifndef MR_HIGHLEVEL_CODE
   MR_declare_entry(MR_do_runnext);
   #define MR_runnext()                          \
@@ -733,7 +724,7 @@ extern  void        MR_schedule_context(
         /* it wouldn't be appropriate to copy the resume field */             \
         to_cptr->MR_ctxt_thread_local_mutables =                              \
             from_cptr->MR_ctxt_thread_local_mutables;                         \
-        /* it wouldn't be appropriate to copy the spark_queue field */        \
+        /* it wouldn't be appropriate to copy the spark_deque field */        \
         /* it wouldn't be appropriate to copy the saved_owners field */       \
     } while (0)
 
@@ -744,19 +735,14 @@ extern  void        MR_schedule_context(
   /*
   ** If you change MR_SyncTerm_Struct you need to update configure.in.
   **
-  ** MR_st_count is `int' so that on a 64-bit machine the total size of the
-  ** sync term is two words, not three words (assuming `int' is 32 bits).
-  **
-  ** XXX we should remove that assumption but it's a little tricky because
-  ** configure needs to understand the types as well
+  ** MR_st_count is manipulated via atomic operations, so it is declared
+  ** volatile and as an MR_Integer.
   */
 
-  typedef struct MR_SyncTerm_Struct MR_SyncTerm;
-
   struct MR_SyncTerm_Struct {
     MR_Context      *MR_st_orig_context;
-    volatile int    MR_st_count;
-    volatile int    MR_st_is_shared;
+    MR_Word             *MR_st_parent_sp;
+    volatile MR_Integer MR_st_count;
   };
 
   #define MR_init_sync_term(sync_term, nbranches)                             \
@@ -764,43 +750,31 @@ extern  void        MR_schedule_context(
         MR_SyncTerm *init_st = (MR_SyncTerm *) &(sync_term);                  \
                                                                               \
         init_st->MR_st_orig_context = MR_ENGINE(MR_eng_this_context);         \
+        init_st->MR_st_parent_sp = MR_parent_sp;                              \
         init_st->MR_st_count = (nbranches);                                   \
-        init_st->MR_st_is_shared = MR_FALSE;                                  \
     } while (0)
 
   /*
   ** fork_new_child(MR_SyncTerm st, MR_Code *child):
   **
   ** Create a new spark to execute the code at `child'.  The new spark is put
-  ** on the global spark queue or the context-local spark deque.  The current
-  ** context resumes at `parent'.  MR_parent_sp must already be set
-  ** appropriately before this instruction is executed.
-  **
-  ** If the spark ends up on the global spark queue then we set
-  ** `MR_st_is_shared' to true as branches of this parallel conjunction could
-  ** be executed in parallel.
+  ** on the context's spark deque.  The current context resumes at `parent'.
+  ** MR_parent_sp must already be set appropriately before this instruction
+  ** is executed.
   */
   #define MR_fork_new_child(sync_term, child)                                 \
     do {                                                                      \
         MR_Spark fnc_spark;                                                   \
+        MR_SparkDeque   *fnc_deque;                                           \
                                                                               \
+        fnc_spark.MR_spark_sync_term = (MR_SyncTerm*) &(sync_term);           \
         fnc_spark.MR_spark_resume = (child);                                  \
-        fnc_spark.MR_spark_parent_sp = MR_parent_sp;                          \
         fnc_spark.MR_spark_thread_local_mutables = MR_THREAD_LOCAL_MUTABLES;  \
-        if (MR_fork_globally_criteria) {                                      \
-            MR_SyncTerm *fnc_st = (MR_SyncTerm *) &(sync_term);               \
-            fnc_st->MR_st_is_shared = MR_TRUE;                                \
-            MR_schedule_spark_globally(&fnc_spark);                           \
-        } else {                                                              \
-            MR_schedule_spark_locally(&fnc_spark);                            \
-        }                                                                     \
+        fnc_deque = &MR_ENGINE(MR_eng_this_context)->MR_ctxt_spark_deque;     \
+        MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks);       \
+        MR_wsdeque_push_bottom(fnc_deque, &fnc_spark);                        \
     } while (0)
 
-  #define MR_fork_globally_criteria                                           \
-    (MR_num_idle_engines != 0 &&                                              \
-     MR_num_outstanding_contexts_and_global_sparks <                          \
-            MR_max_outstanding_contexts)
-
   /*
   ** These macros may be used as conditions for runtime parallelism decisions.
   ** They return nonzero when parallelism is recommended (because there are
@@ -812,19 +786,6 @@ extern  void        MR_schedule_context(
   #define MR_par_cond_contexts_and_all_sparks_vs_num_cpus(target_cpus)        \
       (MR_num_outstanding_contexts_and_all_sparks < target_cpus)
 
-  #define MR_schedule_spark_locally(spark)                                    \
-    do {                                                                      \
-        MR_Context  *ssl_ctxt;                                                \
-                                                                              \
-        /*                                                                    \
-        ** Only the engine running the context is allowed to access           \
-        ** the context's spark stack, so no locking is required here.         \
-        */                                                                    \
-        ssl_ctxt = MR_ENGINE(MR_eng_this_context);                            \
-        MR_wsdeque_push_bottom(&ssl_ctxt->MR_ctxt_spark_deque, (spark));      \
-        MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks);       \
-    } while (0)
-
 extern MR_Code* 
 MR_do_join_and_continue(MR_SyncTerm *sync_term, MR_Code *join_label);
 
Index: runtime/mercury_thread.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.c,v
retrieving revision 1.37
diff -u -p -b -r1.37 mercury_thread.c
--- runtime/mercury_thread.c	3 Dec 2009 05:28:00 -0000	1.37
+++ runtime/mercury_thread.c	8 Dec 2009 23:40:49 -0000
@@ -30,7 +30,7 @@
   MercuryLock       MR_global_lock;
 #endif
 
-MR_bool             MR_exit_now;
+volatile MR_bool    MR_exit_now;
 MR_bool             MR_debug_threads = MR_FALSE;
 
 MR_Unsigned         MR_num_thread_local_mutables = 0;
@@ -212,6 +212,10 @@ MR_destroy_thread(void *eng0)
 #endif
 
 #if defined(MR_THREAD_SAFE)
+/*
+** XXX: maybe these should only be conditionally compiled when
+** MR_DEBUG_THREADS is also set. - pbone
+*/
 
 int
 MR_mutex_lock(MercuryLock *lock, const char *from)
@@ -273,6 +277,19 @@ MR_cond_wait(MercuryCond *cond, MercuryL
     return err;
 }
 
+int
+MR_cond_timed_wait(MercuryCond *cond, MercuryLock *lock, 
+    const struct timespec *abstime, const char *from)
+{
+    int err;
+    
+    fprintf(stderr, "%ld timed-waiting on cond: %p lock: %p (%s)\n",
+        (long)pthread_self(), cond, lock, from);
+    err = pthread_cond_timedwait(cond, lock, abstime);
+    fprintf(stderr, "%ld timed-wait returned %d\n", err);
+    return err;
+}
+
 #endif  /* MR_THREAD_SAFE */
 
 MR_Unsigned
Index: runtime/mercury_thread.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.h,v
retrieving revision 1.28
diff -u -p -b -r1.28 mercury_thread.h
--- runtime/mercury_thread.h	27 Nov 2009 03:51:20 -0000	1.28
+++ runtime/mercury_thread.h	6 Dec 2009 07:50:21 -0000
@@ -41,6 +41,9 @@ extern int
 MR_cond_broadcast(MercuryCond *cond, const char *from);
 extern int
 MR_cond_wait(MercuryCond *cond, MercuryLock *lock, const char *from);
+extern int
+MR_cond_timed_wait(MercuryCond *cond, MercuryLock *lock,
+    const struct timespec *abstime, const char *from);
 
   extern MR_bool    MR_debug_threads;
 
@@ -56,6 +59,8 @@ MR_cond_wait(MercuryCond *cond, MercuryL
     #define MR_SIGNAL(cnd, from)    pthread_cond_signal((cnd))
     #define MR_BROADCAST(cnd, from) pthread_cond_broadcast((cnd))
     #define MR_WAIT(cnd, mtx, from) pthread_cond_wait((cnd), (mtx))
+    #define MR_TIMED_WAIT(cond, mtx, abstime, from)                         \
+        pthread_cond_timedwait((cond), (mtx), (abstime))
   #else
     #define MR_LOCK(lck, from)                          \
                 ( MR_debug_threads ?                    \
@@ -88,6 +93,12 @@ MR_cond_wait(MercuryCond *cond, MercuryL
                 :                                       \
                     pthread_cond_wait((cnd), (mtx))     \
                 )
+    #define MR_TIMED_WAIT(cond, mtx, abstime, from)                         \
+        ( MR_debug_threads ?                                                \
+            MR_cond_timed_wait((cond), (mtx), (abstime), (from))            \
+        :                                                                   \
+            pthread_cond_timedwait((cond), (mtx), (abstime))                \
+        )
   #endif
 
     /*
@@ -127,7 +138,7 @@ MR_cond_wait(MercuryCond *cond, MercuryL
 
   extern MercuryThread      *MR_create_thread(MR_ThreadGoal *);
   extern void               MR_destroy_thread(void *eng);
-  extern MR_bool            MR_exit_now;
+  extern volatile MR_bool   MR_exit_now;
 
   /*
   ** The primordial thread. Currently used for debugging.
Index: runtime/mercury_wrapper.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wrapper.c,v
retrieving revision 1.203
diff -u -p -b -r1.203 mercury_wrapper.c
--- runtime/mercury_wrapper.c	3 Dec 2009 05:28:00 -0000	1.203
+++ runtime/mercury_wrapper.c	6 Dec 2009 23:12:52 -0000
@@ -200,6 +200,17 @@ size_t      MR_pcache_size = 8192;
 MR_Unsigned MR_max_contexts_per_thread = 2;
 MR_Unsigned MR_max_outstanding_contexts;
 
+#ifdef MR_LL_PARALLEL_CONJ
+/*
+** In grades that support parallel conjunctions, an idle engine can steal
+** parallel work from Mercury contexts. The following variables control the
+** maximum number of contexts that an idle engine will try to steal from
+** before resting, and how long to rest before attempting another steal.
+*/
+MR_Unsigned MR_worksteal_max_attempts = 24;
+MR_Unsigned MR_worksteal_sleep_msecs = 2;
+#endif
+
 /* file names for mdb's debugger I/O streams */
 const char  *MR_mdb_in_filename = NULL;
 const char  *MR_mdb_out_filename = NULL;
@@ -1240,6 +1251,8 @@ enum MR_long_option {
     MR_GEN_NONDETSTACK_REDZONE_SIZE,
     MR_GEN_NONDETSTACK_REDZONE_SIZE_KWORDS,
     MR_MAX_CONTEXTS_PER_THREAD,
+    MR_WORKSTEAL_MAX_ATTEMPTS,
+    MR_WORKSTEAL_SLEEP_MSECS,
     MR_THREAD_PINNING,
     MR_PROFILE_PARALLEL_EXECUTION,
     MR_MDB_TTY,
@@ -1340,6 +1353,8 @@ struct MR_option MR_long_opts[] = {
     { "gen-nondetstack-zone-size-kwords",
         1, 0, MR_GEN_NONDETSTACK_REDZONE_SIZE_KWORDS },
     { "max-contexts-per-thread",        1, 0, MR_MAX_CONTEXTS_PER_THREAD },
+    { "worksteal-max-attempts",         1, 0, MR_WORKSTEAL_MAX_ATTEMPTS },
+    { "worksteal-max-attempts",         1, 0, MR_WORKSTEAL_SLEEP_MSECS },
     { "thread-pinning",                 0, 0, MR_THREAD_PINNING },
     { "profile-parallel-execution",     0, 0, MR_PROFILE_PARALLEL_EXECUTION },
     { "mdb-tty",                        1, 0, MR_MDB_TTY },
@@ -1756,6 +1771,22 @@ MR_process_options(int argc, char **argv
                 MR_max_contexts_per_thread = size;
                 break;
 
+            case MR_WORKSTEAL_MAX_ATTEMPTS:
+#ifdef MR_LL_PARALLEL_CONJ
+                if (sscanf(MR_optarg, "%lu", &MR_worksteal_max_attempts) != 1) {
+                    MR_usage();
+                }
+#endif
+                break;
+
+            case MR_WORKSTEAL_SLEEP_MSECS:
+#ifdef MR_LL_PARALLEL_CONJ
+                if (sscanf(MR_optarg, "%lu", &MR_worksteal_sleep_msecs) != 1) {
+                    MR_usage();
+                }
+#endif
+                break;
+
             case MR_THREAD_PINNING:
 #if defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ)
                 MR_thread_pinning = MR_TRUE;
@@ -2826,9 +2857,6 @@ MR_define_label(global_fail);
 
 MR_define_label(all_done);
     assert(MR_runqueue_head == NULL);
-#ifdef MR_LL_PARALLEL_CONJ
-    assert(MR_wsdeque_is_empty(&MR_spark_queue));
-#endif
 
 #ifdef  MR_MPROF_PROFILE_TIME
     if (MR_profiling) {
Index: runtime/mercury_wrapper.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wrapper.h,v
retrieving revision 1.82
diff -u -p -b -r1.82 mercury_wrapper.h
--- runtime/mercury_wrapper.h	17 Jun 2009 03:26:00 -0000	1.82
+++ runtime/mercury_wrapper.h	5 Dec 2009 06:37:30 -0000
@@ -256,6 +256,10 @@ extern	MR_Unsigned	MR_contexts_per_threa
 */
 extern	MR_Unsigned	MR_max_outstanding_contexts;
 
+/* work-stealing tunables (documented in mercury_wrapper.c) */
+extern	MR_Unsigned MR_worksteal_max_attempts;
+extern	MR_Unsigned MR_worksteal_sleep_msecs;
+
 extern  MR_Unsigned MR_num_threads;
 
 /* file names for the mdb debugging streams */
Index: runtime/mercury_wsdeque.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wsdeque.h,v
retrieving revision 1.1
diff -u -p -b -r1.1 mercury_wsdeque.h
--- runtime/mercury_wsdeque.h	11 Oct 2007 11:45:22 -0000	1.1
+++ runtime/mercury_wsdeque.h	5 Dec 2009 06:19:59 -0000
@@ -71,8 +71,8 @@ extern  void    MR_wsdeque_putback_botto
 ** Pop a spark off the bottom of the deque.  Must only be called by
 ** the owner of the deque.  Returns true if successful.
 */
-MR_INLINE
-MR_bool         MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Spark *ret_spark);
+MR_INLINE MR_bool
+MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Code **ret_spark_resume);
 
 /*
 ** Attempt to steal a spark from the top of the deque.
@@ -122,7 +122,7 @@ MR_wsdeque_push_bottom(MR_SparkDeque *dq
 }
 
 MR_INLINE MR_bool
-MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Spark *ret_spark)
+MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Code **ret_spark_resume)
 {
     MR_Integer              bot;
     MR_Integer              top;
@@ -143,7 +143,7 @@ MR_wsdeque_pop_bottom(MR_SparkDeque *dq,
         return MR_FALSE;
     }
 
-    *ret_spark = MR_sa_element(arr, bot);
+    (*ret_spark_resume) = MR_sa_element(arr, bot).MR_spark_resume;
     if (size > 0) {
         return MR_TRUE;
     }