[m-rev.] diff: Various minor improvments for the low-level C parallel runtime system.
Paul Bone
pbone at csse.unimelb.edu.au
Mon Aug 6 12:21:31 AEST 2012
Various minor improvments for the low-level C parallel runtime system.
These changes fix a couple of performance bugs and also modify the
algorithms in the RTS so that they match the ones in my thesis.
runtime/mercury_context.[ch]:
An engine with a dirty context will no-longer check for a runnable
context first. It first checks for a local spark, if the spark is not
compatible it puts the spark back on the local spark stack. Then it
saves the dirty context and jumps to MR_idle - the entry point for
engines with no contexts.
Remove the MR_MAYBE_TRAMPOLINE macro and expand out any case where it
was previously used.
We no longer execute a spark when an engine has a dirty incompatible
context. (previously we saved the old context then allocated a new
one). Therefore prepare_engine_for_spark() no-longer needs the
join_label parameter (which was used when saving a dirty context).
Consequently, the same parameter has been removed from
MR_do_steal_spark.
If a work stealing thief looses a race then it retries until it wins or
there is no work.
Use a mutex rather than a (binary) semaphore to protect the engine sleep
sync structure. This more directly reflects the intentions plus POSIX
mutexes don't always make kernel calls but semaphores do.
The MR_num_idle_engines global was not being updated correctly, in
particular it showed that there were idle engines even when there
weren't. This caused an engine creating a spark to always attempt to
notify other engines of the spark. Fixing the use of
MR_num_idle_engines improves performance by over a factor of 2x in the
naive fibs micro benchmark.
Refactor MR_join_and_continue to match the simplier structure in my
thesis.
Rename MR_destroy_context to MR_release_context, which gives a more
accurate impression.
Update some MR_assert calls that where incorrect.
runtime/mercury_engine.c:
runtime/mercury_par_builtin.c:
Conform to MR_release_context.
library/thread.m:
Conform to MR_release_context.
Add a missing MR_save_context.
diff --git a/library/thread.m b/library/thread.m
index dca359a..30ff338 100644
--- a/library/thread.m
+++ b/library/thread.m
@@ -271,7 +271,8 @@ INIT mercury_sys_init_thread_modules
}
MR_UNLOCK(&MR_thread_barrier_lock, ""thread__spawn_end_thread"");
- MR_destroy_context(MR_ENGINE(MR_eng_this_context));
+ MR_save_context(MR_ENGINE(MR_eng_this_context));
+ MR_release_context(MR_ENGINE(MR_eng_this_context));
MR_ENGINE(MR_eng_this_context) = NULL;
MR_idle();
}
diff --git a/runtime/mercury_context.c b/runtime/mercury_context.c
index 21c7130..f9971fd 100644
--- a/runtime/mercury_context.c
+++ b/runtime/mercury_context.c
@@ -99,7 +99,7 @@ MR_milliseconds_from_now(struct timespec *timeout, unsigned int msecs);
struct engine_sleep_sync_i {
sem_t es_sleep_semaphore;
- sem_t es_wake_semaphore;
+ MercuryLock es_wake_lock;
volatile unsigned es_state;
volatile unsigned es_action;
union MR_engine_wake_action_data es_action_data;
@@ -319,7 +319,8 @@ MR_init_context_stuff(void)
MR_spark_deques[i] = NULL;
sem_init(&(engine_sleep_sync_data[i].d.es_sleep_semaphore), 0, 0);
- sem_init(&(engine_sleep_sync_data[i].d.es_wake_semaphore), 0, 1);
+ pthread_mutex_init(&(engine_sleep_sync_data[i].d.es_wake_lock),
+ MR_MUTEX_ATTR);
/*
** All engines are initially working (because telling them to wake up
** before they are started would be useless).
@@ -1100,7 +1101,7 @@ MR_create_context(const char *id, MR_ContextSize ctxt_size, MR_Generator *gen)
** TODO: We should gc the cached contexts, or otherwise not cache too many.
*/
void
-MR_destroy_context(MR_Context *c)
+MR_release_context(MR_Context *c)
{
MR_assert(c);
@@ -1285,16 +1286,23 @@ MR_attempt_steal_spark(MR_Spark *spark)
int i;
int offset;
MR_SparkDeque *victim;
- int result = MR_FALSE;
+ int steal_result;
+ MR_bool result = MR_FALSE;
offset = MR_ENGINE(MR_eng_victim_counter);
for (i = 0; i < MR_num_threads; i++) {
victim = MR_spark_deques[(i + offset) % MR_num_threads];
if (victim != NULL) {
- result = (MR_wsdeque_steal_top(victim, spark)) == 1;
- if (result) {
+ steal_result = MR_wsdeque_steal_top(victim, spark);
+ while (steal_result == -1) {
+ /* This is a while loop so that we can pause here */
+ MR_ATOMIC_PAUSE;
+ steal_result = MR_wsdeque_steal_top(victim, spark);
+ }
+ if (steal_result == 1) {
/* Steal successful. */
+ result = MR_TRUE;
break;
}
}
@@ -1590,9 +1598,10 @@ try_wake_engine(MR_EngineId engine_id, int action,
** This engine is probably in the state our caller checked that it was in.
** Wait on the semaphore then re-check the state to be sure.
*/
- MR_SEM_WAIT(&(esync->d.es_wake_semaphore), "try_wake_engine, wake_sem");
- MR_CPU_LFENCE;
+ MR_LOCK(&(esync->d.es_wake_lock), "try_wake_engine, wake_lock");
if (esync->d.es_state & states) {
+ MR_atomic_dec_int(&MR_num_idle_engines);
+
/*
** We now KNOW that the engine is in one of the correct states.
**
@@ -1609,7 +1618,7 @@ try_wake_engine(MR_EngineId engine_id, int action,
"try_wake_engine sleep_sem");
success = MR_TRUE;
}
- MR_SEM_POST(&(esync->d.es_wake_semaphore), "try_wake_engine wake_sem");
+ MR_UNLOCK(&(esync->d.es_wake_lock), "try_wake_engine wake_lock");
return success;
}
@@ -1650,22 +1659,6 @@ MR_shutdown_all_engines(void)
** added in the future.
*/
-/*
-** If the call returns a non-null code pointer then jump to that address,
-** otherwise fall-through.
-*/
-#define MR_MAYBE_TRAMPOLINE_AND_ACTION(call, action) \
- do { \
- MR_Code *tramp; \
- tramp = (call); \
- if (tramp) { \
- action; \
- MR_GOTO(tramp); \
- } \
- } while (0)
-#define MR_MAYBE_TRAMPOLINE(call) \
- MR_MAYBE_TRAMPOLINE_AND_ACTION((call), )
-
MR_define_extern_entry(MR_do_idle);
#ifdef MR_THREAD_SAFE
@@ -1680,18 +1673,18 @@ static MR_Code*
do_local_spark(MR_Code *join_label);
static MR_Code*
-do_work_steal(MR_Code *join_label);
+do_work_steal(void);
static void
save_dirty_context(MR_Code *join_label);
/*
-** Prepare the engine to execute a spark. If join_label is not null then this
-** engine has a context that may not be compatible with the spark. If it isn't
-** then the context must be saved with join_label as the resume point.
+** Prepare the engine to execute a spark. Only call this if either:
+** 1) the engine does not have a context.
+** 2) the engine's context is free for use with the spark.
*/
static void
-prepare_engine_for_spark(volatile MR_Spark *spark, MR_Code *join_label);
+prepare_engine_for_spark(volatile MR_Spark *spark);
/*
** Prepare the engine to execute a context. This loads the context into the
@@ -1721,17 +1714,28 @@ MR_BEGIN_CODE
MR_define_entry(MR_do_idle);
#ifdef MR_THREAD_SAFE
{
+ MR_Code *jump_target;
/*
** Try to get a context.
*/
advertise_engine_state_idle();
- MR_MAYBE_TRAMPOLINE_AND_ACTION(do_get_context(),
- advertise_engine_state_working());
- MR_MAYBE_TRAMPOLINE_AND_ACTION(do_local_spark(NULL),
- advertise_engine_state_working());
- MR_MAYBE_TRAMPOLINE_AND_ACTION(do_work_steal(NULL),
- advertise_engine_state_working());
+ jump_target = do_get_context();
+ if (jump_target != NULL) {
+ advertise_engine_state_working();
+ MR_GOTO(jump_target);
+ }
+ jump_target = do_local_spark(NULL);
+ if (jump_target != NULL) {
+ advertise_engine_state_working();
+ MR_GOTO(jump_target);
+ }
+ jump_target = do_work_steal();
+ if (jump_target != NULL) {
+ advertise_engine_state_working();
+ MR_GOTO(jump_target);
+ }
+
MR_GOTO(MR_ENTRY(MR_do_sleep));
}
#else /* !MR_THREAD_SAFE */
@@ -1769,16 +1773,25 @@ MR_define_entry(MR_do_idle_clean_context);
#ifdef MR_THREADSCOPE
MR_threadscope_post_stop_context(MR_TS_STOP_REASON_FINISHED);
#endif
-
- MR_MAYBE_TRAMPOLINE(do_get_context());
- MR_MAYBE_TRAMPOLINE(do_local_spark(NULL));
- MR_MAYBE_TRAMPOLINE(do_work_steal(NULL));
-
- /*
- ** XXX: I'm not convinced that the idle state is useful.
- */
advertise_engine_state_idle();
+ MR_Code *jump_target;
+ jump_target = do_get_context();
+ if (jump_target != NULL) {
+ advertise_engine_state_working();
+ MR_GOTO(jump_target);
+ }
+ jump_target = do_local_spark(NULL);
+ if (jump_target != NULL) {
+ advertise_engine_state_working();
+ MR_GOTO(jump_target);
+ }
+ jump_target = do_work_steal();
+ if (jump_target != NULL) {
+ advertise_engine_state_working();
+ MR_GOTO(jump_target);
+ }
+
MR_GOTO(MR_ENTRY(MR_do_sleep));
}
MR_END_MODULE
@@ -1791,46 +1804,33 @@ MR_BEGIN_CODE
MR_define_entry(MR_do_idle_dirty_context);
{
MR_Code *join_label = (MR_Code*)MR_r1;
+ MR_Code *jump_target;
- if (MR_runqueue_head != NULL) {
- /*
- ** There's a context in the run queue, save this context and attempt to
- ** get a context from the run queue.
- */
- save_dirty_context(join_label);
- MR_ENGINE(MR_eng_this_context) = NULL;
- MR_MAYBE_TRAMPOLINE(do_get_context());
-
- /*
- ** If execution reaches this point, then we lost a race for
- ** a free context. Tell threadscope about it,
- ** XXX: Consider making this a first-class event in the future.
- */
-#ifdef MR_THREADSCOPE
- MR_threadscope_post_log_msg(
- "Runtime: Lost a race for a runnable context.");
-#endif
- join_label = NULL;
+ /*
+ ** We check for local sparks first. If there is a local spark that is
+ ** compatible then execute it, it is 'left most' next to the computation
+ ** that this engine has just finished, and could be more optimal than
+ ** any suspended context. Let another engine pay the cost of the
+ ** context switch.
+ **
+ ** If there was an incompatible local spark then it may still be
+ ** executed after the jump to MR_do_idle below.
+ */
+ jump_target = do_local_spark(join_label);
+ if (jump_target != NULL) {
+ MR_GOTO(jump_target);
}
+ /*
+ ** Save our context and then look for work as per normal.
+ */
#ifdef MR_THREADSCOPE
MR_threadscope_post_stop_context(MR_TS_STOP_REASON_BLOCKED);
#endif
- MR_MAYBE_TRAMPOLINE(do_local_spark(join_label));
- MR_MAYBE_TRAMPOLINE(do_work_steal(join_label));
- if (join_label != NULL) {
- /*
- ** Save the dirty context. We cannot take it to sleep and
- ** it won't be used if do_get_context() succeeds.
- */
- save_dirty_context(join_label);
- MR_ENGINE(MR_eng_this_context) = NULL;
- MR_MAYBE_TRAMPOLINE(do_get_context());
- }
-
- advertise_engine_state_idle();
+ save_dirty_context(join_label);
+ MR_ENGINE(MR_eng_this_context) = NULL;
- MR_GOTO(MR_ENTRY(MR_do_sleep));
+ MR_GOTO(MR_ENTRY(MR_do_idle));
}
MR_END_MODULE
@@ -1853,6 +1853,7 @@ MR_define_entry(MR_do_sleep);
int result;
struct timespec ts;
struct timeval tv;
+ MR_Code *jump_target;
while (1) {
engine_sleep_sync_data[engine_id].d.es_state = ENGINE_STATE_SLEEPING;
@@ -1909,8 +1910,19 @@ MR_define_entry(MR_do_sleep);
MR_ENGINE(MR_eng_victim_counter) =
engine_sleep_sync_data[engine_id].d.es_action_data.
MR_ewa_worksteal_engine;
- MR_MAYBE_TRAMPOLINE(do_work_steal(NULL));
- MR_MAYBE_TRAMPOLINE(do_get_context());
+
+ jump_target = do_work_steal();
+ if (jump_target != NULL) {
+ engine_sleep_sync_data[engine_id].d.es_state =
+ ENGINE_STATE_WORKING;
+ MR_GOTO(jump_target);
+ }
+ jump_target = do_get_context();
+ if (jump_target != NULL) {
+ engine_sleep_sync_data[engine_id].d.es_state =
+ ENGINE_STATE_WORKING;
+ MR_GOTO(jump_target);
+ }
break;
case MR_ENGINE_ACTION_CONTEXT:
@@ -1918,6 +1930,8 @@ MR_define_entry(MR_do_sleep);
MR_Context *context;
MR_Code *resume_point;
+ engine_sleep_sync_data[engine_id].d.es_state =
+ ENGINE_STATE_WORKING;
context = engine_sleep_sync_data[engine_id].d.
es_action_data.MR_ewa_context;
prepare_engine_for_context(context);
@@ -1936,8 +1950,18 @@ MR_define_entry(MR_do_sleep);
case MR_ENGINE_ACTION_NONE:
default:
- MR_MAYBE_TRAMPOLINE(do_get_context());
- MR_MAYBE_TRAMPOLINE(do_work_steal(NULL));
+ jump_target = do_get_context();
+ if (jump_target != NULL) {
+ engine_sleep_sync_data[engine_id].d.es_state =
+ ENGINE_STATE_WORKING;
+ MR_GOTO(jump_target);
+ }
+ jump_target = do_work_steal();
+ if (jump_target != NULL) {
+ engine_sleep_sync_data[engine_id].d.es_state =
+ ENGINE_STATE_WORKING;
+ MR_GOTO(jump_target);
+ }
break;
}
} else {
@@ -1954,7 +1978,11 @@ MR_define_entry(MR_do_sleep);
/*
** A wait timed out, check for any sparks.
*/
- MR_MAYBE_TRAMPOLINE(do_work_steal(NULL));
+ jump_target = do_work_steal();
+ if (jump_target != NULL) {
+ advertise_engine_state_working();
+ MR_GOTO(jump_target);
+ }
break;
default:
perror("sem_timedwait");
@@ -2015,8 +2043,12 @@ prepare_engine_for_context(MR_Context *context) {
MR_debug_log_message("destroying old context %p",
MR_ENGINE(MR_eng_this_context));
#endif
+ /*
+ ** Saving the context is important. Details such as the current
+ ** stack pointer must be reset before the context is released.
+ */
MR_save_context(MR_ENGINE(MR_eng_this_context));
- MR_destroy_context(MR_ENGINE(MR_eng_this_context));
+ MR_release_context(MR_ENGINE(MR_eng_this_context));
}
MR_ENGINE(MR_eng_this_context) = context;
MR_load_context(context);
@@ -2026,27 +2058,10 @@ prepare_engine_for_context(MR_Context *context) {
}
static void
-prepare_engine_for_spark(volatile MR_Spark *spark, MR_Code *join_label)
+prepare_engine_for_spark(volatile MR_Spark *spark)
{
MR_Context *this_context = MR_ENGINE(MR_eng_this_context);
- /*
- ** We need to save this context if it is dirty and incompatible with
- ** this spark.
- */
- if ((this_context != NULL) &&
- (join_label != NULL) &&
- (spark->MR_spark_sync_term->MR_st_orig_context != this_context))
- {
-#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
- MR_debug_log_message("Saving old dirty context %p", this_context);
-#endif
- save_dirty_context(join_label);
-#ifdef MR_DEBUG_CONTEXT_CREATION_SPEED
- MR_debug_log_message("done.");
-#endif
- this_context = NULL;
- }
if (this_context == NULL) {
/*
** Get a new context
@@ -2095,14 +2110,14 @@ prepare_engine_for_spark(volatile MR_Spark *spark, MR_Code *join_label)
MR_SET_THREAD_LOCAL_MUTABLES(spark->MR_spark_thread_local_mutables);
MR_assert(MR_parent_sp);
- MR_assert(MR_parent_sp != MR_sp);
- MR_assert(spark.MR_spark_sync_term->MR_st_count > 0);
+ MR_assert(spark->MR_spark_sync_term->MR_st_count > 0);
}
static MR_Code*
do_local_spark(MR_Code *join_label)
{
volatile MR_Spark *spark;
+ MR_Context *this_context = MR_ENGINE(MR_eng_this_context);
#ifdef MR_THREADSCOPE
MR_threadscope_post_looking_for_local_spark();
@@ -2113,15 +2128,36 @@ do_local_spark(MR_Code *join_label)
return NULL;
}
+ /*
+ ** The current context may be dirty and incompatible with this spark, if
+ ** so we put the spark back ondo the deque. This test is only
+ ** applicable when running a local spark.
+ **
+ ** Our caller will then save the context and look for a different
+ ** context to run, if it cannot find a context then it will call this
+ ** function again to run the incompatible spark, allocating a new context.
+ */
+ if ((this_context != NULL) &&
+ (join_label != NULL) &&
+ (spark->MR_spark_sync_term->MR_st_orig_context != this_context))
+ {
+ /* The cast discards the volatile qualifier, which is okay */
+ MR_wsdeque_putback_bottom(&MR_ENGINE(MR_eng_spark_deque),
+ (MR_Spark*) spark);
+ return NULL;
+ }
+
#ifdef MR_THREADSCOPE
MR_threadscope_post_run_spark(spark->MR_spark_id);
#endif
- prepare_engine_for_spark(spark, join_label);
+
+ prepare_engine_for_spark(spark);
+
return spark->MR_spark_resume;
}
static MR_Code*
-do_work_steal(MR_Code *join_label)
+do_work_steal(void)
{
MR_Spark spark;
@@ -2133,18 +2169,15 @@ do_work_steal(MR_Code *join_label)
** A context may be created to execute a spark, so only attempt to
** steal sparks if doing so would not exceed the limit of outstanding
** contexts.
- **
- ** We used to check to see if the engine already had a context, in which
- ** case it can often re-use that context, but somethines it can't and a new
- ** context would be created anyway. Therefore that check has been removed.
*/
- if (MR_num_outstanding_contexts <= MR_max_outstanding_contexts) {
+ if ((MR_ENGINE(MR_eng_this_context) != NULL) ||
+ (MR_num_outstanding_contexts <= MR_max_outstanding_contexts)) {
/* Attempt to steal a spark */
if (MR_attempt_steal_spark(&spark)) {
#ifdef MR_THREADSCOPE
MR_threadscope_post_steal_spark(spark.MR_spark_id);
#endif
- prepare_engine_for_spark(&spark, join_label);
+ prepare_engine_for_spark(&spark);
return spark.MR_spark_resume;
}
}
@@ -2215,28 +2248,11 @@ MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label)
jnc_last = MR_atomic_dec_and_is_zero_uint(&(jnc_st->MR_st_count));
- if (this_context == jnc_st->MR_st_orig_context) {
+ if (jnc_last) {
/*
- ** This context originated this parallel conjunction.
+ ** All the conjuncts have finished,
*/
- if (jnc_last) {
- /*
- ** All the conjuncts have finished, so jump to the join label.
- */
- return join_label;
- } else {
- /*
- ** This context is dirty, it is needed to complete the parallel
- ** conjunction
- */
- MR_r1 = (MR_Word)join_label;
- return MR_ENTRY(MR_do_idle_dirty_context);
- }
- } else {
- /*
- ** This context is now clean, it can be used to execute _any_ spark.
- */
- if (jnc_last) {
+ if (this_context != jnc_st->MR_st_orig_context) {
#ifdef MR_THREADSCOPE
MR_threadscope_post_stop_context(MR_TS_STOP_REASON_FINISHED);
#endif
@@ -2259,15 +2275,26 @@ MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label)
MR_threadscope_post_context_runnable(jnc_st->MR_st_orig_context);
#endif
prepare_engine_for_context(jnc_st->MR_st_orig_context);
+ /*
+ ** This field must be reset to NULL
+ */
jnc_st->MR_st_orig_context->MR_ctxt_resume = NULL;
+ }
+
+ /*
+ ** Continue the parallel conjunction.
+ */
+ return join_label;
+ } else {
+ if (this_context == jnc_st->MR_st_orig_context) {
+ MR_r1 = (MR_Word)join_label;
+ return MR_ENTRY(MR_do_idle_dirty_context);
+ } else {
/*
- ** This field must be null before we execute MR_join_and_continue
- ** next.
+ ** This engine and context should look for other work.
*/
- MR_CPU_SFENCE;
- return join_label;
+ return MR_ENTRY(MR_do_idle_clean_context);
}
- return MR_ENTRY(MR_do_idle_clean_context);
}
}
#endif
diff --git a/runtime/mercury_context.h b/runtime/mercury_context.h
index 7e2b078..339a754 100644
--- a/runtime/mercury_context.h
+++ b/runtime/mercury_context.h
@@ -446,7 +446,7 @@ extern MR_Context *MR_create_context(const char *id,
MR_ContextSize ctxt_size, MR_Generator *gen);
/*
-** MR_destroy_context(context) returns the pointed-to context structure
+** MR_release_context(context) returns the pointed-to context structure
** to the free list, and releases resources as necessary.
**
** VERY IMPORTANT: Call MR_save_context() before you call
@@ -456,7 +456,7 @@ extern MR_Context *MR_create_context(const char *id,
** re-used elsewhere) stack segment may still be referenced by the context. If
** that context is re-used later then it will clober another context's stack!
*/
-extern void MR_destroy_context(MR_Context *context);
+extern void MR_release_context(MR_Context *context);
/*
** MR_init_context_stuff() initializes the lock structures for the runqueue,
diff --git a/runtime/mercury_engine.c b/runtime/mercury_engine.c
index 75ccb59..0fb35aa 100644
--- a/runtime/mercury_engine.c
+++ b/runtime/mercury_engine.c
@@ -168,8 +168,12 @@ void MR_finalize_engine(MercuryEngine *eng)
** might need to be finalized.
*/
if (eng->MR_eng_this_context) {
+ /*
+ ** Saving the context is very important before releasing it.
+ ** See the documentation for MR_release_context
+ */
MR_save_context(eng->MR_eng_this_context);
- MR_destroy_context(eng->MR_eng_this_context);
+ MR_release_context(eng->MR_eng_this_context);
}
#if MR_THREADSCOPE
diff --git a/runtime/mercury_par_builtin.h b/runtime/mercury_par_builtin.h
index 1b3eb00..a6b86bb 100644
--- a/runtime/mercury_par_builtin.h
+++ b/runtime/mercury_par_builtin.h
@@ -432,7 +432,7 @@ extern MR_LoopControl *MR_lc_create(unsigned num_workers);
(lc)->MR_lc_slots[i].MR_lcs_context->MR_ctxt_sp = \
(lc)->MR_lc_slots[i].MR_lcs_context-> \
MR_ctxt_detstack_zone->MR_zone_min; \
- MR_destroy_context((lc)->MR_lc_slots[i].MR_lcs_context); \
+ MR_release_context((lc)->MR_lc_slots[i].MR_lcs_context); \
} \
} \
} while (0);
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 490 bytes
Desc: Digital signature
URL: <http://lists.mercurylang.org/archives/reviews/attachments/20120806/d9bc9804/attachment.sig>
More information about the reviews
mailing list