[m-rev.] diff: Parallel RTS changes for idle engines and notifications.

Paul Bone pbone at csse.unimelb.edu.au
Thu Oct 18 17:10:40 AEDT 2012


If anyone wants to review this then it's best not reviewed as a changeset but
as a design.  It is the implementation of the design described in
compiler/notes/par_engine_state.{txt,dot}, so that is the correct place to
start.

I've ran the parallel fibs (without granularity control) in a loop for 24 hours
and havn't found any lingering problems.  Cetainly this is more stable than the
previous code.

(Also I'm back from Europe)

--

Parallel RTS changes for idle engines and notifications.

This redesign prevents at least one race condition that could previously
occur with scheduling of contexts.  This work actually represents an attempt
to design and check these algorithms rather than my previous work which was
no-where near as thorough.

runtime/mercury_context.[ch]:
    Modified along with this are the actions that an engine can undertake,
    many have been renamed but MR_ENGINE_ACTION_CONTEXT_ADVICE has been
    added, which is given to an engine to instruct that engine to check the
    context run queue.

    There are now two procedures for notifying/waking engines.
    try_wake_engine and try_notify_engine, the former is used only when an
    engine is sleeping, the latter is to be used when the engine is idle but
    not sleeping.

    I've also modified how the MR_num_idle_engines counter is used.  It
    now counts the number of engines in the stealing or sleeping states.

    A lot of fprintfs have been added for thread debugging.  They use the
    MR_DEBUG_THREADS macro and --debug-threads runtime option.

    We now support polling and non-polling modes for work stealing.  Engines
    can either be notified when there is a new spark, slowing down spark
    creation time.  Or engines can wake up and poll other engines for
    sparks meaning that the system doesn't idle well.

    Remove MR_do_idle_clean_context which was a specialised entry point for
    the MR_do_idle code.  However there is not much difference between it
    and MR_do_idle, so it is not worth maintaining two procedures.

    Remove MR_do_idle_dirty_context, this logic has been re-implemented in
    MR_join_and_continue where it can be faster.

    Make MR_attempt_steal_spark skip stealing from the current engine.

    Add a memory fence to MR_join_and_continue to make sure that context
    data is saved before marking the context as runnable.

    Modify the spark deque structure to reduce false-sharing.  (Very minor
    speed-up).

diff --git a/runtime/mercury_context.c b/runtime/mercury_context.c
index a736620..84042f3 100644
--- a/runtime/mercury_context.c
+++ b/runtime/mercury_context.c
@@ -72,43 +72,74 @@ MR_init_context_maybe_generator(MR_Context *c, const char *id,
 static void
 MR_milliseconds_from_now(struct timespec *timeout, unsigned int msecs);
 
+
 /*
-** These states are bitfields so they can be combined when passed to
-** try_wake_engine.  The definitions of the starts are:
+** Engine states and notifications
+** -------------------------------
+**
+** An engine may be in one of the following states, see the es_state field
+** engine_sleep_sync_i
+**
+** working      The engine has work to do and is working on it.
+**              The engine will not check for notifications, all
+**              notifications will be ignored.
+**
+** idle         The engine finished its work and is looking for
+**              more work.  It is looking for a context to resume or a local
+**              spark.  If found, the engine will move to the working state,
+**              if not, it will check for notifications and if there are
+**              none it moves to the stealing state.  Only notify an idle
+**              engine with notifications that may be ignored.
 **
-** working      the engine has work to do and is working on it.
+** stealing     The engine is now attempting to work steal.  It has now
+**              incremented the idle engine count to make it easier to
+**              receive notifications.  If it finds a spark it will decrement
+**              the count and execute the spark.  Otherwise it checks for
+**              notifications and moves to the sleeping state.  This state
+**              is similar to idle but separate as it allows another engine
+**              to understand if this engine has modified the idle engine
+**              count (which we don't want to do in the idle state as that
+**              will often find a local spark to execute).
 **
-** sleeping     The engine has no work to do and is sleeping on it's sleep
-**              semaphore.
+** sleeping     The engine has committed to going to sleep, to wake it up
+**              one must post to its sleep semaphore ensuring that it does
+**              not sleep.  Any notification can be sent at this stage as
+**              all will be acted upon, including the context notification
+**              which cannot be dropped.
 **
-** idle         The engine has recently finished it's work and is looking for
-**              more work before it goes to sleep.  This state is useful when
-**              there are no sleeping engines but there are idle engines,
-**              signalling an idle engine will prevent it from sleeping and
-**              allow it to re-check the work queues.
+** notified
+**              The engine has received a notification, it cannot receive
+**              another notification now.  This state is initiated by the
+**              notifier, and therefore is done with either a compare and
+**              swap or a lock depending on the state of the engine.  See
+**              try_wake_engine and try_notify_engine.  Upon receiving the
+**              notification the engine will set its new status
+**              appropriately.
 **
-** woken        The engine was either sleeping or idle and has been signaled
-**              and possibly been given work to do.  DO NOT signal these
-**              engines again doing so may leak work.
+** More information about these states including which transitions are legal
+** can be found in notes/par_engine_state.{txt,dot}
+*/
+
+/*
+** Busy isn't a normal state, but it's used with the CAS code to make some
+** operations atomic.
 */
-#define ENGINE_STATE_WORKING    0x0001
-#define ENGINE_STATE_SLEEPING   0x0002
-#define ENGINE_STATE_IDLE       0x0004
-#define ENGINE_STATE_WOKEN      0x0008
-#define ENGINE_STATE_ALL        0xFFFF
+#define ENGINE_STATE_BUSY           0x0000
+#define ENGINE_STATE_WORKING        0x0001
+#define ENGINE_STATE_IDLE           0x0002
+#define ENGINE_STATE_STEALING       0x0004
+#define ENGINE_STATE_SLEEPING       0x0008
+#define ENGINE_STATE_NOTIFIED       0x0010
+#define ENGINE_STATE_ALL            0xFFFF
 
 struct engine_sleep_sync_i {
-    sem_t                               es_sleep_semaphore;
-    MercuryLock                         es_wake_lock;
-    volatile unsigned                   es_state;
-    volatile unsigned                   es_action;
-    union MR_engine_wake_action_data    es_action_data;
+    sem_t                                       es_sleep_semaphore;
+    MercuryLock                                 es_wake_lock;
+    volatile MR_Unsigned                        es_state;
+    volatile unsigned                           es_action;
+    volatile union MR_engine_wake_action_data   es_action_data;
 };
 
-#define CACHE_LINE_SIZE 64
-#define PAD_CACHE_LINE(s) \
-    ((CACHE_LINE_SIZE) > (s) ? (CACHE_LINE_SIZE) - (s) : 0)
-
 typedef struct {
     struct engine_sleep_sync_i d;
     /*
@@ -124,8 +155,7 @@ engine_sleep_sync *engine_sleep_sync_data;
 
 
 /*
-** The run queue is protected with MR_runqueue_lock and signalled with
-** MR_runqueue_cond.
+** The run queue is protected with MR_runqueue_lock.
 */
 MR_Context              *MR_runqueue_head;
 MR_Context              *MR_runqueue_tail;
@@ -230,14 +260,29 @@ MR_SparkDeque           **MR_spark_deques = NULL;
 
 #ifdef MR_LL_PARALLEL_CONJ
 /*
-** Try to wake up a sleeping engine and tell it to do action. The engine
-** is only woken if the engine is in one of the states in the bitfield states.
-** If the engine is woken, this function returns MR_TRUE, otherwise it
-** returns MR_FALSE.
+** Try to wake up a sleeping engine and tell it to do action. The engine is
+** only woken if it is in the sleeping state.  If the engine is not sleeping
+** use try_notify_engine below.  If the engine is woken without a race, this
+** function returns MR_TRUE, otherwise it returns MR_FALSE.
 */
 static MR_bool
 try_wake_engine(MR_EngineId engine_id, int action,
-    union MR_engine_wake_action_data *action_data, unsigned states);
+    union MR_engine_wake_action_data *action_data);
+
+/*
+** Send a notification to the engine.  This is applicable if the engine is
+** in any other state (not sleeping).  This function does not use the
+** semaphore so it cannot wake a sleeping engine.  Don't confuse the
+** dropable and non-dropable notifications with the notify/wake methods.
+** The only connection is that in general non-dropable notifications should
+** be used wit try_notify_engine.
+**
+** The engine's current state must be passed in engine_state as it is used
+** with the CAS operation.
+*/
+static MR_bool
+try_notify_engine(MR_EngineId engine_id, int action,
+    union MR_engine_wake_action_data *action_data, MR_Unsigned engine_state);
 #endif
 
 /*
@@ -1285,6 +1330,7 @@ MR_attempt_steal_spark(MR_Spark *spark)
 {
     int             i;
     int             offset;
+    int             victim_id;
     MR_SparkDeque   *victim;
     int             steal_result;
     MR_bool         result = MR_FALSE;
@@ -1292,7 +1338,14 @@ MR_attempt_steal_spark(MR_Spark *spark)
     offset = MR_ENGINE(MR_eng_victim_counter);
 
     for (i = 0; i < MR_num_threads; i++) {
-        victim = MR_spark_deques[(i + offset) % MR_num_threads];
+        victim_id = (i + offset) % MR_num_threads;
+        if (victim_id == MR_ENGINE(MR_eng_id)) {
+            /*
+            ** There's no point in stealing from ourself.
+            */
+            continue;
+        }
+        victim = MR_spark_deques[victim_id];
         if (victim != NULL) {
             steal_result = MR_wsdeque_steal_top(victim, spark);
             /*
@@ -1312,7 +1365,7 @@ MR_attempt_steal_spark(MR_Spark *spark)
         }
     }
 
-    MR_ENGINE(MR_eng_victim_counter) = (i % MR_num_threads);
+    MR_ENGINE(MR_eng_victim_counter) = victim_id;
     return result;
 }
 
@@ -1485,8 +1538,10 @@ MR_schedule_context(MR_Context *ctxt)
 {
 #ifdef MR_LL_PARALLEL_CONJ
     MR_EngineId engine_id;
-    union MR_engine_wake_action_data wake_action_data;
-    wake_action_data.MR_ewa_context = ctxt;
+    union MR_engine_wake_action_data notify_context_data;
+    engine_sleep_sync *esync;
+
+    notify_context_data.MR_ewa_context = ctxt;
 
 #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
     MR_threadscope_post_context_runnable(ctxt);
@@ -1496,6 +1551,7 @@ MR_schedule_context(MR_Context *ctxt)
     ** Try to give this context straight to the engine that would execute it.
     */
     engine_id = ctxt->MR_ctxt_resume_owner_engine;
+    esync = &(engine_sleep_sync_data[engine_id]);
 #ifdef MR_DEBUG_THREADS
     if (MR_debug_threads) {
         fprintf(stderr, "%ld Scheduling context %p desired engine: %d\n",
@@ -1544,22 +1600,20 @@ MR_schedule_context(MR_Context *ctxt)
                 MR_SELF_THREAD_ID);
         }
 #endif
+
         /*
-        ** There is a bug on this line, we should never give a context
-        ** notification to an idle engine as the notification can be lost if
-        ** the engine writes-over its own state and then a later
-        ** notification is passed to the engine.
-        **
-        ** However I don't want to fix it as fixing it may make the window
-        ** for the race condition documented above wider.
+        ** Only engine_id may execute this context, if it is sleeping
+        ** attempt to wake it.
         */
-        if (try_wake_engine(engine_id, MR_ENGINE_ACTION_CONTEXT,
-            &wake_action_data, ENGINE_STATE_IDLE | ENGINE_STATE_SLEEPING))
-        {
-            /*
-            ** We have successfully given the context to the correct engine.
-            */
-            return;
+        if (esync->d.es_state == ENGINE_STATE_SLEEPING) {
+            if (try_wake_engine(engine_id, MR_ENGINE_ACTION_CONTEXT,
+                &notify_context_data))
+            {
+                /*
+                ** We have successfully given the context to the correct engine.
+                */
+                return;
+            }
         }
     } else {
         /*
@@ -1568,7 +1622,7 @@ MR_schedule_context(MR_Context *ctxt)
         */
         if (MR_num_idle_engines > 0) {
             if (MR_try_wake_an_engine(engine_id, MR_ENGINE_ACTION_CONTEXT,
-                &wake_action_data, NULL))
+                &notify_context_data, NULL))
             {
                 /*
                 ** The context has been given to an engine.
@@ -1589,6 +1643,51 @@ MR_schedule_context(MR_Context *ctxt)
         MR_runqueue_tail = ctxt;
     }
     MR_UNLOCK(&MR_runqueue_lock, "schedule_context");
+
+#ifdef MR_LL_PARALLEL_CONJ
+    if (ctxt->MR_ctxt_resume_engine_required == MR_TRUE) {
+        /*
+        ** The engine is only runnable on a single context, that context was
+        ** busy earlier and couldn't be handed the engine.  If that context
+        ** is now idle or stealing it may have already checked the runqueue
+        ** (where we just put the context).  Therefore we re-attempt to
+        ** notify the engine to ensure that it re-checks the runqueue.
+        **
+        ** This is only a problem with only a single engine can execute a
+        ** context, in any other case the current engine will eventually check
+        ** the runqueue.
+        **
+        ** The updates to the run queue are guaranteed by the compiler and
+        ** the processor to be visible before the runqueue is unlocked.
+        ** And the engine's update of its state from working->idle will be
+        ** available before it can lock the runqueue.  Therefore, if the
+        ** engine is working we do not message it because it will check the
+        ** runqueue anyway.
+        */
+
+        MR_Unsigned state;
+        MR_bool notified;
+
+        state = esync->d.es_state;
+        while (state & (ENGINE_STATE_SLEEPING | ENGINE_STATE_IDLE |
+                ENGINE_STATE_STEALING)) {
+            if (state == ENGINE_STATE_SLEEPING) {
+                if (try_wake_engine(engine_id,
+                        MR_ENGINE_ACTION_CONTEXT_ADVICE, NULL)) {
+                    break;
+                }
+            } else if ((state == ENGINE_STATE_IDLE)
+                    || (state == ENGINE_STATE_STEALING)) {
+                if (try_notify_engine(engine_id,
+                        MR_ENGINE_ACTION_CONTEXT_ADVICE, NULL, state)) {
+                    break;
+                }
+            }
+            MR_sched_yield();
+            state = esync->d.es_state;
+        }
+    }
+#endif /* MR_LL_PARALLEL_CONJ */
 }
 
 #ifdef MR_LL_PARALLEL_CONJ
@@ -1603,6 +1702,26 @@ MR_try_wake_an_engine(MR_EngineId preferred_engine, int action,
     int i = 0;
     int state;
     MR_bool result;
+    MR_Unsigned valid_states;
+
+    /*
+    ** Set the valid set of states that can be notified for this action.
+    */
+    switch (action) {
+        case MR_ENGINE_ACTION_SHUTDOWN:
+        case MR_ENGINE_ACTION_CONTEXT_ADVICE:
+            valid_states = ENGINE_STATE_IDLE | ENGINE_STATE_STEALING |
+                ENGINE_STATE_SLEEPING;
+            break;
+        case MR_ENGINE_ACTION_WORKSTEAL_ADVICE:
+            valid_states = ENGINE_STATE_STEALING | ENGINE_STATE_SLEEPING;
+            break;
+        case MR_ENGINE_ACTION_CONTEXT:
+            valid_states = ENGINE_STATE_SLEEPING;
+            break;
+        default:
+            abort();
+    }
 
     /*
     ** Right now this algorithm is naive, it searches from the preferred engine
@@ -1617,35 +1736,64 @@ MR_try_wake_an_engine(MR_EngineId preferred_engine, int action,
             continue;
         }
         state = engine_sleep_sync_data[current_engine].d.es_state;
-        if (state == ENGINE_STATE_SLEEPING) {
-            result = try_wake_engine(current_engine, action, action_data,
-                    ENGINE_STATE_SLEEPING);
-            if (result) {
-                if (target_eng) {
-                    *target_eng = current_engine;
-                }
-                return MR_TRUE;
+        if (state & valid_states) {
+            switch (state) {
+                case ENGINE_STATE_SLEEPING:
+                    result = try_wake_engine(current_engine, action, action_data);
+                    if (result) {
+                        goto success;
+                    }
+                    break;
+
+                case ENGINE_STATE_IDLE:
+                case ENGINE_STATE_STEALING:
+                    result = try_notify_engine(current_engine, action,
+                        action_data, state);
+                    if (result) {
+                        goto success;
+                    }
+                    break;
             }
         }
     }
 
     return MR_FALSE;
+
+success:
+    if (target_eng) {
+        *target_eng = current_engine;
+    }
+    return MR_TRUE;
 }
 
 static MR_bool
 try_wake_engine(MR_EngineId engine_id, int action,
-    union MR_engine_wake_action_data *action_data, unsigned states)
+    union MR_engine_wake_action_data *action_data)
 {
     MR_bool success = MR_FALSE;
     engine_sleep_sync *esync = &(engine_sleep_sync_data[engine_id]);
 
+#ifdef MR_DEBUG_THREADS
+    if (MR_debug_threads) {
+        fprintf(stderr,
+            "%ld Trying to wake up engine %d, action %d\n",
+            MR_SELF_THREAD_ID, engine_id, action);
+    }
+#endif
+
     /*
-    ** This engine is probably in the state our caller checked that it was in.
-    ** Wait on the semaphore then re-check the state to be sure.
+    ** Our caller made an initial check of the engine's state.  But we check
+    ** it again after taking the lock.
     */
     MR_LOCK(&(esync->d.es_wake_lock), "try_wake_engine, wake_lock");
-    if (esync->d.es_state & states) {
+    if (esync->d.es_state == ENGINE_STATE_SLEEPING) {
         MR_atomic_dec_int(&MR_num_idle_engines);
+#ifdef MR_DEBUG_THREADS
+        if (MR_debug_threads) {
+            fprintf(stderr, "%ld Decrement MR_num_idle_engines %d\n",
+                MR_SELF_THREAD_ID, MR_num_idle_engines);
+        }
+#endif
 
         /*
         ** We now KNOW that the engine is in one of the correct states.
@@ -1657,7 +1805,7 @@ try_wake_engine(MR_EngineId engine_id, int action,
         if (action_data) {
             esync->d.es_action_data = *action_data;
         }
-        esync->d.es_state = ENGINE_STATE_WOKEN;
+        esync->d.es_state = ENGINE_STATE_NOTIFIED;
         MR_CPU_SFENCE;
         MR_SEM_POST(&(esync->d.es_sleep_semaphore),
             "try_wake_engine sleep_sem");
@@ -1665,20 +1813,126 @@ try_wake_engine(MR_EngineId engine_id, int action,
     }
     MR_UNLOCK(&(esync->d.es_wake_lock), "try_wake_engine wake_lock");
 
+#ifdef MR_DEBUG_THREADS
+    if (MR_debug_threads) {
+        fprintf(stderr,
+            "%ld Wake result %d\n",
+            MR_SELF_THREAD_ID, success);
+    }
+#endif
+
     return success;
 }
 
+MR_bool
+try_notify_engine(MR_EngineId engine_id, int action,
+    union MR_engine_wake_action_data *action_data, MR_Unsigned engine_state)
+{
+    engine_sleep_sync   *esync = &(engine_sleep_sync_data[engine_id]);
+
+#ifdef MR_DEBUG_THREADS
+    if (MR_debug_threads) {
+        fprintf(stderr,
+            "%ld Trying to notify engine %d, action %d, state %d\n",
+            MR_SELF_THREAD_ID, engine_id, action, engine_state);
+    }
+#endif
+
+    /*
+    ** As in try_wake_engine, we expect our caller to read the current state
+    ** of the engine.  But in this case it should also provide the state of
+    ** the engine so we can use it for the CAS below.
+    */
+    if (MR_compare_and_swap_uint(&(esync->d.es_state), engine_state,
+            ENGINE_STATE_BUSY)) {
+        /* Tell the engine what to do. */
+        esync->d.es_action = action;
+        if (action_data) {
+            esync->d.es_action_data = *action_data;
+        }
+        if (engine_state == ENGINE_STATE_STEALING) {
+            /*
+            ** The engine was idle if it was in the stealing state.
+            ** It is not idle anymore so fixup the count.
+            */
+            MR_atomic_dec_int(&MR_num_idle_engines);
+#ifdef MR_DEBUG_THREADS
+            if (MR_debug_threads) {
+                fprintf(stderr, "%ld Decrement MR_num_idle_engines %d\n",
+                    MR_SELF_THREAD_ID, MR_num_idle_engines);
+            }
+#endif
+        }
+
+        /* Write the data before we move into the working state. */
+        MR_CPU_SFENCE;
+        esync->d.es_state = ENGINE_STATE_NOTIFIED;
+
+        /*
+        ** We don't adjust the idle engine counter, the engine itself does
+        ** that, especially if this message is dropable.
+        */
+#ifdef MR_DEBUG_THREADS
+        if (MR_debug_threads) {
+            fprintf(stderr, "%ld Notified engine\n", MR_SELF_THREAD_ID);
+        }
+#endif
+        return MR_TRUE;
+    } else {
+#ifdef MR_DEBUG_THREADS
+        if (MR_debug_threads) {
+            fprintf(stderr, "%ld Could not notify engine (lost CAS race)\n",
+                MR_SELF_THREAD_ID);
+        }
+#endif
+        return MR_FALSE;
+    }
+}
+
 void
 MR_shutdown_all_engines(void)
 {
     int i;
+    MR_bool result;
 
     for (i = 0; i < MR_num_threads; i++) {
+        engine_sleep_sync *esync = &(engine_sleep_sync_data[i]);
         if (i == MR_ENGINE(MR_eng_id)) {
             continue;
         }
-        try_wake_engine(i, MR_ENGINE_ACTION_SHUTDOWN, NULL,
-            ENGINE_STATE_ALL);
+
+        while (1) {
+            MR_Unsigned state = esync->d.es_state;
+
+            /*
+            ** We can only notify the engine if it is in the idle or
+            ** sleeping states.  Only in these states can we be sure that
+            ** the engine will observe our message.  If it is sleeping then
+            ** the semaphore is used for synchronization.  If it is idle, it
+            ** must do a CAS before it sleeps (and it cannot work because
+            ** there will be no available work if the system is shutting
+            ** down.
+            */
+            if (state == ENGINE_STATE_IDLE) {
+                if (try_notify_engine(i, MR_ENGINE_ACTION_SHUTDOWN, NULL,
+                        state)) {
+                    break;
+                }
+            } else if (state == ENGINE_STATE_SLEEPING) {
+                result = try_wake_engine(i, MR_ENGINE_ACTION_SHUTDOWN,
+                    NULL);
+                break;
+            }
+            /*
+            ** An engine may still appear to be working if this processor
+            ** has not yet seen the other processor write to its state
+            ** field yet.  If this happens, wait until it does.
+            **
+            ** Yield to the OS because we cannot know how long we may have
+            ** to wait.
+            */
+            MR_sched_yield();
+        }
     }
 
     for (i = 0; i < (MR_num_threads - 1); i++) {
@@ -1699,6 +1953,18 @@ MR_shutdown_all_engines(void)
 **
 */
 
+static void
+action_shutdown_engine(void);
+
+static MR_Code*
+action_worksteal(MR_EngineId victim_engine_id);
+
+/*
+** This always returns a valid code address.
+*/
+static MR_Code*
+action_context(MR_Context *context);
+
 /*
 ** The run queue used to include timing code. It has been removed and may be
 ** added in the future.
@@ -1707,8 +1973,7 @@ MR_shutdown_all_engines(void)
 MR_define_extern_entry(MR_do_idle);
 
 #ifdef MR_THREAD_SAFE
-MR_define_extern_entry(MR_do_idle_clean_context);
-MR_define_extern_entry(MR_do_idle_dirty_context);
+MR_define_extern_entry(MR_do_idle_worksteal);
 MR_define_extern_entry(MR_do_sleep);
 
 static MR_Code*
@@ -1757,34 +2022,46 @@ MR_BEGIN_MODULE(scheduler_module_idle)
     MR_init_entry_an(MR_do_idle);
 MR_BEGIN_CODE
 MR_define_entry(MR_do_idle);
-  #ifdef MR_THREAD_SAFE
 {
-    MR_Code *jump_target;
+#ifdef MR_THREAD_SAFE
+    MR_Code             *jump_target;
+    MR_EngineId         engine_id = MR_ENGINE(MR_eng_id);
+    engine_sleep_sync   *esync =
+        &(engine_sleep_sync_data[MR_ENGINE(MR_eng_id)]);
+
     /*
-    ** Try to get a context.
+    ** We can set the idle status without a compare and swap.  There are no
+    ** notifications that could have arrived while the engine was working,
+    ** and that cannot safely be ignored.  This is a deliberate design
+    ** choice, to avoid a compare and swap in the common state transitions
+    ** between idle and working, and vice versa.
+    **
+    ** We must advertise that we are in the idle state now (even if we're
+    ** about to find work) before checking the context run queue.
+    ** schedule_context() requires this so that it can reliably deliver a
+    ** context advice message.
     */
-    advertise_engine_state_idle();
+    esync->d.es_state = ENGINE_STATE_IDLE;
 
+    /*
+    ** Try to get a context.
+    */
     jump_target = do_get_context();
     if (jump_target != NULL) {
-        advertise_engine_state_working();
+        esync->d.es_state = ENGINE_STATE_WORKING;
         MR_GOTO(jump_target);
     }
     jump_target = do_local_spark(NULL);
     if (jump_target != NULL) {
-        advertise_engine_state_working();
-        MR_GOTO(jump_target);
-    }
-    jump_target = do_work_steal();
-    if (jump_target != NULL) {
-        advertise_engine_state_working();
+        esync->d.es_state = ENGINE_STATE_WORKING;
         MR_GOTO(jump_target);
     }
 
-    MR_GOTO(MR_ENTRY(MR_do_sleep));
-}
-  #else /* !MR_THREAD_SAFE */
-{
+    /*
+    ** TODO: Use multiple entry points into a single MODULE structure.
+    */
+    MR_GOTO(MR_ENTRY(MR_do_idle_worksteal));
+#else /* !MR_THREAD_SAFE */
     /*
     ** When an engine becomes idle in a non parallel grade, it simply picks up
     ** another context.
@@ -1805,35 +2082,83 @@ MR_define_entry(MR_do_idle);
 
     MR_load_context(MR_ENGINE(MR_eng_this_context));
     MR_GOTO(MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume);
+#endif /* !MR_THREAD_SAFE */
 }
-  #endif /* !MR_THREAD_SAFE */
 MR_END_MODULE
 
 #ifdef MR_THREAD_SAFE
-MR_BEGIN_MODULE(scheduler_module_idle_clean_context)
-    MR_init_entry_an(MR_do_idle_clean_context);
+MR_BEGIN_MODULE(scheduler_module_idle_worksteal)
+    MR_init_entry_an(MR_do_idle_worksteal);
 MR_BEGIN_CODE
-MR_define_entry(MR_do_idle_clean_context);
+MR_define_entry(MR_do_idle_worksteal);
 {
-#ifdef MR_THREADSCOPE
-    MR_threadscope_post_stop_context(MR_TS_STOP_REASON_FINISHED);
-#endif
-    advertise_engine_state_idle();
+    MR_Code             *jump_target;
+    MR_EngineId         engine_id = MR_ENGINE(MR_eng_id);
+    engine_sleep_sync   *esync =
+        &(engine_sleep_sync_data[engine_id]);
+
+    if (!MR_compare_and_swap_uint(&(esync->d.es_state), ENGINE_STATE_IDLE,
+            ENGINE_STATE_STEALING)) {
+        while (esync->d.es_state == ENGINE_STATE_BUSY) {
+            MR_ATOMIC_PAUSE;
+        }
 
-    MR_Code *jump_target;
-    jump_target = do_get_context();
-    if (jump_target != NULL) {
-        advertise_engine_state_working();
-        MR_GOTO(jump_target);
+        /*
+        ** The compare and swap failed, which means there is a notification.
+        */
+        switch(esync->d.es_action) {
+            case MR_ENGINE_ACTION_SHUTDOWN:
+                action_shutdown_engine();
+
+            case MR_ENGINE_ACTION_CONTEXT_ADVICE:
+                MR_GOTO(MR_ENTRY(MR_do_idle));
+
+            case MR_ENGINE_ACTION_WORKSTEAL_ADVICE:
+                jump_target = action_worksteal(
+                    esync->d.es_action_data.MR_ewa_worksteal_engine);
+                if (jump_target != NULL) {
+                    MR_GOTO(jump_target);
+                } else {
+                    MR_GOTO(MR_ENTRY(MR_do_idle));
+                }
+
+            case MR_ENGINE_ACTION_CONTEXT:
+            case MR_ENGINE_ACTION_NONE:
+            default:
+                abort();
+                break;
+        }
+        /*
+        ** We attempted to act on the notification but we lost a race above
+        ** when attempting to worksteal.  Now we continue into the
+        ** workstealing state.
+        */
+        esync->d.es_state = ENGINE_STATE_STEALING;
     }
-    jump_target = do_local_spark(NULL);
-    if (jump_target != NULL) {
-        advertise_engine_state_working();
-        MR_GOTO(jump_target);
+
+    /*
+    ** The compare and swap must be visible before the increment.
+    */
+    MR_CPU_SFENCE;
+    MR_atomic_inc_int(&MR_num_idle_engines);
+#ifdef MR_DEBUG_THREADS
+    if (MR_debug_threads) {
+        fprintf(stderr, "%ld Increment MR_num_idle_engines %d\n",
+            MR_SELF_THREAD_ID, MR_num_idle_engines);
     }
+#endif
+
     jump_target = do_work_steal();
     if (jump_target != NULL) {
-        advertise_engine_state_working();
+        MR_atomic_dec_int(&MR_num_idle_engines);
+#ifdef MR_DEBUG_THREADS
+        if (MR_debug_threads) {
+            fprintf(stderr, "%ld Decrement MR_num_idle_engines %d\n",
+                MR_SELF_THREAD_ID, MR_num_idle_engines);
+        }
+#endif
+        MR_CPU_SFENCE;
+        esync->d.es_state = ENGINE_STATE_WORKING;
         MR_GOTO(jump_target);
     }
 
@@ -1843,42 +2168,6 @@ MR_END_MODULE
 #endif /* MR_THREAD_SAFE */
 
 #ifdef MR_THREAD_SAFE
-MR_BEGIN_MODULE(scheduler_module_idle_dirty_context)
-    MR_init_entry_an(MR_do_idle_dirty_context);
-MR_BEGIN_CODE
-MR_define_entry(MR_do_idle_dirty_context);
-{
-    MR_Code *join_label = (MR_Code*)MR_r1;
-    MR_Code *jump_target;
-
-    /*
-    ** We check for local sparks first.  If there is a local spark that is
-    ** compatible then execute it, it is 'left most' next to the computation
-    ** that this engine has just finished, and could be more optimal than
-    ** any suspended context.  Let another engine pay the cost of the
-    ** context switch.
-    **
-    ** If there was an incompatible local spark then it may still be
-    ** executed after the jump to MR_do_idle below.
-    */
-    jump_target = do_local_spark(join_label);
-    if (jump_target != NULL) {
-        MR_GOTO(jump_target);
-    }
-
-    /*
-    ** Save our context and then look for work as per normal.
-    */
-#ifdef MR_THREADSCOPE
-    MR_threadscope_post_stop_context(MR_TS_STOP_REASON_BLOCKED);
-#endif
-    save_dirty_context(join_label);
-    MR_ENGINE(MR_eng_this_context) = NULL;
-
-    MR_GOTO(MR_ENTRY(MR_do_idle));
-}
-MR_END_MODULE
-
 /*
 ** Put the engine to sleep since there's no work to do.
 **
@@ -1893,20 +2182,40 @@ MR_BEGIN_MODULE(scheduler_module_idle_sleep)
 MR_BEGIN_CODE
 MR_define_entry(MR_do_sleep);
 {
-    MR_EngineId engine_id = MR_ENGINE(MR_eng_id);
-    unsigned action;
-    int result;
+    MR_EngineId         engine_id = MR_ENGINE(MR_eng_id);
+    engine_sleep_sync   *esync =
+        &(engine_sleep_sync_data[MR_ENGINE(MR_eng_id)]);
+
+    unsigned        action;
+    int             result;
+    MR_Code         *jump_target;
+    MR_Unsigned     state;
+#ifdef MR_WORKSTEAL_POLLING
     struct timespec ts;
-    struct timeval tv;
-    MR_Code *jump_target;
+    struct timeval  tv;
+#endif
+
+    if (MR_compare_and_swap_uint(&(esync->d.es_state), ENGINE_STATE_STEALING,
+            ENGINE_STATE_SLEEPING)) {
+        /*
+        ** We have permission to sleep, and must commit to sleeping.
+        */
+
+#ifdef MR_DEBUG_THREADS
+        if (MR_debug_threads) {
+            fprintf(stderr, "%ld Engine %d going to sleep\n",
+                MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id));
+        }
+#endif
 
-    while (1) {
-        engine_sleep_sync_data[engine_id].d.es_state = ENGINE_STATE_SLEEPING;
-        MR_CPU_SFENCE;
 #ifdef MR_THREADSCOPE
         MR_threadscope_post_engine_sleeping();
 #endif
+
+retry_sleep:
 #if defined(MR_HAVE_GETTIMEOFDAY) && defined(MR_HAVE_SEMAPHORE_H)
+
+#ifdef MR_WORKSTEAL_POLLING
         gettimeofday(&tv, NULL);
         /* Sleep for 2ms */
         tv.tv_usec += 2000;
@@ -1918,98 +2227,13 @@ MR_define_entry(MR_do_sleep);
         ts.tv_sec = tv.tv_sec;
         ts.tv_nsec = tv.tv_usec * 1000;
         result = sem_timedwait(
-            &(engine_sleep_sync_data[engine_id].d.es_sleep_semaphore),
+            &(esync->d.es_sleep_semaphore),
             &ts);
 #else
-        MR_fatal_error(
-            "low-level parallel grades need gettimeofday() and "
-            "sem_timedwait()\n");
+        result = sem_wait(&(esync->d.es_sleep_semaphore));
 #endif
 
-        if (0 == result) {
-            MR_CPU_LFENCE;
-            action = engine_sleep_sync_data[engine_id].d.es_action;
-#ifdef MR_DEBUG_THREADS
-            if (MR_debug_threads) {
-                fprintf(stderr,
-                    "%ld Engine %d is awake and will do action %d\n",
-                    MR_SELF_THREAD_ID, engine_id, action);
-            }
-#endif
-
-            switch(action) {
-                case MR_ENGINE_ACTION_SHUTDOWN:
-                    /*
-                    ** The primordial thread has the responsibility of cleaning
-                    ** up the Mercury runtime. It cannot exit by this route.
-                    */
-                    assert(engine_id != 0);
-                    MR_atomic_dec_int(&MR_num_idle_engines);
-                    MR_destroy_thread(MR_cur_engine());
-                    MR_SEM_POST(&shutdown_semaphore,
-                        "MR_do_sleep shutdown_sem");
-                    pthread_exit(0);
-                    break;
-
-                case MR_ENGINE_ACTION_WORKSTEAL:
-                    MR_ENGINE(MR_eng_victim_counter) =
-                        engine_sleep_sync_data[engine_id].d.es_action_data.
-                        MR_ewa_worksteal_engine;
-
-                    jump_target = do_work_steal();
-                    if (jump_target != NULL) {
-                        engine_sleep_sync_data[engine_id].d.es_state =
-                            ENGINE_STATE_WORKING;
-                        MR_GOTO(jump_target);
-                    }
-                    jump_target = do_get_context();
-                    if (jump_target != NULL) {
-                        engine_sleep_sync_data[engine_id].d.es_state =
-                            ENGINE_STATE_WORKING;
-                        MR_GOTO(jump_target);
-                    }
-                    break;
-
-                case MR_ENGINE_ACTION_CONTEXT:
-                    {
-                        MR_Context  *context;
-                        MR_Code     *resume_point;
-
-                        engine_sleep_sync_data[engine_id].d.es_state =
-                            ENGINE_STATE_WORKING;
-                        context = engine_sleep_sync_data[engine_id].d.
-                            es_action_data.MR_ewa_context;
-                        prepare_engine_for_context(context);
-
-                        #ifdef MR_DEBUG_STACK_SEGMENTS
-                        MR_debug_log_message("resuming old context: %p",
-                            context);
-                        #endif
-
-                        resume_point = (MR_Code*)(context->MR_ctxt_resume);
-                        context->MR_ctxt_resume = NULL;
-
-                        MR_GOTO(resume_point);
-                    }
-                    break;
-
-                case MR_ENGINE_ACTION_NONE:
-                default:
-                    jump_target = do_get_context();
-                    if (jump_target != NULL) {
-                        engine_sleep_sync_data[engine_id].d.es_state =
-                            ENGINE_STATE_WORKING;
-                        MR_GOTO(jump_target);
-                    }
-                    jump_target = do_work_steal();
-                    if (jump_target != NULL) {
-                        engine_sleep_sync_data[engine_id].d.es_state =
-                            ENGINE_STATE_WORKING;
-                        MR_GOTO(jump_target);
-                    }
-                    break;
-            }
-        } else {
+        if (result != 0) {
             /*
             ** Sem_wait reported an error.
             */
@@ -2018,25 +2242,223 @@ MR_define_entry(MR_do_sleep);
                     /*
                     ** An interrupt woke the engine, go back to sleep.
                     */
-                    break;
+#ifdef MR_DEBUG_THREADS
+                    if (MR_debug_threads) {
+                        fprintf(stderr, "%ld Engine sleep interrupted\n",
+                            MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id));
+                    }
+#endif
+                    goto retry_sleep;
                 case ETIMEDOUT:
                     /*
                     ** A wait timed out, check for any sparks.
                     */
-                    jump_target = do_work_steal();
+#ifdef MR_DEBUG_THREADS
+                    if (MR_debug_threads) {
+                        fprintf(stderr, "%ld Engine sleep timed out\n",
+                            MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id));
+                    }
+#endif
+                    MR_LOCK(&(esync->d.es_wake_lock), "do_sleep, wake_lock");
+                    state = esync->d.es_state;
+                    if (state == ENGINE_STATE_NOTIFIED) {
+                        /*
+                        ** A notification occurred after the timeout but
+                        ** before we took the lock above.
+                        **
+                        ** So set a null jump target and do not get a spark.
+                        ** Then we will execute the goto below and wait on
+                        ** the semaphore once more, which will instantly
+                        ** succeed and proceed to interpret the notification
+                        ** below.
+                        */
+                        jump_target = NULL;
+                    } else {
+                        jump_target = do_work_steal();
+                        if (jump_target != NULL) {
+                            MR_atomic_dec_int(&MR_num_idle_engines);
+#ifdef MR_DEBUG_THREADS
+                            if (MR_debug_threads) {
+                                fprintf(stderr, "%ld Decrement MR_num_idle_engines %d\n",
+                                    MR_SELF_THREAD_ID, MR_num_idle_engines);
+                            }
+#endif
+                            MR_CPU_SFENCE;
+                            esync->d.es_state = ENGINE_STATE_WORKING;
+                        }
+                    }
+                    MR_UNLOCK(&(esync->d.es_wake_lock), "do_sleep, wake_lock");
                     if (jump_target != NULL) {
-                        advertise_engine_state_working();
                         MR_GOTO(jump_target);
                     }
-                    break;
+                    goto retry_sleep;
                 default:
                     perror("sem_timedwait");
                     abort();
             }
-        }
+        } /* if sem_wait raised an error */
+#else
+        MR_fatal_error(
+            "low-level parallel grades need gettimeofday() and "
+            "sem_timedwait()\n");
+#endif
+    } else {
+        /*
+        ** The compare and swap failed, retrieve the new state.
+        ** Wait until the engine state is no-longer busy, indicating that
+        ** the action information is available.
+        */
+        do {
+            state = esync->d.es_state;
+            MR_ATOMIC_PAUSE;
+        } while (state == ENGINE_STATE_BUSY);
+        /* read state above before reading action below */
+        MR_CPU_LFENCE;
     }
+    /*
+    ** Either we slept and were notified, or were notified before we slept.
+    ** Either way, check why we were notified.
+    */
+    MR_assert(state == ENGINE_STATE_NOTIFIED);
+
+    action = esync->d.es_action;
+    esync->d.es_action = MR_ENGINE_ACTION_NONE;
+
+    switch(action) {
+        case MR_ENGINE_ACTION_SHUTDOWN:
+            action_shutdown_engine();
+
+        case MR_ENGINE_ACTION_WORKSTEAL_ADVICE:
+            jump_target = action_worksteal(
+                esync->d.es_action_data.MR_ewa_worksteal_engine);
+            if (jump_target != NULL) {
+                MR_GOTO(jump_target);
+            } else {
+                MR_GOTO(MR_ENTRY(MR_do_idle));
+            }
+
+        case MR_ENGINE_ACTION_CONTEXT:
+            MR_GOTO(action_context(esync->d.es_action_data.MR_ewa_context));
+
+        case MR_ENGINE_ACTION_CONTEXT_ADVICE:
+            MR_GOTO(MR_ENTRY(MR_do_idle));
+
+        case MR_ENGINE_ACTION_NONE:
+        default:
+            fprintf(stderr,
+                "Mercury runtime: Engine woken with no action\n");
+            break;
+    } /* Switch on action */
+    /*
+    ** Each case ends with a GOTO, so execution cannot reach here
+    */
+    abort();
 }
 MR_END_MODULE
+
+static void
+action_shutdown_engine(void)
+{
+    MR_EngineId engine_id = MR_ENGINE(MR_eng_id);
+
+#ifdef MR_DEBUG_THREADS
+    if (MR_debug_threads) {
+        fprintf(stderr, "%ld Engine %d doing ACTION_SHUTDOWN\n",
+            MR_SELF_THREAD_ID, engine_id);
+    }
+#endif
+    /*
+    ** The primordial thread has the responsibility of cleaning
+    ** up the Mercury runtime. It cannot exit by this route.
+    */
+    MR_assert(engine_id != 0);
+    MR_destroy_thread(MR_cur_engine());
+    MR_SEM_POST(&shutdown_semaphore, "MR_do_sleep shutdown_sem");
+    pthread_exit(0);
+}
+
+static MR_Code*
+action_worksteal(MR_EngineId victim_engine_id)
+{
+    MR_SparkDeque *victim;
+    int steal_result;
+    MR_Spark spark;
+    engine_sleep_sync *esync =
+        &(engine_sleep_sync_data[MR_ENGINE(MR_eng_id)]);
+
+#ifdef MR_DEBUG_THREADS
+    if (MR_debug_threads) {
+        fprintf(stderr, "%ld Engine %d workstealing\n",
+            MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id));
+    }
+#endif
+
+    victim = MR_spark_deques[victim_engine_id];
+    MR_assert(victim != NULL);
+    steal_result = MR_wsdeque_steal_top(victim, &spark);
+    while (steal_result == -1) {
+        /*
+        ** Collision, relax the CPU and try again.
+        */
+        MR_ATOMIC_PAUSE;
+        steal_result = MR_wsdeque_steal_top(victim, &spark);
+    }
+
+    if (steal_result == 1) {
+        esync->d.es_state = ENGINE_STATE_WORKING;
+
+        /*
+        ** Steal from this engine next time, it may have more work.
+        */
+        MR_ENGINE(MR_eng_victim_counter) = victim_engine_id;
+#ifdef MR_THREADSCOPE
+        MR_threadscope_post_steal_spark(spark.MR_spark_id);
+#endif
+#ifdef MR_DEBUG_THREADS
+        if (MR_debug_threads) {
+            fprintf(stderr, "%ld Engine %d executing spark\n",
+                MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id));
+        }
+#endif
+        prepare_engine_for_spark(&spark);
+        return spark.MR_spark_resume;
+    } else {
+        /*
+        ** The deque is empty, next time try a different deque.
+        ** (+1 will do).
+        */
+        MR_ENGINE(MR_eng_victim_counter) = victim_engine_id + 1;
+        return NULL;
+    }
+}
+
+static MR_Code*
+action_context(MR_Context *context)
+{
+    MR_Code *resume_point;
+    engine_sleep_sync *esync =
+        &(engine_sleep_sync_data[MR_ENGINE(MR_eng_id)]);
+
+    esync->d.es_state = ENGINE_STATE_WORKING;
+    prepare_engine_for_context(context);
+
+#ifdef MR_DEBUG_STACK_SEGMENTS
+    MR_debug_log_message("resuming old context: %p",
+        context);
+#endif
+#ifdef MR_DEBUG_THREADS
+    if (MR_debug_threads) {
+        fprintf(stderr,
+            "%ld Engine %d running context %p from action\n",
+            MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id), context);
+    }
+#endif
+
+    resume_point = (MR_Code*)(context->MR_ctxt_resume);
+    context->MR_ctxt_resume = NULL;
+
+    return resume_point;
+}
 #endif
 
 #ifdef MR_THREAD_SAFE
