[m-rev.] Implement work-stealing
Paul Bone
pbone at csse.unimelb.edu.au
Wed Dec 9 10:58:16 AEDT 2009
Implement work stealing.
This patch is heavily based on earlier, uncommitted work by Peter Wang. It
has been updated so that it applies against the current version of the source.
A number of other changes have been made. Peter's original ChangeLog
follows:
Implement work stealing for parallel conjunctions. This builds on an
older patch which introduced work-stealing deques to the runtime but
didn't perform work stealing.
Previously, when we came across a parallel conjunct, we would place a spark
into either the _global spark queue_ or the _local spark stack_ of the
Mercury context. A spark on the global spark queue may be picked up for
parallel execution by an idle Mercury engine, whereas a spark on a local
spark stack is confined to execution in the context that originated it.
The problem is that we have to decide, ahead of time, where to put a
spark. Ideally, we should have just enough sparks in the global queue to
keep the available Mercury engines busy, and leave the rest of the sparks
to execute in their original contexts since that is more efficient. But
we can't predict the future, so we have to make do with guesses based on
simple heuristics. A bad decision, once made, cannot be reversed. An engine may
sit idle due to an empty global spark queue, even while there are sparks
available in some local spark stacks.
In the work stealing scheme, sparks are always placed into each context's
_local spark deque_. Idle engines actively try to steal sparks from
random spark deques. We don't need to make irreversible and potentially
suboptimal decisions about where to put sparks. Making a spark available
for parallel execution is cheap and happens by default because of the
work-stealing deques; putting a spark on a global queue implies
synchronisation with other threads. The downside is that idle engines
need to expend more time and effort to find work, since they must look in
multiple places instead of just one.
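To illustrate the deque discipline, here is a toy example (a sketch only: it
uses a mutex for brevity, whereas the runtime's MR_SparkDeque in
mercury_wsdeque.h is lock-free). The owner context pushes and pops sparks at
the bottom of its own deque, while thieves steal from the top, so the owner
and thieves only contend when the deque is nearly empty.

    #include <pthread.h>
    #include <stdio.h>

    #define CAP 64

    /* Toy spark deque: the owner works on the bottom, thieves on the top. */
    typedef struct {
        int             items[CAP];
        int             top;    /* thieves steal from here */
        int             bot;    /* the owner pushes and pops here */
        pthread_mutex_t lock;   /* the real deque avoids this lock */
    } ToyDeque;

    static void push_bottom(ToyDeque *dq, int spark)
    {
        pthread_mutex_lock(&dq->lock);
        dq->items[dq->bot++ % CAP] = spark;
        pthread_mutex_unlock(&dq->lock);
    }

    /* Called only by the deque's owner. */
    static int pop_bottom(ToyDeque *dq, int *spark)
    {
        int ok;

        pthread_mutex_lock(&dq->lock);
        ok = (dq->bot > dq->top);
        if (ok) {
            *spark = dq->items[--dq->bot % CAP];
        }
        pthread_mutex_unlock(&dq->lock);
        return ok;
    }

    /* Called by any idle engine. */
    static int steal_top(ToyDeque *dq, int *spark)
    {
        int ok;

        pthread_mutex_lock(&dq->lock);
        ok = (dq->bot > dq->top);
        if (ok) {
            *spark = dq->items[dq->top++ % CAP];
        }
        pthread_mutex_unlock(&dq->lock);
        return ok;
    }

    int main(void)
    {
        ToyDeque dq = { {0}, 0, 0, PTHREAD_MUTEX_INITIALIZER };
        int      s;

        push_bottom(&dq, 1);
        push_bottom(&dq, 2);
        push_bottom(&dq, 3);
        if (steal_top(&dq, &s)) {
            printf("thief stole spark %d\n", s);    /* spark 1 */
        }
        if (pop_bottom(&dq, &s)) {
            printf("owner popped spark %d\n", s);   /* spark 3 */
        }
        return 0;
    }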
In practice, the new scheme seems to perform about as well as the old
scheme, except that the old scheme often required
`--max-contexts-per-thread' to be set "correctly" to get good results.
Only tested on x86-64, which has a relatively strong memory model.
My modifications include:
The distinction between 'shared' and 'private' synchronisation terms has
been removed. All sync terms are assumed to be shared, and thread-safe
operations are used everywhere. This allows us to remove the complicated
code that was used when a private synchronisation term became shared. It may
also change performance; in particular, joins may become slower, because
atomic operations must now always be used when decrementing a sync term's
count field.
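To illustrate the resulting join discipline, here is a minimal sketch using
a GCC __sync builtin in place of the runtime's atomic primitives (the names
other than the builtin are invented for the example). Every conjunct
decrements the shared count atomically, whether or not it ever ran in
parallel, and only the conjunct that takes the count to zero continues past
the join:

    #include <pthread.h>
    #include <stdio.h>

    #define NUM_CONJUNCTS 4

    /* The number of conjuncts that have not yet completed, analogous to
    ** a sync term's count field. */
    static volatile long count = NUM_CONJUNCTS;

    static void *conjunct(void *arg)
    {
        (void) arg;
        /* ... the conjunct's body would execute here ... */

        /* Atomic decrement and test: only the last conjunct sees zero. */
        if (__sync_sub_and_fetch(&count, 1) == 0) {
            printf("last conjunct done; continue at the join label\n");
        }
        return NULL;
    }

    int main(void)
    {
        pthread_t threads[NUM_CONJUNCTS];
        int       i;

        for (i = 0; i < NUM_CONJUNCTS; i++) {
            pthread_create(&threads[i], NULL, conjunct, NULL);
        }
        for (i = 0; i < NUM_CONJUNCTS; i++) {
            pthread_join(threads[i], NULL);
        }
        return 0;
    }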
I have refactored MR_do_join_and_continue; it is now much simpler, as its
conditional code enumerates the possible cases clearly.
This change bootchecks and successfully runs the test suite in the
asm_fast.gc, asm_fast.gc.par, hlc.gc and hlc.par grades; no other grades
were tested. I have not yet tested performance.
runtime/mercury_context.c:
runtime/mercury_context.h:
Keep pointers to all spark deques in a flat array, so we have access
to them for stealing.
Added functions to manage the global array of spark deques.
Modified MR_do_runnext; it now attempts to steal work from other
contexts' spark deques. Threads sleeping on the condition variable in
MR_do_runnext now use a timed wait so that they can wake up and try to
steal sparks.
Re-factored MR_do_join_and_continue.
MR_num_idle_engines is updated with atomic operations; it has been made an
MR_Integer so that its size matches the expectations of the atomic
operations we have defined.
Modified the MR_SyncTerm and MR_Spark structures. Sparks now point to
their sync terms. The parent stack pointer has been moved into the
MR_SyncTerm structure. The MR_st_is_shared field in the MR_SyncTerm
structure has been removed.
runtime/mercury_atomic_ops.c:
runtime/mercury_atomic_ops.h:
Implement a new atomic operation: decrement an integer and test whether it
is zero. On x86/x86_64 one can't atomically decrement an integer and fetch
the result in a single instruction; a loop with a 'compare and exchange'
instruction is necessary. However, since we only want to test whether the
value has become zero after the decrement, we can use the processor's flags
instead. This takes two instructions, but more importantly no loop is
required and only one of the two instructions needs to be atomic. (A sketch
contrasting the two approaches follows this entry.)
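To make that trade-off concrete, here is a sketch of the two approaches
using GCC __sync builtins (this is not the patch's code, which is quoted in
the diff below; the x86 versions there use `sub' and `setz' directly):

    #include <stdio.h>

    /* Decrement and fetch the new value: on x86 this needs a
    ** compare-and-exchange loop, because no single instruction both
    ** decrements and returns the result. */
    static long dec_and_fetch(volatile long *addr)
    {
        long old;

        do {
            old = *addr;
        } while (__sync_val_compare_and_swap(addr, old, old - 1) != old);
        return old - 1;
    }

    /* Decrement and test for zero: no loop is needed, because the
    ** decrement itself sets the flags that a `setz' can then read. */
    static int dec_and_is_zero(volatile long *addr)
    {
        return __sync_sub_and_fetch(addr, 1) == 0;
    }

    int main(void)
    {
        volatile long n = 2;

        printf("%ld\n", dec_and_fetch(&n));     /* prints 1 */
        printf("%d\n", dec_and_is_zero(&n));    /* prints 1: n is now 0 */
        return 0;
    }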
runtime/mercury_wrapper.c:
runtime/mercury_wrapper.h:
Added runtime tunable options for work stealing. These control the number
of attempts an idle engine will make when looking for work, and the
duration to sleep after failing to find any work.
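For example, assuming the usual MERCURY_OPTIONS mechanism for passing
runtime options, an implementor could experiment with:

    MERCURY_OPTIONS="--worksteal-max-attempts 8 --worksteal-sleep-msecs 4" \
        ./myprog

where myprog stands for any program compiled in a parallel low-level C
grade.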
runtime/mercury_thread.c:
runtime/mercury_thread.h:
Added MR_COND_TIMED_WAIT, which waits on condition variables like
MR_COND_WAIT except that it may time out.
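As a usage sketch (the helper name here is invented; the runtime uses
MR_milliseconds_from_now and MR_TIMED_WAIT, shown in the diff below), a
caller computes an absolute wakeup time and then waits:

    #include <pthread.h>
    #include <sys/time.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;

    /* Wait on cond for up to msecs milliseconds; assumes msecs < 1000,
    ** as is the case for the work-stealing sleep interval. */
    static void wait_with_timeout(unsigned int msecs)
    {
        struct timeval  now;
        struct timespec abstime;

        gettimeofday(&now, NULL);
        abstime.tv_sec = now.tv_sec;
        abstime.tv_nsec = (now.tv_usec + msecs * 1000) * 1000L;
        if (abstime.tv_nsec >= 1000000000L) {
            abstime.tv_sec++;
            abstime.tv_nsec -= 1000000000L;
        }

        pthread_mutex_lock(&lock);
        /* Returns 0 if signalled, or ETIMEDOUT once the timeout expires;
        ** either way the caller can then retry stealing sparks. */
        pthread_cond_timedwait(&cond, &lock, &abstime);
        pthread_mutex_unlock(&lock);
    }

    int main(void)
    {
        wait_with_timeout(2);   /* sleeps about 2ms unless signalled */
        return 0;
    }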
runtime/mercury_wsdeque.h:
runtime/mercury_wsdeque.c:
MR_wsdeque_pop_bottom now uses its second argument to return the code
address to jump to rather than the whole spark.
runtime/mercury_conf.h.in:
configure.in:
Test for sched_yield(). Updated the configure test for the synchronisation
term's size to match its new structure.
doc/user_guide.texi:
Add commented out documentation for two new tunable parameters,
`--worksteal-max-attempts' and `--worksteal-sleep-msecs'.
Implementors may want to experiment with different values but end
users shouldn't need to know about them.
Index: configure.in
===================================================================
RCS file: /home/mercury1/repository/mercury/configure.in,v
retrieving revision 1.556
diff -u -p -b -r1.556 configure.in
--- configure.in 3 Dec 2009 05:27:59 -0000 1.556
+++ configure.in 7 Dec 2009 00:04:40 -0000
@@ -1154,7 +1154,7 @@ mercury_check_for_functions \
grantpt unlockpt ptsname tcgetattr tcsetattr ioctl \
access sleep opendir readdir closedir mkdir symlink readlink \
gettimeofday setenv putenv _putenv posix_spawn sched_setaffinity \
- sched_getcpu
+ sched_getcpu sched_yield
#-----------------------------------------------------------------------------#
@@ -1935,8 +1935,8 @@ AC_CACHE_VAL(mercury_cv_sync_term_size,
int main() {
struct {
void *orig_context;
- int count;
- int is_shared;
+ void *parent_sp;
+ $mercury_cv_word_type count;
} x;
FILE *fp;
Index: doc/user_guide.texi
===================================================================
RCS file: /home/mercury1/repository/mercury/doc/user_guide.texi,v
retrieving revision 1.598
diff -u -p -b -r1.598 user_guide.texi
--- doc/user_guide.texi 5 Nov 2009 05:47:40 -0000 1.598
+++ doc/user_guide.texi 2 Dec 2009 10:22:45 -0000
@@ -9972,6 +9972,18 @@ multiplied by the word size in bytes.
@c Sets the size of the redzone on the trail to @var{size} kilobytes
@c multiplied by the word size in bytes.
+@c @sp 1
+@c @item --worksteal-max-attempts @var{attempts}
+@c @findex --worksteal-max-attempts (runtime option)
+@c Tells idle Mercury engines to attempt to steal parallel conjuncts
+@c up to a maximum of @var{attempts} times before sleeping.
+
+@c @sp 1
+@c @item --worksteal-sleep-msecs @var{milliseconds}
+@c @findex --worksteal-sleep-msecs (runtime option)
+@c Sets the amount of time that an idle Mercury engine should sleep
+@c between rounds of work-stealing attempts.
+
@sp 1
@item -i @var{filename}
@itemx --mdb-in @var{filename}
Index: runtime/mercury_atomic_ops.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_atomic_ops.c,v
retrieving revision 1.7
diff -u -p -b -r1.7 mercury_atomic_ops.c
--- runtime/mercury_atomic_ops.c 3 Dec 2009 05:28:00 -0000 1.7
+++ runtime/mercury_atomic_ops.c 6 Dec 2009 23:12:52 -0000
@@ -68,6 +68,15 @@ MR_OUTLINE_DEFN(
}
)
+MR_OUTLINE_DEFN(
+ MR_bool
+ MR_atomic_dec_int_and_is_zero(volatile MR_Integer *addr)
+,
+ {
+ MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY;
+ }
+)
+
#endif /* MR_LL_PARALLEL_CONJ */
/*---------------------------------------------------------------------------*/
Index: runtime/mercury_atomic_ops.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_atomic_ops.h,v
retrieving revision 1.8
diff -u -p -b -r1.8 mercury_atomic_ops.h
--- runtime/mercury_atomic_ops.h 3 Dec 2009 05:28:00 -0000 1.8
+++ runtime/mercury_atomic_ops.h 8 Dec 2009 22:11:45 -0000
@@ -269,6 +269,61 @@ MR_atomic_sub_int(volatile MR_Integer *a
#endif
/*
+ * Decrement the integer at the pointed-to address and return true if it is
+ * zero after the decrement. Fetching the new value would be more general,
+ * but on x86(_64) that requires a compare-and-exchange loop.
+ */
+MR_EXTERN_INLINE MR_bool
+MR_atomic_dec_int_and_is_zero(volatile MR_Integer *addr);
+
+/*
+ * Note that on x86(_64) we have to use the sub instruction rather than the
+ * dec instruction because we need it to set the CPU flags.
+ */
+#if defined(__GNUC__) && defined(__x86_64__)
+
+ #define MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY \
+ do { \
+ char is_zero; \
+ __asm__( \
+ "lock; subq $1, %0; setz %1" \
+ : "=m"(*addr), "=q"(is_zero) \
+ : "m"(*addr) \
+ ); \
+ return (MR_bool)is_zero; \
+ } while (0)
+
+#elif defined(__GNUC__) && defined(__i386__)
+
+ #define MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY \
+ do { \
+ char is_zero; \
+ __asm__( \
+ "lock: subl $1, %0; setz %1" \
+ : "=m"(*addr), "=q"(is_zero) \
+ : "m"(*addr) \
+ ); \
+ return (MR_bool)is_zero; \
+ } while (0)
+
+#elif __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
+
+ #define MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY \
+ do { \
+ return (MR_bool) (__sync_sub_and_fetch(addr, 1) == 0); \
+ } while (0)
+
+#endif
+
+#ifdef MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY
+ MR_EXTERN_INLINE MR_bool
+ MR_atomic_dec_int_and_is_zero(volatile MR_Integer *addr)
+ {
+ MR_ATOMIC_DEC_INT_AND_IS_ZERO_BODY;
+ }
+#endif
+
+/*
* Intel and AMD support a pause instruction that is roughly equivalent
* to a no-op. Intel recommend that it is used in spin-loops to improve
* performance. Without a pause instruction multiple simultaneous
Index: runtime/mercury_conf.h.in
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_conf.h.in,v
retrieving revision 1.67
diff -u -p -b -r1.67 mercury_conf.h.in
--- runtime/mercury_conf.h.in 3 Dec 2009 05:28:00 -0000 1.67
+++ runtime/mercury_conf.h.in 8 Dec 2009 22:17:52 -0000
@@ -272,6 +272,7 @@
** MR_HAVE_FESETROUND we have the fesetround() function.
** MR_HAVE_SCHED_SETAFFINITY we have the sched_setaffinity() function.
** MR_HAVE_SCHED_GETCPU we have the sched_getcpu() function (glibc specific).
+** MR_HAVE_SCHED_YIELD we have the sched_yield() function.
*/
#undef MR_HAVE_GETPID
#undef MR_HAVE_SETPGID
@@ -335,6 +336,7 @@
#undef MR_HAVE_FESETROUND
#undef MR_HAVE_SCHED_SETAFFINITY
#undef MR_HAVE_SCHED_GETCPU
+#undef MR_HAVE_SCHED_YIELD
/*
** We use mprotect() and signals to catch stack and heap overflows.
Index: runtime/mercury_context.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.c,v
retrieving revision 1.72
diff -u -p -b -r1.72 mercury_context.c
--- runtime/mercury_context.c 3 Dec 2009 05:28:00 -0000 1.72
+++ runtime/mercury_context.c 8 Dec 2009 22:36:52 -0000
@@ -35,6 +35,10 @@ ENDINIT
#include <sched.h>
#endif
+#ifdef MR_WIN32
+ #include <sys/timeb.h> /* for _ftime() */
+#endif
+
#include "mercury_memory_handlers.h"
#include "mercury_context.h"
#include "mercury_engine.h" /* for `MR_memdebug' */
@@ -47,16 +51,24 @@ ENDINIT
/*---------------------------------------------------------------------------*/
+static void
+MR_init_context_maybe_generator(MR_Context *c, const char *id,
+ MR_GeneratorPtr gen);
+
+/*---------------------------------------------------------------------------*/
+
+#ifdef MR_LL_PARALLEL_CONJ
+static void
+MR_add_spark_deque(MR_SparkDeque *sd);
+static void
+MR_delete_spark_deque(const MR_SparkDeque *sd);
+static void
+MR_milliseconds_from_now(struct timespec *timeout, unsigned int msecs);
+#endif
+
/*
-** The run queue and spark queue are protected and signalled with the
-** same lock and condition variable.
-**
-** The single sync term lock is used to prevent races in MR_join_and_continue.
-** The holder of the sync term lock may acquire the runqueue lock but not vice
-** versa. (We could also have one sync term lock per context, and make
-** MR_join_and_continue acquire the sync term lock of the context that
-** originated the parallel conjunction, but contention for the single lock
-** doesn't seem to be an issue.)
+** The run queue is protected with MR_runqueue_lock and signalled with
+** MR_runqueue_cond.
*/
MR_Context *MR_runqueue_head;
MR_Context *MR_runqueue_tail;
@@ -64,10 +76,6 @@ MR_Context *MR_runqueue_tai
MercuryLock MR_runqueue_lock;
MercuryCond MR_runqueue_cond;
#endif
-#ifdef MR_LL_PARALLEL_CONJ
- MR_SparkDeque MR_spark_queue;
- MercuryLock MR_sync_term_lock;
-#endif
MR_PendingContext *MR_pending_contexts;
#ifdef MR_THREAD_SAFE
@@ -130,13 +138,17 @@ static MR_Context *free_small_cont
#endif
#ifdef MR_LL_PARALLEL_CONJ
-int volatile MR_num_idle_engines = 0;
+MR_Integer volatile MR_num_idle_engines = 0;
+MR_Unsigned volatile MR_num_exited_engines = 0;
int volatile MR_num_outstanding_contexts_and_global_sparks = 0;
MR_Integer volatile MR_num_outstanding_contexts_and_all_sparks = 0;
-MR_Unsigned volatile MR_num_exited_engines = 0;
-
static MercuryLock MR_par_cond_stats_lock;
+static MercuryLock spark_deques_lock;
+static MR_SparkDeque **MR_spark_deques = NULL;
+static MR_Integer MR_max_spark_deques = 0;
+static MR_Integer MR_victim_counter = 0;
+
#endif
/*---------------------------------------------------------------------------*/
@@ -170,8 +182,7 @@ MR_init_thread_stuff(void)
pthread_mutex_init(&MR_global_lock, MR_MUTEX_ATTR);
pthread_mutex_init(&MR_pending_contexts_lock, MR_MUTEX_ATTR);
#ifdef MR_LL_PARALLEL_CONJ
- MR_init_wsdeque(&MR_spark_queue, MR_INITIAL_GLOBAL_SPARK_QUEUE_SIZE);
- pthread_mutex_init(&MR_sync_term_lock, MR_MUTEX_ATTR);
+ pthread_mutex_init(&spark_deques_lock, MR_MUTEX_ATTR);
pthread_mutex_init(&MR_next_cpu_lock, MR_MUTEX_ATTR);
#ifdef MR_DEBUG_RUNTIME_GRANULARITY_CONTROL
pthread_mutex_init(&MR_par_cond_stats_lock, MR_MUTEX_ATTR);
@@ -320,7 +331,7 @@ MR_finalize_thread_stuff(void)
#endif
#ifdef MR_LL_PARALLEL_CONJ
- pthread_mutex_destroy(&MR_sync_term_lock);
+ pthread_mutex_destroy(&spark_deques_lock);
#endif
#if defined(MR_THREAD_SAFE) && defined(MR_PROFILE_PARALLEL_EXECUTION_SUPPORT)
@@ -587,6 +598,7 @@ MR_init_context_maybe_generator(MR_Conte
c->MR_ctxt_parent_sp = NULL;
MR_init_wsdeque(&c->MR_ctxt_spark_deque,
MR_INITIAL_LOCAL_SPARK_DEQUE_SIZE);
+ MR_add_spark_deque(&c->MR_ctxt_spark_deque);
#endif /* MR_LL_PARALLEL_CONJ */
#endif /* !MR_HIGHLEVEL_CODE */
@@ -715,12 +727,13 @@ MR_destroy_context(MR_Context *c)
c->MR_ctxt_nondetstack_zone->MR_zone_min);
#endif /* defined(MR_CONSERVATIVE_GC) && !defined(MR_HIGHLEVEL_CODE) */
- MR_LOCK(&free_context_list_lock, "destroy_context");
#ifdef MR_LL_PARALLEL_CONJ
MR_num_outstanding_contexts_and_global_sparks--;
MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
+ MR_delete_spark_deque(&c->MR_ctxt_spark_deque);
#endif
+ MR_LOCK(&free_context_list_lock, "destroy_context");
switch (c->MR_ctxt_size) {
case MR_CONTEXT_SIZE_REGULAR:
c->MR_ctxt_next = free_context_list;
@@ -744,12 +757,198 @@ MR_destroy_context(MR_Context *c)
MR_UNLOCK(&free_context_list_lock, "destroy_context");
}
+#ifdef MR_LL_PARALLEL_CONJ
+
+static void
+MR_add_spark_deque(MR_SparkDeque *sd)
+{
+ int slot;
+
+ MR_LOCK(&spark_deques_lock, "create_spark_deque");
+
+ for (slot = 0; slot < MR_max_spark_deques; slot++) {
+ if (MR_spark_deques[slot] == NULL) {
+ break;
+ }
+ }
+
+ if (slot == MR_max_spark_deques) {
+ if (MR_max_spark_deques == 0) {
+ MR_max_spark_deques = 1;
+ } else if (MR_max_spark_deques < 32) {
+ MR_max_spark_deques *= 2;
+ } else {
+ MR_max_spark_deques += 16;
+ }
+ MR_spark_deques = MR_GC_RESIZE_ARRAY(MR_spark_deques,
+ MR_SparkDeque *, MR_max_spark_deques);
+ }
+
+ MR_spark_deques[slot] = sd;
+
+ MR_UNLOCK(&spark_deques_lock, "create_spark_deque");
+}
+
+static void
+MR_delete_spark_deque(const MR_SparkDeque *sd)
+{
+ int i;
+
+ MR_LOCK(&spark_deques_lock, "delete_spark_deque");
+
+ for (i = 0; i < MR_max_spark_deques; i++) {
+ if (MR_spark_deques[i] == sd) {
+ MR_spark_deques[i] = NULL;
+ break;
+ }
+ }
+
+ MR_UNLOCK(&spark_deques_lock, "delete_spark_deque");
+}
+
+/* Search for a ready context which we can handle. */
+static MR_Context *
+MR_find_ready_context(MercuryThread thd, MR_Unsigned depth)
+{
+ MR_Context *cur;
+ MR_Context *prev;
+
+ cur = MR_runqueue_head;
+ /* XXX check pending io */
+ prev = NULL;
+ while (cur != NULL) {
+ if (cur->MR_ctxt_resume_owner_thread == thd &&
+ cur->MR_ctxt_resume_c_depth == depth)
+ {
+ cur->MR_ctxt_resume_owner_thread = (MercuryThread) NULL;
+ cur->MR_ctxt_resume_c_depth = 0;
+ break;
+ }
+
+ if (cur->MR_ctxt_resume_owner_thread == (MercuryThread) NULL) {
+ break;
+ }
+
+ prev = cur;
+ cur = cur->MR_ctxt_next;
+ }
+
+ if (cur != NULL) {
+ if (prev != NULL) {
+ prev->MR_ctxt_next = cur->MR_ctxt_next;
+ } else {
+ MR_runqueue_head = cur->MR_ctxt_next;
+ }
+ if (MR_runqueue_tail == cur) {
+ MR_runqueue_tail = prev;
+ }
+ }
+
+ return cur;
+}
+
+static MR_bool
+MR_attempt_steal_spark(MR_Spark *spark)
+{
+ int max_attempts;
+ int attempt;
+ MR_SparkDeque *victim;
+ int steal_top;
+
+ /*
+ ** Protect against concurrent updates of MR_spark_deques and
+ ** MR_max_spark_deques. This allows only one thread to try to steal
+ ** work at any time, which may be a good thing as it limits the
+ ** amount of wasted effort.
+ */
+ MR_LOCK(&spark_deques_lock, "attempt_steal_spark");
+
+ if (MR_max_spark_deques < MR_worksteal_max_attempts) {
+ max_attempts = MR_max_spark_deques;
+ } else {
+ max_attempts = MR_worksteal_max_attempts;
+ }
+
+ for (attempt = 0; attempt < max_attempts; attempt++) {
+ MR_victim_counter++;
+ victim = MR_spark_deques[MR_victim_counter % MR_max_spark_deques];
+ if (victim != NULL) {
+ steal_top = MR_wsdeque_steal_top(victim, spark);
+ if (steal_top == 1) {
+ /* Steal successful. */
+ MR_UNLOCK(&spark_deques_lock, "attempt_steal_spark");
+ return MR_TRUE;
+ }
+ }
+ }
+
+ MR_UNLOCK(&spark_deques_lock, "attempt_steal_spark");
+
+ /* Steal unsuccessful. */
+ return MR_FALSE;
+}
+
+static void
+MR_milliseconds_from_now(struct timespec *timeout, unsigned int msecs)
+{
+#if defined(MR_HAVE_GETTIMEOFDAY)
+
+ const long NANOSEC_PER_SEC = 1000000000L;
+ struct timeval now;
+ MR_int_least64_t nanosecs;
+
+ gettimeofday(&now, NULL);
+ timeout->tv_sec = now.tv_sec;
+ nanosecs = ((MR_int_least64_t) (now.tv_usec + (msecs * 1000))) * 1000L;
+ if (nanosecs >= NANOSEC_PER_SEC) {
+ timeout->tv_sec++;
+ nanosecs %= NANOSEC_PER_SEC;
+ }
+ timeout->tv_nsec = (long) nanosecs;
+
+#elif defined(MR_WIN32)
+
+ const long NANOSEC_PER_SEC = 1000000000L;
+ const long NANOSEC_PER_MILLISEC = 1000000L;
+ struct _timeb now;
+ MR_int_least64_t nanosecs;
+
+ _ftime(&now);
+ timeout->tv_sec = now.time;
+ nanosecs = ((MR_int_least64_t) (msecs + now.millitm)) *
+ NANOSEC_PER_MILLISEC;
+ if (nanosecs >= NANOSEC_PER_SEC) {
+ timeout->tv_sec++;
+ nanosecs %= NANOSEC_PER_SEC;
+ }
+ timeout->tv_nsec = (long) nanosecs;
+
+#else
+
+ #error Missing definition of MR_milliseconds_from_now.
+
+#endif
+}
+
+#endif /* MR_LL_PARALLEL_CONJ */
+
void
MR_flounder(void)
{
MR_fatal_error("computation floundered");
}
+void
+MR_sched_yield(void)
+{
+#if defined(MR_HAVE_SCHED_YIELD)
+ sched_yield();
+#elif defined(MR_CAN_DO_PENDING_IO)
+ struct timeval timeout = {0, 1};
+ select(0, NULL, NULL, NULL, &timeout);
+#endif
+}
+
/*
** Check to see if any contexts that blocked on IO have become
** runnable. Return the number of contexts that are still blocked.
@@ -883,20 +1082,6 @@ MR_schedule_context(MR_Context *ctxt)
MR_UNLOCK(&MR_runqueue_lock, "schedule_context");
}
-#ifdef MR_LL_PARALLEL_CONJ
-void
-MR_schedule_spark_globally(const MR_Spark *proto_spark)
-{
- MR_LOCK(&MR_runqueue_lock, "schedule_spark_globally");
- MR_wsdeque_push_bottom(&MR_spark_queue, proto_spark);
- MR_num_outstanding_contexts_and_global_sparks++;
- MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks);
- MR_SIGNAL(&MR_runqueue_cond, "schedule_spark_globally");
- MR_UNLOCK(&MR_runqueue_lock, "schedule_spark_globally");
-}
-#endif /* !MR_LL_PARALLEL_CONJ */
-
-
#ifndef MR_HIGHLEVEL_CODE
MR_define_extern_entry(MR_do_runnext);
@@ -908,12 +1093,12 @@ MR_BEGIN_CODE
MR_define_entry(MR_do_runnext);
#ifdef MR_THREAD_SAFE
{
- MR_Context *tmp;
- MR_Context *prev;
+ MR_Context *ready_context;
+ MR_Code *resume_point;
MR_Spark spark;
- unsigned depth;
+ MR_Unsigned depth;
MercuryThread thd;
- int wait_result;
+ struct timespec timeout;
#ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
MR_Timer runnext_timer;
@@ -932,9 +1117,9 @@ MR_define_entry(MR_do_runnext);
depth = MR_ENGINE(MR_eng_c_depth);
thd = MR_ENGINE(MR_eng_owner_thread);
- MR_LOCK(&MR_runqueue_lock, "MR_do_runnext (i)");
+ MR_atomic_inc_int(&MR_num_idle_engines);
- MR_num_idle_engines++;
+ MR_LOCK(&MR_runqueue_lock, "MR_do_runnext (i)");
while (1) {
@@ -953,37 +1138,22 @@ MR_define_entry(MR_do_runnext);
MR_destroy_thread(MR_cur_engine());
MR_num_exited_engines++;
MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (ii)");
+ MR_atomic_dec_int(&MR_num_idle_engines);
pthread_exit(0);
}
- /* Search for a ready context which we can handle. */
- tmp = MR_runqueue_head;
- /* XXX check pending io */
- prev = NULL;
- while (tmp != NULL) {
- if (tmp->MR_ctxt_resume_owner_thread == thd &&
- tmp->MR_ctxt_resume_c_depth == depth)
- {
- tmp->MR_ctxt_resume_owner_thread = (MercuryThread) NULL;
- tmp->MR_ctxt_resume_c_depth = 0;
- MR_num_idle_engines--;
- goto ReadyContext;
- }
-
- if (tmp->MR_ctxt_resume_owner_thread == (MercuryThread) NULL) {
- MR_num_idle_engines--;
+ ready_context = MR_find_ready_context(thd, depth);
+ if (ready_context != NULL) {
+ MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iii)");
+ MR_atomic_dec_int(&MR_num_idle_engines);
goto ReadyContext;
}
-
- prev = tmp;
- tmp = tmp->MR_ctxt_next;
- }
-
- /* Check if the global spark queue is nonempty. */
- if (MR_wsdeque_take_top(&MR_spark_queue, &spark)) {
- MR_num_idle_engines--;
- MR_num_outstanding_contexts_and_global_sparks--;
- MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
+ /*
+ ** No suitable ready contexts, so try to steal a spark instead.
+ */
+ if (MR_attempt_steal_spark(&spark)) {
+ MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iv)");
+ MR_atomic_dec_int(&MR_num_idle_engines);
goto ReadySpark;
}
@@ -994,24 +1164,16 @@ MR_define_entry(MR_do_runnext);
&MR_profile_parallel_executed_nothing);
}
#endif
- do {
- wait_result =
- MR_WAIT(&MR_runqueue_cond, &MR_runqueue_lock, "do_runnext");
- } while (wait_result != 0);
+
+ MR_milliseconds_from_now(&timeout, MR_worksteal_sleep_msecs);
+ MR_TIMED_WAIT(&MR_runqueue_cond, &MR_runqueue_lock, &timeout,
+ "do_runnext");
}
+ /* unreachable */
+ abort();
ReadyContext:
- if (prev != NULL) {
- prev->MR_ctxt_next = tmp->MR_ctxt_next;
- } else {
- MR_runqueue_head = tmp->MR_ctxt_next;
- }
- if (MR_runqueue_tail == tmp) {
- MR_runqueue_tail = prev;
- }
- MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iii)");
-
/* Discard whatever unused context we may have and switch to tmp. */
if (MR_ENGINE(MR_eng_this_context) != NULL) {
MR_destroy_context(MR_ENGINE(MR_eng_this_context));
@@ -1022,13 +1184,40 @@ MR_define_entry(MR_do_runnext);
&MR_profile_parallel_executed_contexts);
}
#endif
- MR_ENGINE(MR_eng_this_context) = tmp;
- MR_load_context(tmp);
- MR_GOTO(tmp->MR_ctxt_resume);
+ MR_ENGINE(MR_eng_this_context) = ready_context;
+ MR_load_context(ready_context);
+
+ resume_point = (MR_Code*)(ready_context->MR_ctxt_resume);
+ ready_context->MR_ctxt_resume = NULL;
+ MR_GOTO(resume_point);
ReadySpark:
- MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (iii)");
+#if 0 /* This is a complicated optimisation that may not be worthwhile. */
+ if (!spark.MR_spark_sync_term->MR_st_is_shared) {
+ spark.MR_spark_sync_term_is_shared = MR_TRUE;
+ /*
+ ** If we allow the stolen spark (New) to execute immediately
+ ** there could be a race with a sibling conjunct (Old) which is
+ ** currently executing, e.g.
+ **
+ ** 1. Old enters MR_join_and_continue(), loads old value of
+ ** MR_st_count;
+ ** 2. New begins executing;
+ ** 3. New enters MR_join_and_continue(), decrements MR_st_count
+ ** atomically;
+ ** 4. Old decrements MR_st_count *non-atomically* based on the
+ ** old value of MR_st_count.
+ **
+ ** Therefore this loop delays the new spark from executing
+ ** while there is another conjunct in MR_join_and_continue()
+ ** which might decrement MR_st_count non-atomically.
+ */
+ while (spark.MR_spark_sync_term->MR_st_attempt_cheap_join) {
+ MR_sched_yield();
+ }
+ }
+#endif
/* Grab a new context if we haven't got one then begin execution. */
if (MR_ENGINE(MR_eng_this_context) == NULL) {
@@ -1047,9 +1236,13 @@ MR_define_entry(MR_do_runnext);
MR_threadscope_post_run_context();
#endif
}
- MR_parent_sp = spark.MR_spark_parent_sp;
- MR_assert(MR_parent_sp != MR_sp);
+ MR_parent_sp = spark.MR_spark_sync_term->MR_st_parent_sp;
MR_SET_THREAD_LOCAL_MUTABLES(spark.MR_spark_thread_local_mutables);
+
+ MR_assert(MR_parent_sp);
+ MR_assert(MR_parent_sp != MR_sp);
+ MR_assert(spark.MR_spark_sync_term->MR_st_count > 0);
+
#ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
if (MR_profile_parallel_execution) {
MR_profiling_stop_timer(&runnext_timer,
@@ -1077,7 +1270,7 @@ MR_define_entry(MR_do_runnext);
MR_load_context(MR_ENGINE(MR_eng_this_context));
MR_GOTO(MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume);
}
-#endif
+#endif /* !MR_THREAD_SAFE */
MR_END_MODULE
@@ -1087,135 +1280,88 @@ MR_END_MODULE
MR_Code*
MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label)
{
+ MR_bool jnc_last;
+ MR_Context *this_context = MR_ENGINE(MR_eng_this_context);
+
+ /*
+ * Atomically decrement the count of conjuncts yet to complete, and test
+ * whether it has reached zero. If we're the last conjunct to complete (the
+ * parallel conjunction is finished) then jnc_last will be true.
+ */
/*
* XXX: We should take the current TSC time here and use it to post the
* various 'context stopped' threadscope events. This profile will be more
* accurate.
*/
- if (!jnc_st->MR_st_is_shared) {
- /* This parallel conjunction has only executed sequentially. */
- if (--jnc_st->MR_st_count == 0) {
- /*
- ** It has finished executing, continue execution from the join
- ** label.
- */
- return join_label;
- } else {
- /*
- ** It has not finished executing. Try to finish it by executing
- ** our sparks.
- ** This code was formerly MR_join_and_continue_1()
- */
- MR_Context *jnc_ctxt;
- MR_bool jnc_popped;
- MR_Spark jnc_spark;
-
- jnc_ctxt = MR_ENGINE(MR_eng_this_context);
- jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
- &jnc_spark);
- if (jnc_popped) {
- MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
- return jnc_spark.MR_spark_resume;
- } else {
- /*
- ** Someone's stolen our sparks, we should try to execute
- ** something that's been scheduled globally. XXX: We don't yet
- ** have work stealing so how can this happen!?
- */
- fprintf(stderr, "My sparks have been stolen!! %lp", pthread_self());
- return MR_ENTRY(MR_do_runnext);
- }
- }
- } else {
- /* This parallel conjunction may be executing in parallel. */
- MR_LOCK(&MR_sync_term_lock, "continue");
- if (--jnc_st->MR_st_count == 0) {
- /* This parallel conjunction has finished. */
- if (MR_ENGINE(MR_eng_this_context) == jnc_st->MR_st_orig_context) {
+
+ jnc_last = MR_atomic_dec_int_and_is_zero(&(jnc_st->MR_st_count));
+
+ if (jnc_last) {
+ if (this_context == jnc_st->MR_st_orig_context) {
/*
- ** This context originated this parallel conjunction and all
- ** the branches have finished so jump to the join label.
+ ** It has finished executing and we're the originating context. Jump
+ ** to the join label.
*/
- MR_UNLOCK(&MR_sync_term_lock, "continue i");
return join_label;
} else {
- /*
- ** This context didn't originate this parallel conjunction and
- ** we're the last branch to finish. The originating context
- ** should be suspended waiting for us to finish, so wake it up.
- */
- jnc_st->MR_st_orig_context->MR_ctxt_resume = join_label;
- MR_schedule_context(jnc_st->MR_st_orig_context);
- MR_UNLOCK(&MR_sync_term_lock, "continue ii");
#ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
MR_threadscope_post_stop_context(MR_TS_STOP_REASON_FINISHED);
#endif
+ /*
+ ** This context didn't originate this parallel conjunction and
+ ** we're the last branch to finish. The originating context should
+ ** be suspended waiting for us to finish, so wake it up.
+ **
+ ** We could be racing with the original context, in which case we
+ ** have to make sure that it is ready to be scheduled before we
+ ** schedule it. It will set its resume point to join_label to
+ ** indicate that it is ready.
+ */
+ while (jnc_st->MR_st_orig_context->MR_ctxt_resume != join_label) {
+ /* XXX: Need to configure using sched_yield or spin waiting */
+ MR_ATOMIC_PAUSE;
+ }
+ MR_schedule_context(jnc_st->MR_st_orig_context);
return MR_ENTRY(MR_do_runnext);
}
} else {
+ MR_bool popped;
+ MR_Code *spark_resume;
+
/*
- ** The parallel conjunction is being executed in parallel but it is
- ** not yet finished. This code was Formerly
- ** MR_join_and_continue_2()
- */
- MR_Context *jnc_ctxt;
- MR_bool jnc_popped;
- MR_Spark jnc_spark;
-
- jnc_ctxt = MR_ENGINE(MR_eng_this_context);
- jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
- &jnc_spark);
- if (jnc_popped && (jnc_spark.MR_spark_parent_sp == MR_parent_sp)) {
- /*
- ** The spark at the top of the stack is from to the same parallel
- ** conjunction that we've just been executing. We can immediately
- ** execute the next branch of the same parallel conjunction in
- ** the current context.
+ * The parallel conjunction is not yet finished. Try to work on a
+ * spark from our local deque; the sparks there are the ones most
+ * likely to contribute to completing this conjunction.
*/
- MR_UNLOCK(&MR_sync_term_lock, "continue_2 i");
+ popped = MR_wsdeque_pop_bottom(&this_context->MR_ctxt_spark_deque, &spark_resume);
+ if (popped) {
MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
- return jnc_spark.MR_spark_resume;
+ return spark_resume;
} else {
/*
- ** The spark stack is empty or the next spark is from a different
- ** parallel conjunction to the one we've been executing. Either
- ** way, there's nothing more we can do with this context right
- ** now. Put back the spark we won't be using.
- */
- if (jnc_popped) {
- MR_wsdeque_putback_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
- &jnc_spark);
- }
- /*
** If this context originated the parallel conjunction we've been
- ** executing, the rest of the parallel conjunction must have been
- ** put on the global spark queue to be executed in other
- ** contexts. This context will need to be resumed once the
- ** parallel conjunction is completed, so suspend the context.
+ ** executing, suspend this context such that it will be resumed
+ ** at the join label once the parallel conjunction is completed.
**
- ** What if the other conjuncts where put on the global queue
- ** but haven't yet been taken by other threads? Then this step
- ** is redundant. It's not worth fixing, this problem will go
- ** away once we enable work-stealing. - pbone.
+ ** Otherwise we can reuse this context for the next piece of work.
*/
- if (jnc_ctxt == jnc_st->MR_st_orig_context) {
+ if (this_context == jnc_st->MR_st_orig_context) {
#ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
MR_threadscope_post_stop_context(MR_TS_STOP_REASON_BLOCKED);
#endif
- MR_save_context(jnc_ctxt);
+ MR_save_context(this_context);
+ /* XXX: Make sure the context gets saved before we set the join
+ * label; this may require a memory barrier. */
+ this_context->MR_ctxt_resume = (join_label);
MR_ENGINE(MR_eng_this_context) = NULL;
} else {
#ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
MR_threadscope_post_stop_context(MR_TS_STOP_REASON_FINISHED);
#endif
}
-
- /* Finally look for other work. */
- MR_UNLOCK(&MR_sync_term_lock, "continue_2 ii");
return MR_ENTRY(MR_do_runnext);
}
}
- }
}
#endif
Index: runtime/mercury_context.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.h,v
retrieving revision 1.57
diff -u -p -b -r1.57 mercury_context.h
--- runtime/mercury_context.h 3 Dec 2009 05:28:00 -0000 1.57
+++ runtime/mercury_context.h 8 Dec 2009 23:12:22 -0000
@@ -202,22 +202,6 @@
** (Accessed via MR_eng_this_context.)
*/
-/*
-** A spark contains just enough information to begin execution of a parallel
-** conjunct. Sparks exist on a context's local spark deque or in the global
-** spark queue. In the former case, a spark will eventually be executed in the
-** same context (same detstack, etc.) as the code that generated the spark. In
-** the latter case the spark can be picked up and executed by any idle engine
-** in a different context.
-**
-** In the current implementation a spark is put on the global spark queue if,
-** at the time a fork instruction is reached, we think the spark has a chance
-** of being picked up for execution by an idle engine. Otherwise the spark
-** goes on the context's spark stack. At the moment this is an irrevocable
-** decision. A future possibility is to allow idle engines to steal work
-** from the cold end of some context's spark deque.
-*/
-
typedef struct MR_Context_Struct MR_Context;
typedef enum {
@@ -237,13 +221,21 @@ struct MR_SavedOwner_Struct {
#ifdef MR_LL_PARALLEL_CONJ
+typedef struct MR_SyncTerm_Struct MR_SyncTerm;
typedef struct MR_Spark_Struct MR_Spark;
typedef struct MR_SparkDeque_Struct MR_SparkDeque;
typedef struct MR_SparkArray_Struct MR_SparkArray;
+/*
+** A spark contains just enough information to begin execution of a parallel
+** conjunct. A spark will either be executed in the same context (same
+** detstack, etc.) as the code that generated the spark, or it may be stolen
+** from its deque and executed by any idle engine in a different context.
+*/
+
struct MR_Spark_Struct {
+ MR_SyncTerm *MR_spark_sync_term;
MR_Code *MR_spark_resume;
- MR_Word *MR_spark_parent_sp;
MR_ThreadLocalMuts *MR_spark_thread_local_mutables;
};
@@ -261,7 +253,14 @@ struct MR_Context_Struct {
#endif
MR_ContextSize MR_ctxt_size;
MR_Context *MR_ctxt_next;
+#ifdef MR_LL_PARALLEL_CONJ
+ /*
+ ** The value of this field is used for synchronization.
+ */
+ MR_Code * volatile MR_ctxt_resume;
+#else
MR_Code *MR_ctxt_resume;
+#endif
#ifdef MR_THREAD_SAFE
MercuryThread MR_ctxt_resume_owner_thread;
MR_Unsigned MR_ctxt_resume_c_depth;
@@ -332,10 +331,6 @@ struct MR_Context_Struct {
/*
** The runqueue is a linked list of contexts that are runnable.
-** The spark_queue is an array of sparks that are runnable.
-** We keep them separate to prioritise contexts (which are mainly
-** computations which have already started) over sparks (which are
-** computations which have not begun).
*/
extern MR_Context *MR_runqueue_head;
@@ -345,8 +340,6 @@ extern MR_Context *MR_runqueue_tai
extern MercuryCond MR_runqueue_cond;
#endif
#ifdef MR_LL_PARALLEL_CONJ
- extern MR_SparkDeque MR_spark_queue;
- extern MercuryLock MR_sync_term_lock;
extern MR_bool MR_thread_pinning;
#endif
@@ -400,7 +393,7 @@ extern MR_PendingContext *MR_pending_
** XXX We may need to use atomic instructions or memory fences on some
** architectures.
*/
- extern volatile int MR_num_idle_engines;
+ extern volatile MR_Integer MR_num_idle_engines;
/*
** The number of contexts that are not in the free list (i.e. are executing
@@ -489,17 +482,15 @@ extern void MR_finalize_thread_s
extern void MR_flounder(void);
/*
+** Relinquish the processor voluntarily without blocking.
+*/
+extern void MR_sched_yield(void);
+
+/*
** Append the given context onto the end of the run queue.
*/
extern void MR_schedule_context(MR_Context *ctxt);
-#ifdef MR_LL_PARALLEL_CONJ
- /*
- ** Append the given spark onto the end of the spark queue.
- */
- extern void MR_schedule_spark_globally(const MR_Spark *spark);
-#endif /* !MR_LL_PARALLEL_CONJ */
-
#ifndef MR_HIGHLEVEL_CODE
MR_declare_entry(MR_do_runnext);
#define MR_runnext() \
@@ -733,7 +724,7 @@ extern void MR_schedule_context(
/* it wouldn't be appropriate to copy the resume field */ \
to_cptr->MR_ctxt_thread_local_mutables = \
from_cptr->MR_ctxt_thread_local_mutables; \
- /* it wouldn't be appropriate to copy the spark_queue field */ \
+ /* it wouldn't be appropriate to copy the spark_deque field */ \
/* it wouldn't be appropriate to copy the saved_owners field */ \
} while (0)
@@ -744,19 +735,14 @@ extern void MR_schedule_context(
/*
** If you change MR_SyncTerm_Struct you need to update configure.in.
**
- ** MR_st_count is `int' so that on a 64-bit machine the total size of the
- ** sync term is two words, not three words (assuming `int' is 32 bits).
- **
- ** XXX we should remove that assumption but it's a little tricky because
- ** configure needs to understand the types as well
+ ** MR_st_count is manipulated via atomic operations, and is therefore declared
+ ** as volatile and an MR_Integer.
*/
- typedef struct MR_SyncTerm_Struct MR_SyncTerm;
-
struct MR_SyncTerm_Struct {
MR_Context *MR_st_orig_context;
- volatile int MR_st_count;
- volatile int MR_st_is_shared;
+ MR_Word *MR_st_parent_sp;
+ volatile MR_Integer MR_st_count;
};
#define MR_init_sync_term(sync_term, nbranches) \
@@ -764,43 +750,31 @@ extern void MR_schedule_context(
MR_SyncTerm *init_st = (MR_SyncTerm *) &(sync_term); \
\
init_st->MR_st_orig_context = MR_ENGINE(MR_eng_this_context); \
+ init_st->MR_st_parent_sp = MR_parent_sp; \
init_st->MR_st_count = (nbranches); \
- init_st->MR_st_is_shared = MR_FALSE; \
} while (0)
/*
** fork_new_child(MR_SyncTerm st, MR_Code *child):
**
** Create a new spark to execute the code at `child'. The new spark is put
- ** on the global spark queue or the context-local spark deque. The current
- ** context resumes at `parent'. MR_parent_sp must already be set
- ** appropriately before this instruction is executed.
- **
- ** If the spark ends up on the global spark queue then we set
- ** `MR_st_is_shared' to true as branches of this parallel conjunction could
- ** be executed in parallel.
+ ** on the context's spark deque. The current context resumes at `parent'.
+ ** MR_parent_sp must already be set appropriately before this instruction
+ ** is executed.
*/
#define MR_fork_new_child(sync_term, child) \
do { \
MR_Spark fnc_spark; \
+ MR_SparkDeque *fnc_deque; \
\
+ fnc_spark.MR_spark_sync_term = (MR_SyncTerm*) &(sync_term); \
fnc_spark.MR_spark_resume = (child); \
- fnc_spark.MR_spark_parent_sp = MR_parent_sp; \
fnc_spark.MR_spark_thread_local_mutables = MR_THREAD_LOCAL_MUTABLES; \
- if (MR_fork_globally_criteria) { \
- MR_SyncTerm *fnc_st = (MR_SyncTerm *) &(sync_term); \
- fnc_st->MR_st_is_shared = MR_TRUE; \
- MR_schedule_spark_globally(&fnc_spark); \
- } else { \
- MR_schedule_spark_locally(&fnc_spark); \
- } \
+ fnc_deque = &MR_ENGINE(MR_eng_this_context)->MR_ctxt_spark_deque; \
+ MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks); \
+ MR_wsdeque_push_bottom(fnc_deque, &fnc_spark); \
} while (0)
- #define MR_fork_globally_criteria \
- (MR_num_idle_engines != 0 && \
- MR_num_outstanding_contexts_and_global_sparks < \
- MR_max_outstanding_contexts)
-
/*
** These macros may be used as conditions for runtime parallelism decisions.
** They return nonzero when parallelism is recommended (because there are
@@ -812,19 +786,6 @@ extern void MR_schedule_context(
#define MR_par_cond_contexts_and_all_sparks_vs_num_cpus(target_cpus) \
(MR_num_outstanding_contexts_and_all_sparks < target_cpus)
- #define MR_schedule_spark_locally(spark) \
- do { \
- MR_Context *ssl_ctxt; \
- \
- /* \
- ** Only the engine running the context is allowed to access \
- ** the context's spark stack, so no locking is required here. \
- */ \
- ssl_ctxt = MR_ENGINE(MR_eng_this_context); \
- MR_wsdeque_push_bottom(&ssl_ctxt->MR_ctxt_spark_deque, (spark)); \
- MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks); \
- } while (0)
-
extern MR_Code*
MR_do_join_and_continue(MR_SyncTerm *sync_term, MR_Code *join_label);
Index: runtime/mercury_thread.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.c,v
retrieving revision 1.37
diff -u -p -b -r1.37 mercury_thread.c
--- runtime/mercury_thread.c 3 Dec 2009 05:28:00 -0000 1.37
+++ runtime/mercury_thread.c 8 Dec 2009 23:40:49 -0000
@@ -30,7 +30,7 @@
MercuryLock MR_global_lock;
#endif
-MR_bool MR_exit_now;
+volatile MR_bool MR_exit_now;
MR_bool MR_debug_threads = MR_FALSE;
MR_Unsigned MR_num_thread_local_mutables = 0;
@@ -212,6 +212,10 @@ MR_destroy_thread(void *eng0)
#endif
#if defined(MR_THREAD_SAFE)
+/*
+** XXX: maybe these should only be conditionally compiled when MR_DEBUG_THREADS
+** is also set. - pbone
+*/
int
MR_mutex_lock(MercuryLock *lock, const char *from)
@@ -273,6 +277,19 @@ MR_cond_wait(MercuryCond *cond, MercuryL
return err;
}
+int
+MR_cond_timed_wait(MercuryCond *cond, MercuryLock *lock,
+ const struct timespec *abstime, const char *from)
+{
+ int err;
+
+ fprintf(stderr, "%ld timed-waiting on cond: %p lock: %p (%s)\n",
+ (long)pthread_self(), cond, lock, from);
+ err = pthread_cond_timedwait(cond, lock, abstime);
+ fprintf(stderr, "%ld timed-wait returned %d\n", err);
+ return err;
+}
+
#endif /* MR_THREAD_SAFE */
MR_Unsigned
Index: runtime/mercury_thread.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.h,v
retrieving revision 1.28
diff -u -p -b -r1.28 mercury_thread.h
--- runtime/mercury_thread.h 27 Nov 2009 03:51:20 -0000 1.28
+++ runtime/mercury_thread.h 6 Dec 2009 07:50:21 -0000
@@ -41,6 +41,9 @@ extern int
MR_cond_broadcast(MercuryCond *cond, const char *from);
extern int
MR_cond_wait(MercuryCond *cond, MercuryLock *lock, const char *from);
extern int
+MR_cond_timed_wait(MercuryCond *cond, MercuryLock *lock,
+ const struct timespec *abstime, const char *from);
extern MR_bool MR_debug_threads;
@@ -56,6 +59,8 @@ MR_cond_wait(MercuryCond *cond, MercuryL
#define MR_SIGNAL(cnd, from) pthread_cond_signal((cnd))
#define MR_BROADCAST(cnd, from) pthread_cond_broadcast((cnd))
#define MR_WAIT(cnd, mtx, from) pthread_cond_wait((cnd), (mtx))
+ #define MR_TIMED_WAIT(cond, mtx, abstime, from) \
+ pthread_cond_timedwait((cond), (mtx), (abstime))
#else
#define MR_LOCK(lck, from) \
( MR_debug_threads ? \
@@ -88,6 +93,12 @@ MR_cond_wait(MercuryCond *cond, MercuryL
: \
pthread_cond_wait((cnd), (mtx)) \
)
+ #define MR_TIMED_WAIT(cond, mtx, abstime, from) \
+ ( MR_debug_threads ? \
+ MR_cond_timed_wait((cond), (mtx), (abstime), (from)) \
+ : \
+ pthread_cond_timedwait((cond), (mtx), (abstime)) \
+ )
#endif
/*
@@ -127,7 +138,7 @@ MR_cond_wait(MercuryCond *cond, MercuryL
extern MercuryThread *MR_create_thread(MR_ThreadGoal *);
extern void MR_destroy_thread(void *eng);
- extern MR_bool MR_exit_now;
+ extern volatile MR_bool MR_exit_now;
/*
** The primordial thread. Currently used for debugging.
Index: runtime/mercury_wrapper.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wrapper.c,v
retrieving revision 1.203
diff -u -p -b -r1.203 mercury_wrapper.c
--- runtime/mercury_wrapper.c 3 Dec 2009 05:28:00 -0000 1.203
+++ runtime/mercury_wrapper.c 6 Dec 2009 23:12:52 -0000
@@ -200,6 +200,17 @@ size_t MR_pcache_size = 8192;
MR_Unsigned MR_max_contexts_per_thread = 2;
MR_Unsigned MR_max_outstanding_contexts;
+#ifdef MR_LL_PARALLEL_CONJ
+/*
+** In grades that support parallel conjunctions, an idle engine can steal
+** parallel work from Mercury contexts. The following variables control the
+** maximum number of contexts that an idle engine will try to steal from
+** before resting, and how long to rest before attempting another steal.
+*/
+MR_Unsigned MR_worksteal_max_attempts = 24;
+MR_Unsigned MR_worksteal_sleep_msecs = 2;
+#endif
+
/* file names for mdb's debugger I/O streams */
const char *MR_mdb_in_filename = NULL;
const char *MR_mdb_out_filename = NULL;
@@ -1240,6 +1251,8 @@ enum MR_long_option {
MR_GEN_NONDETSTACK_REDZONE_SIZE,
MR_GEN_NONDETSTACK_REDZONE_SIZE_KWORDS,
MR_MAX_CONTEXTS_PER_THREAD,
+ MR_WORKSTEAL_MAX_ATTEMPTS,
+ MR_WORKSTEAL_SLEEP_MSECS,
MR_THREAD_PINNING,
MR_PROFILE_PARALLEL_EXECUTION,
MR_MDB_TTY,
@@ -1340,6 +1353,8 @@ struct MR_option MR_long_opts[] = {
{ "gen-nondetstack-zone-size-kwords",
1, 0, MR_GEN_NONDETSTACK_REDZONE_SIZE_KWORDS },
{ "max-contexts-per-thread", 1, 0, MR_MAX_CONTEXTS_PER_THREAD },
+ { "worksteal-max-attempts", 1, 0, MR_WORKSTEAL_MAX_ATTEMPTS },
+ { "worksteal-max-attempts", 1, 0, MR_WORKSTEAL_SLEEP_MSECS },
{ "thread-pinning", 0, 0, MR_THREAD_PINNING },
{ "profile-parallel-execution", 0, 0, MR_PROFILE_PARALLEL_EXECUTION },
{ "mdb-tty", 1, 0, MR_MDB_TTY },
@@ -1756,6 +1771,22 @@ MR_process_options(int argc, char **argv
MR_max_contexts_per_thread = size;
break;
+ case MR_WORKSTEAL_MAX_ATTEMPTS:
+#ifdef MR_LL_PARALLEL_CONJ
+ if (sscanf(MR_optarg, "%lu", &MR_worksteal_max_attempts) != 1) {
+ MR_usage();
+ }
+#endif
+ break;
+
+ case MR_WORKSTEAL_SLEEP_MSECS:
+#ifdef MR_LL_PARALLEL_CONJ
+ if (sscanf(MR_optarg, "%lu", &MR_worksteal_sleep_msecs) != 1) {
+ MR_usage();
+ }
+#endif
+ break;
+
case MR_THREAD_PINNING:
#if defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ)
MR_thread_pinning = MR_TRUE;
@@ -2826,9 +2857,6 @@ MR_define_label(global_fail);
MR_define_label(all_done);
assert(MR_runqueue_head == NULL);
-#ifdef MR_LL_PARALLEL_CONJ
- assert(MR_wsdeque_is_empty(&MR_spark_queue));
-#endif
#ifdef MR_MPROF_PROFILE_TIME
if (MR_profiling) {
Index: runtime/mercury_wrapper.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wrapper.h,v
retrieving revision 1.82
diff -u -p -b -r1.82 mercury_wrapper.h
--- runtime/mercury_wrapper.h 17 Jun 2009 03:26:00 -0000 1.82
+++ runtime/mercury_wrapper.h 5 Dec 2009 06:37:30 -0000
@@ -256,6 +256,10 @@ extern MR_Unsigned MR_contexts_per_threa
*/
extern MR_Unsigned MR_max_outstanding_contexts;
+/* work-stealing tunables (documented in mercury_wrapper.c) */
+extern MR_Unsigned MR_worksteal_max_attempts;
+extern MR_Unsigned MR_worksteal_sleep_msecs;
+
extern MR_Unsigned MR_num_threads;
/* file names for the mdb debugging streams */
Index: runtime/mercury_wsdeque.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wsdeque.h,v
retrieving revision 1.1
diff -u -p -b -r1.1 mercury_wsdeque.h
--- runtime/mercury_wsdeque.h 11 Oct 2007 11:45:22 -0000 1.1
+++ runtime/mercury_wsdeque.h 5 Dec 2009 06:19:59 -0000
@@ -71,8 +71,8 @@ extern void MR_wsdeque_putback_botto
** Pop a spark off the bottom of the deque. Must only be called by
** the owner of the deque. Returns true if successful.
*/
-MR_INLINE
-MR_bool MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Spark *ret_spark);
+MR_INLINE MR_bool
+MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Code **ret_spark_resume);
/*
** Attempt to steal a spark from the top of the deque.
@@ -122,7 +122,7 @@ MR_wsdeque_push_bottom(MR_SparkDeque *dq
}
MR_INLINE MR_bool
-MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Spark *ret_spark)
+MR_wsdeque_pop_bottom(MR_SparkDeque *dq, MR_Code **ret_spark_resume)
{
MR_Integer bot;
MR_Integer top;
@@ -143,7 +143,7 @@ MR_wsdeque_pop_bottom(MR_SparkDeque *dq,
return MR_FALSE;
}
- *ret_spark = MR_sa_element(arr, bot);
+ (*ret_spark_resume) = MR_sa_element(arr, bot).MR_spark_resume;
if (size > 0) {
return MR_TRUE;
}