[m-rev.] for review: work stealing for parallel conjunctions
Julien Fischer <juliensf at csse.unimelb.edu.au>
Tue Nov 25 23:28:28 AEDT 2008

On Fri, 7 Nov 2008, Peter Wang wrote:
> Benchmark results at a later date.
>
> Branches: main
>
> Implement work stealing for parallel conjunctions.  This builds on an older
> patch which introduced work-stealing deques to the runtime but didn't
> perform work stealing.
>
> Previously when we came across a parallel conjunct, we would place a spark
> into either the _global spark queue_ or the _local spark stack_ of the
> Mercury context.  A spark on the global spark queue may be picked up for
> parallel execution by an idle Mercury engine, whereas a spark on a local
> spark stack is confined to execution in the context that originated it.
>
> The problem is that we have to decide, ahead of time, where to put a spark.
> Ideally, we should have just enough sparks in the global queue to keep the
> available Mercury engines busy, and leave the rest of the sparks to execute
> in their original contexts since that is more efficient.  But we can't
> predict the future so have to make do with guesses using simple heuristics.
> A bad decision, once made, cannot be reversed.  An engine may sit idle due
> to an empty global spark queue, even while there are sparks available in
> some local spark stacks.
>
> In the work stealing scheme, sparks are always placed into each context's
> _local spark deque_.  Idle engines actively try to steal sparks from random
> spark deques.  We don't need to make irreversible and potentially suboptimal
> decisions about where to put sparks.  Making a spark available for parallel
> execution is cheap and happens by default because of the work-stealing
> deques; putting a spark on a global queue implies synchronisation with other
> threads.  The downside is that idle engines need to expend more time and
> effort to find the work from multiple places instead of just one place.
>
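For readers unfamiliar with the data structure, the owner/thief discipline described above can be sketched as follows. This is only an illustration under stated assumptions: it guards the deque with a simple spinlock, whereas the deque in runtime/mercury_wsdeque.h is designed to avoid that; the payload is a plain int standing in for a spark; and every name here (ws_deque, ws_push_bottom, ws_pop_bottom, ws_steal_top) is hypothetical rather than taken from the patch. As in the usual work-stealing design, the owner works at the bottom and thieves take from the top.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define WS_CAPACITY 64              /* fixed size keeps the sketch short */

typedef struct {
    atomic_flag lock;               /* spinlock; NOT how the real deque works */
    size_t      top;                /* index of the oldest item (thieves) */
    size_t      bottom;             /* one past the newest item (owner) */
    int         items[WS_CAPACITY]; /* an int stands in for a spark */
} ws_deque;

static void ws_lock(ws_deque *dq)
{
    while (atomic_flag_test_and_set(&dq->lock)) {
        /* spin until the flag was previously clear */
    }
}

static void ws_unlock(ws_deque *dq)
{
    atomic_flag_clear(&dq->lock);
}

static void ws_init(ws_deque *dq)
{
    atomic_flag_clear(&dq->lock);
    dq->top = 0;
    dq->bottom = 0;
}

/* Owner: make a new item available.  Fails if the deque is full. */
static bool ws_push_bottom(ws_deque *dq, int item)
{
    bool ok = false;
    ws_lock(dq);
    if (dq->bottom - dq->top < WS_CAPACITY) {
        dq->items[dq->bottom % WS_CAPACITY] = item;
        dq->bottom++;
        ok = true;
    }
    ws_unlock(dq);
    return ok;
}

/* Owner: take back the most recently pushed item (LIFO). */
static bool ws_pop_bottom(ws_deque *dq, int *out)
{
    bool ok = false;
    ws_lock(dq);
    if (dq->bottom > dq->top) {
        dq->bottom--;
        *out = dq->items[dq->bottom % WS_CAPACITY];
        ok = true;
    }
    ws_unlock(dq);
    return ok;
}

/* Thief (an idle engine): take the oldest item (FIFO). */
static bool ws_steal_top(ws_deque *dq, int *out)
{
    bool ok = false;
    ws_lock(dq);
    if (dq->bottom > dq->top) {
        *out = dq->items[dq->top % WS_CAPACITY];
        dq->top++;
        ok = true;
    }
    ws_unlock(dq);
    return ok;
}
```

After pushing 1, 2, 3, a thief calling ws_steal_top() gets 1, while the owner calling ws_pop_bottom() gets 3 and then 2.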
> Practically, the new scheme seems to work as well as the old scheme and vice
> versa, except that the old scheme often required `--max-contexts-per-thread'
> to be set "correctly" to get good results.
>
> Only tested on x86-64, which has a relatively constrained memory model.

You should check that it works on plain old x86 as well before
committing.

> configure.in:
> runtime/mercury_conf.h.in:
>      Check if sched_yield() is available.
>
>      Account for changed size of sync terms.
>
> runtime/mercury_atomic_ops.c:
> runtime/mercury_atomic_ops.h:
>      Add an atomic decrement operation.
>
> runtime/mercury_context.c:
>      Keep pointers to all spark deques in a flat array, so we have access
>      to them for stealing.
>
>      Add code to steal sparks in MR_do_runnext.  Clean up MR_do_runnext.
>
>      Add MR_sched_yield().
>
>      Delete references to the global spark queue, MR_num_idle_engines,
>      MR_num_outstanding_contexts_and_sparks.
>
> runtime/mercury_context.h:
>      Make sparks point to their parent sync terms so the MR_st_is_shared
>      field can be modified when a spark is stolen.
>
>      Move the parent_sp field from sparks to sync terms as it is the same
>      for all sparks from the same parallel conjunction.
>
>      Change MR_SyncTerm, MR_fork_new_child(), MR_join_and_continue() for
>      work-stealing.  Under the old scheme, we could rely on the fact that a
>      sync term which was "unshared" (placed on a local spark stack) would
>      remain unshared, and so we could avoid costly locks or atomic
>      instructions when decrementing the "count" field of a sync term.  This
>      no longer holds, as sync terms may become shared at any time by having
>      one of its sparks stolen.  Fortunately, with a little extra work, we
>      can still avoid costly operations in the common case.
>
>      Replace MR_choose_parallel_over_sequential_cond() by a dummy
>      definition, as the choice no longer exists.
>
> runtime/mercury_thread.c:
>      Use atomic decrement to count the number of engines we are waiting for
>      at start up, since we have atomic decrement now.
>
> runtime/mercury_thread.h:
>      Add MR_TIMED_WAIT wrapper for pthread_cond_wait().
>
> runtime/mercury_wrapper.c:
> runtime/mercury_wrapper.h:
>      Add MERCURY_OPTIONS `--worksteal-max-attempts' and
>      `--worksteal-sleep-msecs' and associated globals.
>
>      Delete `--max-contexts-per-thread' option.
>
>      Count the number of engines we are waiting for at startup.
>
> runtime/mercury_wsdeque.h:
>      Make MR_wsdeque_pop_bottom() return only the resume code address after
>      successfully popping a spark, as that is the only part of the spark
>      needed now.
>
> doc/user_guide.texi:
>      Add commented out documentation for two new tunable parameters,
>      `--worksteal-max-attempts' and `--worksteal-sleep-msecs'.
>      Implementors may want to experiment with different values but end
>      users shouldn't need to know about them.
>
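The "count" field mentioned in the mercury_context.h entry above is a join counter: each conjunct decrements it on completion and the conjunct that brings it to zero carries on past the conjunction. A minimal sketch of that idea using C11 atomics is below. It is an assumption-laden illustration: it always uses an atomic decrement, so it does not show the patch's optimisation of avoiding atomic instructions while the sync term is unshared, and the names (sync_term, sync_term_init, sync_term_finish_one) are hypothetical, not those in runtime/mercury_atomic_ops.h.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    atomic_int count;   /* conjuncts that have not finished yet */
} sync_term;

static void sync_term_init(sync_term *st, int num_conjuncts)
{
    atomic_init(&st->count, num_conjuncts);
}

/*
** Called once by each conjunct when it finishes.  Returns true for exactly
** one caller: the one whose decrement takes the count to zero.  That caller
** is responsible for continuing after the parallel conjunction.
*/
static bool sync_term_finish_one(sync_term *st)
{
    /* atomic_fetch_sub returns the value held BEFORE the subtraction. */
    int before = atomic_fetch_sub(&st->count, 1);

    assert(before > 0);
    return before == 1;
}
```

With three conjuncts, the first two calls to sync_term_finish_one() return false and the third returns true, whichever engines happen to make them.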
...
> diff --git a/runtime/mercury_context.c b/runtime/mercury_context.c
> index d329c50..44c3ea1 100644
> --- a/runtime/mercury_context.c
> +++ b/runtime/mercury_context.c
> @@ -27,28 +27,31 @@ ENDINIT
> 	#include <unistd.h>	/* for select() on OS X */
>   #endif
> #endif
> +#ifdef MR_WIN32
> +  #include <sys/timeb.h>    /* for _ftime() */
> +#endif
>
> #include "mercury_memory_handlers.h"
> #include "mercury_context.h"
> #include "mercury_engine.h"             /* for `MR_memdebug' */
> #include "mercury_reg_workarounds.h"    /* for `MR_fd*' stuff */
>
> -static void
> -MR_init_context_maybe_generator(MR_Context *c, const char *id,
> -    MR_GeneratorPtr gen);
> +static  void    MR_init_context_maybe_generator(MR_Context *c, const char *id,
> +                    MR_GeneratorPtr gen);
> +
> +#ifdef  MR_LL_PARALLEL_CONJ
> +static  void    MR_add_spark_deque(MR_SparkDeque *sd);
> +static  void    MR_delete_spark_deque(const MR_SparkDeque *sd);
> +static  void    MR_sleep_runqueue(unsigned int msecs);
> +static  void    MR_milliseconds_from_now(struct timespec *timeout,
> +                    unsigned int msecs);
> +#endif
>
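The body of MR_milliseconds_from_now() is not shown in this part of the diff. A timed wait on a condition variable needs an absolute deadline, so a helper of this kind presumably adds a millisecond offset to the current time. The sketch below shows only that arithmetic, as a pure function so it can be checked in isolation; the name timespec_add_msecs is hypothetical, and the Win32 path using _ftime() is not covered.

```c
#include <assert.h>
#include <time.h>

#define NSEC_PER_SEC  1000000000L
#define NSEC_PER_MSEC 1000000L

/*
** Return base + msecs, keeping tv_nsec in the range [0, NSEC_PER_SEC).
** base.tv_nsec is assumed to be in that range already.
*/
static struct timespec
timespec_add_msecs(struct timespec base, unsigned int msecs)
{
    struct timespec result;
    long            nsec;

    /* Whole seconds go to tv_sec; the remainder is under one second. */
    result.tv_sec = base.tv_sec + (time_t) (msecs / 1000);
    nsec = base.tv_nsec + (long) (msecs % 1000) * NSEC_PER_MSEC;

    /* Two values below one second sum to below two: one carry suffices. */
    if (nsec >= NSEC_PER_SEC) {
        nsec -= NSEC_PER_SEC;
        result.tv_sec += 1;
    }
    result.tv_nsec = nsec;
    return result;
}
```

In use, base would typically be the current time as reported by clock_gettime(CLOCK_REALTIME, ...), and the result would be passed as the deadline to pthread_cond_timedwait().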
> /*---------------------------------------------------------------------------*/
>
> /*
> -** The run queue and spark queue are protected and signalled with the
> -** same lock and condition variable.
> -**
> -** The single sync term lock is used to prevent races in MR_join_and_continue.
> -** The holder of the sync term lock may acquire the runqueue lock but not vice
> -** versa.  (We could also have one sync term lock per context, and make
> -** MR_join_and_continue acquire the sync term lock of the context that
> -** originated the parallel conjunction, but contention for the single lock
> -** doesn't seem to be an issue.)
> +** The run queue is protected with MR_runqueue_lock and signalled
> +** MR_runqueue_cond.
Should that be "signalled by MR_runqueue_cond"?
> */
> MR_Context              *MR_runqueue_head;
> MR_Context              *MR_runqueue_tail;
> @@ -56,10 +59,6 @@ MR_Context              *MR_runqueue_tail;
>   MercuryLock           MR_runqueue_lock;
>   MercuryCond           MR_runqueue_cond;
> #endif
> -#ifdef  MR_LL_PARALLEL_CONJ
> -  MR_SparkDeque         MR_spark_queue;
> -  MercuryLock           MR_sync_term_lock;
> -#endif
>
> MR_PendingContext       *MR_pending_contexts;
> #ifdef  MR_THREAD_SAFE
> @@ -79,8 +78,10 @@ static MR_Context       *free_small_context_list = NULL;
> #endif
>
> #ifdef  MR_LL_PARALLEL_CONJ
> -int MR_num_idle_engines = 0;
> -int MR_num_outstanding_contexts_and_sparks = 0;
> +static MercuryLock      spark_deques_lock;
> +static MR_SparkDeque    **MR_spark_deques = NULL;
> +static int              MR_max_spark_deques = 0;
> +static int              MR_victim_counter = 0;
> #endif
>
> /*---------------------------------------------------------------------------*/
> @@ -96,8 +97,7 @@ MR_init_thread_stuff(void)
>     pthread_mutex_init(&MR_global_lock, MR_MUTEX_ATTR);
>     pthread_mutex_init(&MR_pending_contexts_lock, MR_MUTEX_ATTR);
>   #ifdef MR_LL_PARALLEL_CONJ
> -    MR_init_wsdeque(&MR_spark_queue, MR_INITIAL_GLOBAL_SPARK_QUEUE_SIZE);
> -    pthread_mutex_init(&MR_sync_term_lock, MR_MUTEX_ATTR);
> +    pthread_mutex_init(&spark_deques_lock, MR_MUTEX_ATTR);
>   #endif
>     pthread_mutex_init(&MR_STM_lock, MR_MUTEX_ATTR);
>   #ifndef MR_THREAD_LOCAL_STORAGE
> @@ -128,7 +128,7 @@ MR_finalize_runqueue(void)
>     pthread_mutex_destroy(&free_context_list_lock);
> #endif
> #ifdef  MR_LL_PARALLEL_CONJ
> -    pthread_mutex_destroy(&MR_sync_term_lock);
> +    pthread_mutex_destroy(&spark_deques_lock);
> #endif
> }
>
> @@ -264,6 +264,7 @@ MR_init_context_maybe_generator(MR_Context *c, const char *id,
>     c->MR_ctxt_parent_sp = NULL;
>     MR_init_wsdeque(&c->MR_ctxt_spark_deque,
>         MR_INITIAL_LOCAL_SPARK_DEQUE_SIZE);
> +    MR_add_spark_deque(&c->MR_ctxt_spark_deque);
>   #endif /* MR_LL_PARALLEL_CONJ */
>
> #endif /* !MR_HIGHLEVEL_CODE */
> @@ -315,10 +316,6 @@ MR_create_context(const char *id, MR_ContextSize ctxt_size, MR_Generator *gen)
>
>     MR_LOCK(&free_context_list_lock, "create_context");
>
> -#ifdef MR_LL_PARALLEL_CONJ
> -    MR_num_outstanding_contexts_and_sparks++;
> -#endif
> -
>     /*
>     ** Regular contexts have stacks at least as big as small contexts,
>     ** so we can return a regular context in place of a small context
> @@ -373,11 +370,11 @@ MR_destroy_context(MR_Context *c)
>         c->MR_ctxt_nondetstack_zone->MR_zone_min);
> #endif /* defined(MR_CONSERVATIVE_GC) && !defined(MR_HIGHLEVEL_CODE) */
>
> -    MR_LOCK(&free_context_list_lock, "destroy_context");
> -#ifdef MR_LL_PARALLEL_CONJ
> -    MR_num_outstanding_contexts_and_sparks--;
> +#ifdef  MR_LL_PARALLEL_CONJ
> +    MR_delete_spark_deque(&c->MR_ctxt_spark_deque);
> #endif
>
> +    MR_LOCK(&free_context_list_lock, "destroy_context");
>     switch (c->MR_ctxt_size) {
>         case MR_CONTEXT_SIZE_REGULAR:
>             c->MR_ctxt_next = free_context_list;
> @@ -391,12 +388,216 @@ MR_destroy_context(MR_Context *c)
>     MR_UNLOCK(&free_context_list_lock, "destroy_context");
> }
>
> +#ifdef MR_LL_PARALLEL_CONJ
> +
> +static void
> +MR_add_spark_deque(MR_SparkDeque *sd)
> +{
> +    int             slot;
> +
> +    MR_LOCK(&spark_deques_lock, "create_spark_deque");
> +
> +    /* Search for a free spark deque slot. */
> +    for (slot = 0; slot < MR_max_spark_deques; slot++) {
> +        if (MR_spark_deques[slot] == NULL) {
> +            break;
> +        }
> +    }
> +
> +    if (slot == MR_max_spark_deques) {
> +        if (MR_max_spark_deques == 0) {
> +            MR_max_spark_deques = 1;
> +        } else if (MR_max_spark_deques < 32) {
> +            MR_max_spark_deques *= 2;
> +        } else {
> +            MR_max_spark_deques += 16;
> +        }
> +        MR_spark_deques = MR_GC_RESIZE_ARRAY(MR_spark_deques,
> +            MR_SparkDeque *, MR_max_spark_deques);
> +    }
> +
> +    MR_spark_deques[slot] = sd;
> +
> +    MR_UNLOCK(&spark_deques_lock, "create_spark_deque");
> +}
> +
> +static void
> +MR_delete_spark_deque(const MR_SparkDeque *sd)
> +{
> +    int i;
> +
> +    MR_LOCK(&spark_deques_lock, "delete_spark_deque");
> +
> +    for (i = 0; i < MR_max_spark_deques; i++) {
> +        if (MR_spark_deques[i] == sd) {
> +            MR_spark_deques[i] = NULL;
> +            break;
> +        }
> +    }
> +
> +    MR_UNLOCK(&spark_deques_lock, "delete_spark_deque");
> +}
> +
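The slot reuse and growth policy of the two functions above (start at 1, double up to 32, then grow by 16) can be tried out in isolation with the sketch below. It makes several assumptions: it uses plain realloc() rather than MR_GC_RESIZE_ARRAY, it omits the locking and so expects the caller to serialise access, and the names (deque, registry_add, registry_remove, next_capacity) are hypothetical. It also explicitly clears the newly added slots, since realloc() does not; whether MR_GC_RESIZE_ARRAY guarantees that is not stated in the patch and may be worth checking.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct { int dummy; } deque;    /* stand-in for MR_SparkDeque */

static deque    **slots = NULL;
static int      max_slots = 0;

/* Growth policy: 0 -> 1, doubling while below 32, then steps of 16. */
static int next_capacity(int cur)
{
    if (cur == 0) {
        return 1;
    } else if (cur < 32) {
        return cur * 2;
    } else {
        return cur + 16;
    }
}

/* Register d; returns the slot used, or -1 if memory ran out. */
static int registry_add(deque *d)
{
    int slot;
    int i;

    /* Reuse the first free slot, if there is one. */
    for (slot = 0; slot < max_slots; slot++) {
        if (slots[slot] == NULL) {
            break;
        }
    }

    if (slot == max_slots) {
        int     new_max = next_capacity(max_slots);
        deque   **grown = realloc(slots, (size_t) new_max * sizeof *grown);

        if (grown == NULL) {
            return -1;
        }
        /* realloc leaves the new tail uninitialised, so clear it. */
        for (i = max_slots; i < new_max; i++) {
            grown[i] = NULL;
        }
        slots = grown;
        max_slots = new_max;
    }

    slots[slot] = d;
    return slot;
}

/* Unregister d, leaving its slot free for reuse. */
static void registry_remove(const deque *d)
{
    int i;

    for (i = 0; i < max_slots; i++) {
        if (slots[i] == d) {
            slots[i] = NULL;
            break;
        }
    }
}
```

Adding three deques grows the array to 1, 2 and then 4 slots; removing the second and adding a fourth reuses slot 1 without growing.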
> +/* Search for a ready context which we can handle. */
> +static MR_Context *
> +MR_find_ready_context(MercuryThread thd, MR_Unsigned depth)
> +{
> +    MR_Context  *cur;
> +    MR_Context  *prev;
> +
> +    cur = MR_runqueue_head;
> +    /* XXX check pending io */
Meaning what?  The XXX comment should say what needs to be checked.
> +    prev = NULL;
> +    while (cur != NULL) {
> +        if (cur->MR_ctxt_resume_owner_thread == thd &&
> +            cur->MR_ctxt_resume_c_depth == depth)
> +        {
> +            cur->MR_ctxt_resume_owner_thread = (MercuryThread) NULL;
> +            cur->MR_ctxt_resume_c_depth = 0;
> +            break;
> +        }
> +
> +        if (cur->MR_ctxt_resume_owner_thread == (MercuryThread) NULL) {
> +            break;
> +        }
> +
> +        prev = cur;
> +        cur = cur->MR_ctxt_next;
> +    }
> +
> +    if (cur != NULL) {
> +        if (prev != NULL) {
> +            prev->MR_ctxt_next = cur->MR_ctxt_next;
> +        } else {
> +            MR_runqueue_head = cur->MR_ctxt_next;
> +        }
> +        if (MR_runqueue_tail == cur) {
> +            MR_runqueue_tail = prev;
> +        }
> +    }
> +
> +    return cur;
> +}
...
> @@ -651,9 +833,13 @@ MR_define_entry(MR_do_runnext);
>             MR_CONTEXT_SIZE_SMALL, NULL);
>         MR_load_context(MR_ENGINE(MR_eng_this_context));
>     }
> -    MR_parent_sp = spark.MR_spark_parent_sp;
> -    MR_assert(MR_parent_sp != MR_sp);
> +    MR_parent_sp = spark.MR_spark_sync_term->MR_st_parent_sp;
>     MR_SET_THREAD_LOCAL_MUTABLES(spark.MR_spark_thread_local_mutables);
> +
> +    assert(MR_parent_sp);
> +    assert(MR_parent_sp != MR_sp);
> +    assert(spark.MR_spark_sync_term->MR_st_count > 0);
s/assert/MR_assert/
> +
>     MR_GOTO(spark.MR_spark_resume);
> }
> #else /* !MR_THREAD_SAFE */
> @@ -675,7 +861,7 @@ MR_define_entry(MR_do_runnext);
>     MR_load_context(MR_ENGINE(MR_eng_this_context));
>     MR_GOTO(MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume);
> }
> -#endif
> +#endif /* !MR_THREAD_SAFE */
>
> MR_END_MODULE
>
> diff --git a/runtime/mercury_context.h b/runtime/mercury_context.h
> index e12c48f..3f98c13 100644
> --- a/runtime/mercury_context.h
> +++ b/runtime/mercury_context.h
...
> -  #define MR_join_and_continue_2()                                            \
> +  #define MR_jnc_nonlast_conjunct(jnc_st, join_label)                         \
>     do {                                                                      \
> -        MR_Context  *jnc_ctxt;                                                \
> +        MR_Context  *jnc_this_ctxt;                                           \
>         MR_bool     jnc_popped;                                               \
> -        MR_Spark    jnc_spark;                                                \
> +        MR_Code     *jnc_spark_resume;                                        \
>                                                                               \
> -        jnc_ctxt = MR_ENGINE(MR_eng_this_context);                            \
> -        jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,    \
> -            &jnc_spark);                                                      \
> -        if (jnc_popped && (jnc_spark.MR_spark_parent_sp == MR_parent_sp)) {   \
> -            /*                                                                \
> -            ** The spark at the top of the stack is due to the same parallel  \
> -            ** conjunction that we've just been executing. We can immediately \
> -            ** execute the next branch of the same parallel conjunction in    \
> -            ** the current context.                                           \
> -            */                                                                \
> -            MR_UNLOCK(&MR_sync_term_lock, "continue_2 i");                    \
> -            MR_GOTO(jnc_spark.MR_spark_resume);                               \
> +        jnc_this_ctxt = MR_ENGINE(MR_eng_this_context);                       \
> +        jnc_popped = MR_wsdeque_pop_bottom(                                   \
> +            &jnc_this_ctxt->MR_ctxt_spark_deque, &jnc_spark_resume);          \
> +        if (jnc_popped) {                                                     \
> +            MR_GOTO(jnc_spark_resume);                                        \
>         } else {                                                              \
>             /*                                                                \
> -            ** The spark stack is empty or the next spark is from a different \
> -            ** parallel conjunction to the one we've been executing.  Either  \
> -            ** way, there's nothing more we can do with this context right    \
> -            ** now.  Put back the spark we won't be using.                    \
> -            */                                                                \
> -            if (jnc_popped) {                                                 \
> -                MR_wsdeque_putback_bottom(&jnc_ctxt->MR_ctxt_spark_deque,     \
> -                    &jnc_spark);                                              \
> -            }                                                                 \
> -            /*                                                                \
>             ** If this context originated the parallel conjunction we've been \
> -            ** executing, the rest of the parallel conjunction must have been \
> -            ** put on the global spark queue to be executed in other          \
> -            ** contexts.  This context will need to be resumed once the       \
> -            ** parallel conjunction is completed, so suspend the context.     \
> +            ** executing, suspend this context such that it will be resumed   \
> +            ** at the join label once the parallel conjunction is completed.  \
> +            **                                                                \
> +            ** Otherwise we can reuse this context for the next piece of work.\
>             */                                                                \
> -            if (jnc_ctxt == jnc_st->MR_st_orig_context) {                     \
> -                MR_save_context(jnc_ctxt);                                    \
> +            if (jnc_this_ctxt == jnc_st->MR_st_orig_context) {                \
> +                MR_save_context(jnc_this_ctxt);                               \
> +                jnc_this_ctxt->MR_ctxt_resume = (join_label);                 \
>                 MR_ENGINE(MR_eng_this_context) = NULL;                        \
>             }                                                                 \
> -            /* Finally look for other work. */                                \
> -            MR_UNLOCK(&MR_sync_term_lock, "continue_2 ii");                   \
>             MR_runnext();                                                     \
>         }                                                                     \
>     } while (0)
>
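As a reading aid, the control flow of the new MR_jnc_nonlast_conjunct macro reduces to a three-way decision. The sketch below restates it as a pure function; the enum and function names are hypothetical, and the real macro performs the actions (MR_GOTO, MR_save_context, MR_runnext) rather than returning a value.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum {
    JNC_RUN_LOCAL_SPARK,        /* jump to the spark we just popped */
    JNC_SUSPEND_AND_FIND_WORK,  /* save context, resume later at join label */
    JNC_REUSE_CONTEXT           /* keep the context, go and look for work */
} jnc_action;

/*
** popped:          a spark was popped from this context's own deque.
** is_orig_context: this context originated the parallel conjunction.
*/
static jnc_action jnc_nonlast_decide(bool popped, bool is_orig_context)
{
    if (popped) {
        return JNC_RUN_LOCAL_SPARK;
    }
    if (is_orig_context) {
        /* The originator must survive to continue after the join. */
        return JNC_SUSPEND_AND_FIND_WORK;
    }
    return JNC_REUSE_CONTEXT;
}
```

Note that, unlike the old MR_join_and_continue_2, a popped spark is run without checking that it belongs to the same parallel conjunction.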
> +  /* XXX this is meaningless now */
> +  #define MR_choose_parallel_over_sequential_cond(target_cpus) (MR_TRUE)
> +

You should mention why it cannot be removed yet: compiler/granularity.m
will still generate references to it.

Doesn't the above make the granularity transformation redundant?  If so,
why keep it?

The diff looked fine to me otherwise.  Is there any way to get the old
behaviour?  If not, then it's probably worth tagging the repository
before committing this change.

Julien.
--------------------------------------------------------------------------
mercury-reviews mailing list
Post messages to:       mercury-reviews at csse.unimelb.edu.au
Administrative Queries: owner-mercury-reviews at csse.unimelb.edu.au
Subscriptions:          mercury-reviews-request at csse.unimelb.edu.au
--------------------------------------------------------------------------
    
    