@@ -2056,6 +2478,12 @@ do_get_context(void)
     MR_threadscope_post_looking_for_global_context();
     #endif
 
+    /*
+    ** We can only read the runqueue head after the store to engine's state
+    ** has finished.
+    */
+    MR_CPU_MFENCE;
+
     if (MR_runqueue_head != NULL) {
         MR_LOCK(&MR_runqueue_lock, "do_get_context (i)");
         ready_context = MR_find_ready_context();
@@ -2067,6 +2495,12 @@ do_get_context(void)
             #ifdef MR_DEBUG_STACK_SEGMENTS
             MR_debug_log_message("resuming old context: %p", ready_context);
             #endif
+            #ifdef MR_DEBUG_THREADS
+            if (MR_debug_threads) {
+                fprintf(stderr, "%ld Resuming context %p\n",
+                    MR_SELF_THREAD_ID, ready_context);
+            }
+            #endif
 
             resume_point = (MR_Code*)(ready_context->MR_ctxt_resume);
             ready_context->MR_ctxt_resume = NULL;
@@ -2222,6 +2656,13 @@ do_work_steal(void)
 #ifdef MR_THREADSCOPE
             MR_threadscope_post_steal_spark(spark.MR_spark_id);
 #endif
+#ifdef MR_DEBUG_THREADS
+            if (MR_debug_threads) {
+                fprintf(stderr, "%ld Engine %d executing spark\n",
+                    MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id));
+            }
+#endif
+
             prepare_engine_for_spark(&spark);
             return spark.MR_spark_resume;
         }
