[m-rev.] for review: Support dynamic creation of Mercury engines in low-level C parallel grades.
Paul Bone
paul at bone.id.au
Tue Jun 24 19:42:50 AEST 2014
On Wed, Jun 18, 2014 at 05:27:26PM +1000, Peter Wang wrote:
> For review by Paul.
>
Thanks. Generally this looks good; I have some specific comments
throughout.
>
> This change allows Mercury engines (each in a separate OS thread) to be
> created and destroyed dynamically in low-level C grades.
>
> We divide Mercury engines into two types:
>
> "Shared" engines may execute code from any Mercury thread.
> Shared engines may steal work from other shared engines, so are also
> called work-stealing engines; we do not have shared engines that
> refrain from work-stealing.
>
> "Exclusive" engines execute code only for a single Mercury thread.
>
> Only exclusive engines may be created and destroyed dynamically so far.
> This assumption could be lifted when and if the need should arise.
>
> Exclusive engines are a means for the user to map a Mercury thread directly
> to an OS thread. Calls to blocking procedures on that thread will not block
> progress in arbitrary other Mercury threads. Foreign code which depends on
> the OS thread-local state is usable when called from that thread.
>
> We do not yet allow shared engines to steal parallel work from exclusive
> engines.
>
> runtime/mercury_wrapper.c:
> runtime/mercury_wrapper.h:
> Rename MR_num_threads to MR_num_ws_engines. It counts only
> work-stealing engines. Move comment to the header file.
>
> Add MR_max_engines. The default value is arbitrary.
>
> Add MERCURY_OPTIONS `--max-engines' option.
>
> Define MR_num_ws_engines and MR_max_engines only with
> MR_LL_PARALLEL_CONJ.
>
> runtime/mercury_context.c:
> runtime/mercury_context.h:
> Rename MR_num_idle_engines to MR_num_ws_engines. It only counts
> idle work-stealing engines.
MR_num_ws_engines is the same name as above. I guess, and hope, that this
comment is in error and these are separate variables with distinct names.
>
> Extend MR_spark_deques to MR_max_engines length.
>
> Extend engine_sleep_sync_data to MR_max_engines length.
>
> Add function to index engine_sleep_sync_data with optional bounds
> checking.
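A minimal sketch of what an accessor with optional bounds checking might
look like. The names (SleepSync, get_sleep_sync, CHECK_ENGINE_BOUNDS) are
hypothetical, and MAX_ENGINES is a compile-time stand-in for MR_max_engines,
which in the patch is a runtime setting.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

#define MAX_ENGINES 8          /* hypothetical stand-in for MR_max_engines */
#define CHECK_ENGINE_BOUNDS 1  /* remove to compile the check away */

/* Hypothetical stand-in for one entry of engine_sleep_sync_data. */
typedef struct {
    int state;
} SleepSync;

static SleepSync sleep_sync_data[MAX_ENGINES];

/* Return the slot for engine_id.  The range check exists only when
** CHECK_ENGINE_BOUNDS is defined, so optimised builds pay nothing. */
static SleepSync *
get_sleep_sync(int engine_id)
{
#ifdef CHECK_ENGINE_BOUNDS
    if (engine_id < 0 || engine_id >= MAX_ENGINES) {
        fprintf(stderr, "engine id %d out of range\n", engine_id);
        abort();
    }
#endif
    return &sleep_sync_data[engine_id];
}
```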
>
> Replace instances of MR_num_threads by MR_num_ws_engines or
> MR_max_engines as appropriate.
>
> Add MR_ctxt_exclusive_engine field.
>
> Rename existing MR_Context fields to remove the implication that the
> engine "owns" the context. The new exclusive_engine field does
> imply a kind of ownership, hence potential confusion.
>
> Rename MR_SavedOwner, too.
>
> Make MR_find_ready_context respect MR_ctxt_exclusive_engine.
>
> Make MR_schedule_context respect MR_ctxt_exclusive_engine.
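A rough model of the rule these two items describe, as I understand it: an
exclusive engine takes only contexts bound to it, and a shared engine takes
only unbound contexts. Context and find_ready_context here are hypothetical
simplifications (a singly linked run queue, no locking), not the runtime's
MR_Context or MR_find_ready_context.

```c
#include <assert.h>
#include <stddef.h>

#define NO_ENGINE (-1)

/* Hypothetical, much-simplified context. */
typedef struct Context {
    int             exclusive_engine;  /* NO_ENGINE: any shared engine */
    struct Context *next;
} Context;

/* Unlink and return the first context in *queue that engine `self` may
** run, or NULL if there is none. */
static Context *
find_ready_context(Context **queue, int self, int self_is_exclusive)
{
    Context **link;

    for (link = queue; *link != NULL; link = &(*link)->next) {
        Context *ctxt = *link;
        int      ok = self_is_exclusive
            ? ctxt->exclusive_engine == self
            : ctxt->exclusive_engine == NO_ENGINE;

        if (ok) {
            *link = ctxt->next;     /* unlink from the run queue */
            ctxt->next = NULL;
            return ctxt;
        }
    }
    return NULL;
}
```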
>
> Rename MR_try_wake_an_engine to MR_try_wake_ws_engine
> and restrict it to work-stealing engines.
If a context that has an exclusive engine becomes runnable, how do you wake
it? I suggest parameterizing MR_try_wake_an_engine instead. Is it not valid
to send MR_ENGINE_ACTION_CONTEXT to an exclusive engine, for example if the
exclusive context blocked on a thread.channel or something similar?
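To make the suggestion concrete, here is a minimal single-threaded model of
a parameterized wake function. Everything in it (Engine,
try_wake_engine_for_context, the action codes) is hypothetical, and it omits
the es_wake_lock locking that the real try_wake_engine does. A context bound
to an exclusive engine wakes only that engine; an unbound context wakes any
sleeping shared engine.

```c
#include <assert.h>

/* Hypothetical, simplified model; not the real runtime types. */
typedef enum { ENGINE_TYPE_SHARED, ENGINE_TYPE_EXCLUSIVE } EngineType;
typedef enum { STATE_WORKING, STATE_SLEEPING } EngineState;

#define NUM_ENGINES 4
#define NO_ENGINE   (-1)

typedef struct {
    EngineType  type;
    EngineState state;
    int         pending_action;     /* 0 means no action queued */
} Engine;

static Engine engines[NUM_ENGINES];

/* Wake an engine to run a newly runnable context and return its id,
** or NO_ENGINE if nothing was woken. */
static int
try_wake_engine_for_context(int exclusive_engine, int action)
{
    int id;

    if (exclusive_engine != NO_ENGINE) {
        Engine *eng = &engines[exclusive_engine];
        if (eng->state == STATE_SLEEPING) {
            eng->state = STATE_WORKING;
            eng->pending_action = action;
            return exclusive_engine;
        }
        /* Already awake: it will find the context on its own. */
        return NO_ENGINE;
    }
    for (id = 0; id < NUM_ENGINES; id++) {
        Engine *eng = &engines[id];
        if (eng->type == ENGINE_TYPE_SHARED && eng->state == STATE_SLEEPING) {
            eng->state = STATE_WORKING;
            eng->pending_action = action;
            return id;
        }
    }
    return NO_ENGINE;
}
```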
> Rename MR_shutdown_all_engines to MR_shutdown_ws_engines
> and restrict it to work-stealing engines.
I was initially confused by this; I didn't realize that these engines would
shut down by themselves when they finished their work. The changelog should
reflect this.
> Make try_wake_engine and try_notify_engine decrement
> MR_num_idle_ws_engines only for shared engines.
>
> In MR_do_idle, make exclusive engines bypass work-stealing
> and skip to the sleep state.
>
> In MR_do_sleep, make exclusive engines ignore work-stealing advice
> and abort the program if told to shut down.
> Assert that a context with an exclusive_engine really is only loaded
> by that engine.
>
> In MR_fork_new_child, make exclusive engines not attempt to wake
> work-stealing engines. Its sparks cannot be stolen anyway.
>
> Make do_work_steal fail the attempt for exclusive engines.
> There is one call where this might happen.
Where? I couldn't find / didn't see it.
> Add notes to MR_attempt_steal_spark. Its behaviour is unchanged.
>
> Replace a call to MR_destroy_thread by MR_finalize_thread_engine.
>
> Delete MR_num_exited_engines. It was unused.
>
> runtime/mercury_thread.c:
> runtime/mercury_thread.h:
> Delete MR_next_engine_id and MR_next_engine_id_lock.
> Engine ids can now be recycled so they are not enough.
This didn't parse well.
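If I read the diff correctly, the point is that once ids can be freed and
reused, a monotonically increasing counter like MR_next_engine_id no longer
works, so MR_setup_engine_for_threads searches for a free slot instead. A
stand-alone model of that allocate/recycle scheme follows; all names are
hypothetical. Unlike the quoted code, free_engine_id falls back to 0 when no
lower slot is occupied.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_ENGINES    8    /* hypothetical stand-in for MR_max_engines */
#define NUM_WS_ENGINES 2    /* hypothetical stand-in for MR_num_ws_engines */
#define NO_ID          (-1)

static void *engine_bases[MAX_ENGINES];    /* NULL means the id is free */
static int   highest_engine_id = 0;

/* Allocate the lowest free id.  Ids [0, NUM_WS_ENGINES) are for shared
** engines, the rest for exclusive ones. */
static int
alloc_engine_id(void *eng, int exclusive)
{
    int min = exclusive ? NUM_WS_ENGINES : 0;
    int max = exclusive ? MAX_ENGINES : NUM_WS_ENGINES;
    int id;

    for (id = min; id < max; id++) {
        if (engine_bases[id] == NULL) {
            engine_bases[id] = eng;
            if (highest_engine_id < id) {
                highest_engine_id = id;
            }
            return id;
        }
    }
    return NO_ID;   /* the patch calls MR_fatal_error here */
}

/* Free an id so a later engine can reuse it. */
static void
free_engine_id(int id)
{
    int i;

    engine_bases[id] = NULL;
    if (highest_engine_id == id) {
        highest_engine_id = 0;
        for (i = id - 1; i >= 0; i--) {
            if (engine_bases[i] != NULL) {
                highest_engine_id = i;
                break;
            }
        }
    }
}
```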
> Extend MR_all_engine_bases to MR_max_engines entries.
>
> Add MR_all_engine_bases_lock to protect MR_all_engine_bases.
>
> Add MR_highest_engine_id.
>
> Add MR_EngineType with the two options described.
>
> Split the main part of MR_init_engine into a new function which
> accepts an engine type. MR_init_engine is used by generated code so
> maintain the interface.
>
> Factor out setup/shutdown for thread support.
>
> Make MR_finalize_thread_engine call the shutdown function.
>
> Specialise MR_create_thread into MR_create_worksteal_thread.
> The generic form was unused.
>
> Move thread pinning into MR_create_worksteal_thread as other threads
> do not require it.
>
> Delete MR_destroy_thread. Its one caller can use
> MR_finalize_thread_engine.
>
> Delete declaration for non-existent variable
> MR_init_engine_array_lock.
>
> runtime/mercury_memory_zones.c:
> Replace MR_num_threads by appropriate counters (I hope).
I never got around to testing and tuning those settings, so I wouldn't worry.
Your changes look reasonable.
> runtime/mercury_memory_handlers.c:
> runtime/mercury_par_builtin.h:
> Conform to changes.
>
> runtime/mercury_threadscope.c:
> XXX don't know about this
Leave it; I'm working on some large-scale changes for parallel profiling.
> library/thread.m:
> Add hidden predicate `spawn_native' for testing.
> The interface is subject to change.
>
> Share much of the code with the high-level C backend.
>
> library/par_builtin.m:
> Delete `num_os_threads' as it is unused.
>
> doc/user_guide.texi:
> Document MERCURY_OPTIONS `--max-engines' option.
> diff --git a/runtime/mercury_context.c b/runtime/mercury_context.c
> index db79ee3..382aa7d 100644
> --- a/runtime/mercury_context.c
> +++ b/runtime/mercury_context.c
> @@ -242,16 +249,15 @@ static MR_Context *free_small_context_list = NULL;
> #endif
>
> #ifdef MR_LL_PARALLEL_CONJ
> -MR_Integer volatile MR_num_idle_engines = 0;
> -MR_Unsigned volatile MR_num_exited_engines = 0;
> +MR_Integer volatile MR_num_idle_ws_engines = 0;
> static MR_Integer volatile MR_num_outstanding_contexts = 0;
> -static sem_t shutdown_semaphore;
> +static sem_t shutdown_ws_semaphore;
>
> static MercuryLock MR_par_cond_stats_lock;
> /*
> ** The spark deques are kept in engine id order.
> **
> -** This array will contain MR_num_threads pointers to deques.
> +** This array will contain MR_max_engines pointers to deques.
> */
> MR_SparkDeque **MR_spark_deques = NULL;
> #endif
If there aren't (currently) that many engines, are some slots in the
array NULL? This comment should say whether or not this is true.
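If unused slots are NULL, every reader of the array has to check for that.
A sketch of victim selection that does so, scanning only the work-stealing
range round-robin from a victim counter (as MR_eng_victim_counter is
initialised in the patch). SparkDeque and pick_victim are hypothetical
stand-ins, not the runtime's.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_ENGINES 8

typedef struct { int num_sparks; } SparkDeque;   /* hypothetical */

/* Slots for engine ids not currently in use hold NULL. */
static SparkDeque *spark_deques[MAX_ENGINES];

/* Scan [0, num_ws_engines) starting at *victim_counter, skipping the
** thief itself and NULL slots.  Returns a victim id, or -1 if no deque
** has sparks. */
static int
pick_victim(int self, int num_ws_engines, int *victim_counter)
{
    int i;

    for (i = 0; i < num_ws_engines; i++) {
        int         id = (*victim_counter + i) % num_ws_engines;
        SparkDeque *dq = spark_deques[id];

        if (id != self && dq != NULL && dq->num_sparks > 0) {
            *victim_counter = (id + 1) % num_ws_engines;
            return id;
        }
    }
    return -1;
}
```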
> @@ -1794,14 +1845,17 @@ try_wake_engine(MR_EngineId engine_id, int action,
> */
> MR_LOCK(&(esync->d.es_wake_lock), "try_wake_engine, wake_lock");
> if (esync->d.es_state == ENGINE_STATE_SLEEPING) {
> - MR_atomic_dec_int(&MR_num_idle_engines);
> + /* XXX not sure about this */
> + if (engine_id < MR_num_ws_engines) {
> + MR_atomic_dec_int(&MR_num_idle_ws_engines);
> #ifdef MR_DEBUG_THREADS
> if (MR_debug_threads) {
> - fprintf(stderr, "%ld Decrement MR_num_idle_engines %"
> + fprintf(stderr, "%ld Decrement MR_num_idle_ws_engines %"
> MR_INTEGER_LENGTH_MODIFIER "d\n",
> - MR_SELF_THREAD_ID, MR_num_idle_engines);
> + MR_SELF_THREAD_ID, MR_num_idle_ws_engines);
> }
> #endif
> + }
I think it's correct to decrement MR_num_idle_ws_engines only when waking
work-stealing engines, not any engine, so this code is correct.
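The check relies on shared engines owning ids [0, MR_num_ws_engines), as
MR_setup_engine_for_threads allocates them. A small C11 sketch of the same
invariant, assuming exclusive engines never increment the idle counter when
they go to sleep; note_engine_woken and num_idle_ws_engines are hypothetical
names.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical stand-in for MR_num_idle_ws_engines. */
static atomic_long num_idle_ws_engines;

/* Called when a sleeping engine is woken.  Only ids below
** num_ws_engines belong to work-stealing engines, so only those
** affect the idle count. */
static void
note_engine_woken(int engine_id, int num_ws_engines)
{
    if (engine_id < num_ws_engines) {
        atomic_fetch_sub(&num_idle_ws_engines, 1);
    }
}
```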
> diff --git a/runtime/mercury_context.h b/runtime/mercury_context.h
> index 47cb294..e033ef4 100644
> --- a/runtime/mercury_context.h
> +++ b/runtime/mercury_context.h
> @@ -254,6 +260,7 @@ struct MR_Spark_Struct {
> MR_Code *MR_spark_resume;
> MR_ThreadLocalMuts *MR_spark_thread_local_mutables;
> #ifdef MR_THREADSCOPE
> + /* XXX this is not wide enough for higher engine ids */
> MR_uint_least32_t MR_spark_id;
> #endif
> };
Okay, thanks.
> diff --git a/runtime/mercury_thread.c b/runtime/mercury_thread.c
> index 99b5f68..33b1873 100644
> +/*
> +** Additional setup/shutdown of the engine for threads support.
> +*/
>
> -void
> -MR_destroy_thread(void *eng0)
> +static void
> +MR_setup_engine_for_threads(MercuryEngine *eng, MR_EngineType engine_type)
> {
> - MercuryEngine *eng = eng0;
> - MR_destroy_engine(eng);
> + #ifndef MR_HIGHLEVEL_CODE
> + MR_EngineId min;
> + MR_EngineId max;
> + MR_EngineId id;
> +
> + MR_LOCK(&MR_all_engine_bases_lock, "MR_setup_engine_for_threads");
> +
> + /* Allocate an engine id. */
> + if (engine_type == MR_ENGINE_TYPE_SHARED) {
> + min = 0;
> + max = MR_num_ws_engines;
> + } else {
> + min = MR_num_ws_engines;
> + max = MR_max_engines;
> + }
> + for (id = min; id < max; id++) {
> + if (MR_all_engine_bases[id] == NULL) {
> + break;
> }
> + }
> + if (id == max) {
> + MR_fatal_error("exhausted engine ids");
> + }
> + if (MR_highest_engine_id < id) {
> + MR_highest_engine_id = id;
> + }
> +
> + eng->MR_eng_id = id;
> + eng->MR_eng_type = engine_type;
> + eng->MR_eng_victim_counter = (id + 1) % MR_num_ws_engines;
> +
> + MR_all_engine_bases[id] = eng;
> + MR_spark_deques[id] = eng->MR_eng_spark_deque;
>
> + #ifdef MR_THREADSCOPE
> + MR_threadscope_setup_engine(eng);
> + #endif
> +
> + MR_UNLOCK(&MR_all_engine_bases_lock, "MR_setup_engine_for_threads");
> #endif
> +}
> +
> +static void
> +MR_shutdown_engine_for_threads(MercuryEngine *eng)
> +{
> + #ifndef MR_HIGHLEVEL_CODE
> + MR_EngineId id = eng->MR_eng_id;
> +
> + MR_LOCK(&MR_all_engine_bases_lock, "MR_shutdown_engine_for_threads");
> +
> + assert(MR_all_engine_bases[id] == eng);
> + MR_all_engine_bases[id] = NULL;
> +
> + if (MR_highest_engine_id == id) {
> + int i;
> + for (i = id - 1; i >= 0; i--) {
> + if (MR_all_engine_bases[i] != NULL) {
> + MR_highest_engine_id = (MR_EngineId) i;
> + break;
> + }
> + }
> + }
> +
> + assert(MR_spark_deques[id] == eng->MR_eng_spark_deque);
> + MR_spark_deques[id] = NULL;
> +
> + MR_UNLOCK(&MR_all_engine_bases_lock, "MR_shutdown_engine_for_threads");
> + #endif
> +}
> +#endif /* MR_THREAD_SAFE */
So we leave the engine sleep sync data slot allocated? Is that deliberate?
Or maybe I've forgotten something since I wrote that code.
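If the slot should be released, one option is to reset it at shutdown so an
engine that later recycles the id starts clean. A minimal sketch, assuming a
per-slot state and pending action; the slot states and both functions are
hypothetical, and the locking the real engine_sleep_sync_data needs is
omitted.

```c
#include <assert.h>

#define MAX_ENGINES 8   /* hypothetical stand-in for MR_max_engines */

/* Hypothetical slot states; the real engine_sleep_sync_data differs. */
typedef enum { SLOT_UNUSED, SLOT_WORKING, SLOT_SLEEPING } SlotState;

typedef struct {
    SlotState state;
    int       pending_action;   /* 0 means no action queued */
} SleepSync;

static SleepSync sleep_sync[MAX_ENGINES];

/* Claim the slot for a newly set-up engine. */
static void
claim_sleep_sync_slot(int id)
{
    assert(sleep_sync[id].state == SLOT_UNUSED);
    sleep_sync[id].state = SLOT_WORKING;
    sleep_sync[id].pending_action = 0;
}

/* Reset the slot at shutdown so a later engine reusing this id does
** not see a stale state or action. */
static void
release_sleep_sync_slot(int id)
{
    sleep_sync[id].state = SLOT_UNUSED;
    sleep_sync[id].pending_action = 0;
}
```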
> diff --git a/runtime/mercury_wrapper.c b/runtime/mercury_wrapper.c
> index f9c09ca..82aa750 100644
> --- a/runtime/mercury_wrapper.c
> +++ b/runtime/mercury_wrapper.c
> @@ -323,15 +323,10 @@ static char *MR_mem_usage_report_prefix = NULL;
>
> static int MR_num_output_args = 0;
>
> -/*
> -** This is initialized to zero. If it is still zero after configuration of the
> -** runtime but before threads are started, then we set it to the number of
> -** processors on the system (if support is available to detect this).
> -** Otherwise, we fall back to 1.
> -*/
> -MR_Unsigned MR_num_threads = 0;
> +#ifdef MR_LL_PARALLEL_CONJ
> +MR_Unsigned MR_num_ws_engines = 0;
> +MR_Unsigned MR_max_engines = 8192;
That's a lot of pthreads. However, if you're not using them all, then the
only cost is memory for three arrays that hold null pointers: 3*8*8192 bytes
= 192KB. It doesn't seem like much, but, and I chose these words
specifically, 1024 pthreads ought to be enough for anybody.
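For the record, the arithmetic, assuming three tables of one pointer per
possible engine id: on a 64-bit host 3 * 8 * 8192 = 196608 bytes (192 KiB),
while 1024 slots would be 3 * 8 * 1024 = 24576 bytes (24 KiB). As a helper
(engine_table_bytes is a hypothetical name):

```c
#include <assert.h>
#include <stddef.h>

/* Bytes used by num_arrays arrays of max_engines pointers each. */
static size_t
engine_table_bytes(size_t num_arrays, size_t max_engines)
{
    return num_arrays * max_engines * sizeof(void *);
}
```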
> diff --git a/runtime/mercury_wrapper.h b/runtime/mercury_wrapper.h
> index 4f7ce48..3a4a50c 100644
> --- a/runtime/mercury_wrapper.h
> +++ b/runtime/mercury_wrapper.h
> @@ -258,7 +258,7 @@ extern MR_Unsigned MR_contexts_per_thread;
>
> /*
> ** The number of outstanding contexts we can create
> -** (MR_contexts_per_thread * MR_num_threads).
> +** (MR_contexts_per_thread * MR_num_ws_engines).
> */
> extern MR_Unsigned MR_max_outstanding_contexts;
>
That raises a question. Are contexts created for thread.spawn_native
counted towards this limit?
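One possible answer is to exempt them, since each spawn_native call already
costs a whole OS thread. A sketch of that policy only, not a claim about
what the patch does; the counters and may_create_context are hypothetical
stand-ins.

```c
#include <assert.h>

/* Hypothetical stand-ins for MR_num_outstanding_contexts and
** MR_max_outstanding_contexts. */
static long num_outstanding_contexts = 0;
static long max_outstanding_contexts = 2;

/* Return 1 if a new context may be created.  Contexts bound to an
** exclusive engine are not counted against the limit. */
static int
may_create_context(int for_exclusive_engine)
{
    if (for_exclusive_engine) {
        return 1;
    }
    if (num_outstanding_contexts >= max_outstanding_contexts) {
        return 0;
    }
    num_outstanding_contexts++;
    return 1;
}
```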
--
Paul Bone