@@ -2247,24 +2688,6 @@ save_dirty_context(MR_Code *join_label) {
     this_context->MR_ctxt_resume = join_label;
     MR_ENGINE(MR_eng_this_context) = NULL;
 }
-
-static void
-advertise_engine_state_idle(void)
-{
-    engine_sleep_sync_data[MR_ENGINE(MR_eng_id)].d.es_state =
-        ENGINE_STATE_IDLE;
-    MR_CPU_SFENCE;
-    MR_atomic_inc_int(&MR_num_idle_engines);
-}
-
-static void
-advertise_engine_state_working(void)
-{
-    MR_atomic_dec_int(&MR_num_idle_engines);
-    MR_CPU_SFENCE;
-    engine_sleep_sync_data[MR_ENGINE(MR_eng_id)].d.es_state =
-        ENGINE_STATE_WORKING;
-}
 #endif /* MR_THREAD_SAFE */
 
 #endif /* !MR_HIGHLEVEL_CODE */
@@ -2275,6 +2698,7 @@ MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label)
 {
     MR_bool     jnc_last;
     MR_Context  *this_context = MR_ENGINE(MR_eng_this_context);
+    MR_Code     *jump_target;
 
   #ifdef MR_THREADSCOPE
     MR_threadscope_post_end_par_conjunct((MR_Word*)jnc_st);
@@ -2316,6 +2740,11 @@ MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label)
                 /* XXX: Need to configure using sched_yeild or spin waiting */
                 MR_ATOMIC_PAUSE;
             }
+            /*
+            ** We must read the resume label before we read the context as
+            ** the context is written first.
+            */
+            MR_CPU_LFENCE;
 #ifdef MR_THREADSCOPE
             MR_threadscope_post_context_runnable(jnc_st->MR_st_orig_context);
 #endif
@@ -2331,14 +2760,55 @@ MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label)
         */
         return join_label;
     } else {
-        if (this_context == jnc_st->MR_st_orig_context) {
-            MR_r1 = (MR_Word)join_label;
-            return MR_ENTRY(MR_do_idle_dirty_context);
+        volatile MR_Spark *spark;
+
+#ifdef MR_THREADSCOPE
+        MR_threadscope_post_looking_for_local_spark();
+#endif
+        spark = MR_wsdeque_pop_bottom(&MR_ENGINE(MR_eng_spark_deque));
+        if (spark != NULL) {
+            if ((this_context == jnc_st->MR_st_orig_context) &&
+                    (spark->MR_spark_sync_term != jnc_st)) {
+                /*
+                ** This spark is not compatible with the context.
+                **
+                ** Change the context.
+                */
+#ifdef MR_THREADSCOPE
+                MR_threadscope_post_stop_context(MR_TS_STOP_REASON_BLOCKED);
+#endif
+                save_dirty_context(join_label);
+                if (MR_runqueue_head != NULL) {
+                    /*
+                    ** There might be a suspended context. We should try
+                    ** to execute that.
+                    */
+                    MR_wsdeque_putback_bottom(&MR_ENGINE(MR_eng_spark_deque),
+                        (MR_Spark*) spark);
+                    return MR_ENTRY(MR_do_idle);
+                }
+            }
+            prepare_engine_for_spark(spark);
+
+            return spark->MR_spark_resume;
         } else {
-            /*
-            ** This engine and context should look for other work.
-            */
-            return MR_ENTRY(MR_do_idle_clean_context);
+            if (this_context == jnc_st->MR_st_orig_context) {
+                /*
+                ** Save our context and then look for work as per normal.
+                */
+#ifdef MR_THREADSCOPE
+                MR_threadscope_post_stop_context(MR_TS_STOP_REASON_BLOCKED);
+#endif
+                save_dirty_context(join_label);
+            } else {
+                /*
+                ** This engine and context should look for other work.
+                */
+#ifdef MR_THREADSCOPE
+                MR_threadscope_post_stop_context(MR_TS_STOP_REASON_FINISHED);
+#endif
+            }
+            return MR_ENTRY(MR_do_idle);
         }
     }
 }
@@ -2412,8 +2882,7 @@ void mercury_sys_init_scheduler_wrapper_init(void)
 #ifndef MR_HIGHLEVEL_CODE
     scheduler_module_idle();
 #ifdef MR_THREAD_SAFE
-    scheduler_module_idle_clean_context();
-    scheduler_module_idle_dirty_context();
+    scheduler_module_idle_worksteal();
     scheduler_module_idle_sleep();
 #endif
 #endif
diff --git a/runtime/mercury_context.h b/runtime/mercury_context.h
index 339a754..d184143 100644
--- a/runtime/mercury_context.h
+++ b/runtime/mercury_context.h
@@ -258,9 +258,20 @@ struct MR_Spark_Struct {
 #endif
 };
 
+#define CACHE_LINE_SIZE 64
+#define PAD_CACHE_LINE(s) \
+    ((CACHE_LINE_SIZE) > (s) ? (CACHE_LINE_SIZE) - (s) : 0)
+
 struct MR_SparkDeque_Struct {
-    volatile MR_Integer     MR_sd_bottom;
+    /*
+    ** The top index is modified by theifs, the other fields are modified by
+    ** the owner.  Therefore we pad out the structure to reduce false
+    ** sharing.
+    */
     volatile MR_Integer     MR_sd_top;
+    char padding[PAD_CACHE_LINE(sizeof(MR_Integer))];
+
+    volatile MR_Integer     MR_sd_bottom;
     volatile MR_SparkArray  *MR_sd_active_array;
 };
 #endif  /* !MR_LL_PARALLEL_CONJ */
@@ -528,20 +539,6 @@ extern  void        MR_schedule_context(MR_Context *ctxt);
     do {                                                \
         MR_GOTO(MR_ENTRY(MR_do_idle));                  \
     } while (0)
-/*
-** MR_do_idle_clean_context should be used by an engine that is becoming idle
-** with a context that may be re-used.
-*/
-  MR_declare_entry(MR_do_idle_clean_context);
-/*
-** The same is true for MR_do_idle_dirty_context except that the context is
-** blocked on the end of a parallel conjunction, It may nnly be used to execute
-** sparks that contribute to that parallel conjunction.
-**
-** This takes one argument in MR_r1, the join label at the end of the parallel
-** conjunction.
-*/
-  MR_declare_entry(MR_do_idle_dirty_context);
 #endif
 
 #ifndef MR_CONSERVATIVE_GC
@@ -628,6 +625,12 @@ extern  void        MR_schedule_context(MR_Context *ctxt);
   #define MR_IF_THREADSCOPE(x)
 #endif
 
+#ifdef MR_WORKSTEAL_POLLING
+  #define MR_IF_NOT_WORKSTEAL_POLLING(x)
+#else
+  #define MR_IF_NOT_WORKSTEAL_POLLING(x) x
+#endif
+
 #define MR_load_context(cptr)                                                 \
     do {                                                                      \
         MR_Context  *load_context_c;                                          \
@@ -823,7 +826,9 @@ do {                                                                         \
     MR_IF_THREADSCOPE(                                                       \
         MR_uint_least32_t   id;                                              \
     )                                                                        \
-    union MR_engine_wake_action_data action_data;                            \
+    MR_IF_NOT_WORKSTEAL_POLLING(                                             \
+        union MR_engine_wake_action_data action_data;                        \
+    )                                                                        \
                                                                              \
     fnc_spark.MR_spark_sync_term = (MR_SyncTerm*) &(sync_term);              \
     fnc_spark.MR_spark_resume = (child);                                     \
@@ -837,12 +842,14 @@ do {                                                                         \
     MR_IF_THREADSCOPE(                                                       \
         MR_threadscope_post_sparking(&(sync_term), fnc_spark.MR_spark_id);   \
     )                                                                        \
-    action_data.MR_ewa_worksteal_engine = MR_ENGINE(MR_eng_id);              \
-    if (MR_num_idle_engines > 0) {                                           \
-        MR_try_wake_an_engine(MR_ENGINE(MR_eng_id),                          \
-            MR_ENGINE_ACTION_WORKSTEAL,                                      \
-            &action_data, NULL);                                             \
-    }                                                                        \
+    MR_IF_NOT_WORKSTEAL_POLLING(                                             \
+        action_data.MR_ewa_worksteal_engine = MR_ENGINE(MR_eng_id);          \
+        if (MR_num_idle_engines > 0) {                                       \
+            MR_try_wake_an_engine(MR_ENGINE(MR_eng_id),                      \
+                MR_ENGINE_ACTION_WORKSTEAL_ADVICE,                           \
+                &action_data, NULL);                                         \
+        }                                                                    \
+    )                                                                        \
 } while (0)
 
   /*
@@ -885,10 +892,18 @@ MR_do_join_and_continue(MR_SyncTerm *sync_term, MR_Code *join_label);
 ** exported here for use by the MR_fork_new_child macro above.
 */
 
-#define MR_ENGINE_ACTION_NONE 0
-#define MR_ENGINE_ACTION_CONTEXT 1
-#define MR_ENGINE_ACTION_WORKSTEAL 2
-#define MR_ENGINE_ACTION_SHUTDOWN 3
+#define MR_ENGINE_ACTION_NONE               0x0000
+/*
+** ACTION_CONTEXT applies when an engine is being given a context directly
+*/
+#define MR_ENGINE_ACTION_CONTEXT            0x0001
+#define MR_ENGINE_ACTION_SHUTDOWN           0x0002
+#define MR_ENGINE_ACTION_WORKSTEAL_ADVICE   0x0004
+/*
+** ACTION_CONTEXT_ADVICE applies when a context is on the run queue that
+** this engine should check.
+*/
+#define MR_ENGINE_ACTION_CONTEXT_ADVICE     0x0008
 
 union MR_engine_wake_action_data {
     /*
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 490 bytes
Desc: Digital signature
URL: <http://lists.mercurylang.org/archives/reviews/attachments/20121018/6b4cff25/attachment.sig>


More information about the reviews mailing list