[m-rev.] for review: Support dynamic creation of Mercury engines in low-level C parallel grades.
Peter Wang
novalazy at gmail.com
Wed Jun 18 17:27:26 AEST 2014
For review by Paul.
----
This change allows Mercury engines (each in a separate OS thread) to be
created and destroyed dynamically in low-level C grades.
We divide Mercury engines into two types:
"Shared" engines may execute code from any Mercury thread.
Shared engines may steal work from other shared engines, so they are
also called work-stealing engines; there are no shared engines that
refrain from work-stealing.
"Exclusive" engines execute code only for a single Mercury thread.
So far, only exclusive engines may be created and destroyed dynamically.
This restriction could be lifted if and when the need arises.
Exclusive engines are a means for the user to map a Mercury thread directly
to an OS thread. Calls to blocking procedures on that thread will not block
progress in arbitrary other Mercury threads, and foreign code that depends
on OS thread-local state is usable when called from that thread.
We do not yet allow shared engines to steal parallel work from exclusive
engines.
runtime/mercury_wrapper.c:
runtime/mercury_wrapper.h:
Rename MR_num_threads to MR_num_ws_engines. It counts only
work-stealing engines. Move comment to the header file.
Add MR_max_engines. The default value is arbitrary.
Add MERCURY_OPTIONS `--max-engines' option.
Define MR_num_ws_engines and MR_max_engines only with
MR_LL_PARALLEL_CONJ.
runtime/mercury_context.c:
runtime/mercury_context.h:
Rename MR_num_idle_engines to MR_num_idle_ws_engines. It counts
only idle work-stealing engines.
Extend MR_spark_deques to MR_max_engines length.
Extend engine_sleep_sync_data to MR_max_engines length.
Add function to index engine_sleep_sync_data with optional bounds
checking.
Replace instances of MR_num_threads by MR_num_ws_engines or
MR_max_engines as appropriate.
Add MR_ctxt_exclusive_engine field.
Rename existing MR_Context fields to remove the implication that the
engine "owns" the context. The new exclusive_engine field does
imply a kind of ownership, hence the potential for confusion.
Rename MR_SavedOwner, too.
Make MR_find_ready_context respect MR_ctxt_exclusive_engine.
Make MR_schedule_context respect MR_ctxt_exclusive_engine.
Rename MR_try_wake_an_engine to MR_try_wake_ws_engine
and restrict it to work-stealing engines.
Rename MR_shutdown_all_engines to MR_shutdown_ws_engines
and restrict it to work-stealing engines.
Make try_wake_engine and try_notify_engine decrement
MR_num_idle_ws_engines only for shared engines.
In MR_do_idle, make exclusive engines bypass work-stealing
and skip to the sleep state.
In MR_do_sleep, make exclusive engines ignore work-stealing advice
and abort the program if told to shut down.
Assert that a context with an exclusive_engine really is only loaded
by that engine.
In MR_fork_new_child, make exclusive engines not attempt to wake
work-stealing engines. Their sparks cannot be stolen anyway.
Make do_work_steal fail the attempt for exclusive engines.
There is one call site where this might happen.
Add notes to MR_attempt_steal_spark. Its behaviour is unchanged.
Replace a call to MR_destroy_thread by MR_finalize_thread_engine.
Delete MR_num_exited_engines. It was unused.
runtime/mercury_thread.c:
runtime/mercury_thread.h:
Delete MR_next_engine_id and MR_next_engine_id_lock.
Engine ids can now be recycled, so a monotonically increasing
counter no longer suffices.
Extend MR_all_engine_bases to MR_max_engines entries.
Add MR_all_engine_bases_lock to protect MR_all_engine_bases.
Add MR_highest_engine_id.
Add MR_EngineType with the two options described.
Split the main part of MR_init_engine into a new function that
accepts an engine type. MR_init_engine is used by generated code,
so its interface is maintained.
Factor out setup/shutdown for thread support.
Make MR_finalize_thread_engine call the shutdown function.
Specialise MR_create_thread into MR_create_worksteal_thread.
The generic form was unused.
Move thread pinning into MR_create_worksteal_thread as other threads
do not require it.
Delete MR_destroy_thread. Its one caller can use
MR_finalize_thread_engine.
Delete declaration for non-existent variable
MR_init_engine_array_lock.
runtime/mercury_engine.c:
runtime/mercury_engine.h:
Add MR_eng_type field.
Make MR_eng_spark_deque a pointer to separately-allocated memory.
The reason is given in MR_attempt_steal_spark.
Add MR_ENGINE_ID_NONE, a dummy value for MR_ctxt_exclusive_engine.
Delete misplaced declaration of MR_all_engine_bases.
runtime/mercury_memory_zones.c:
Replace MR_num_threads by appropriate counters (I hope).
runtime/mercury_memory_handlers.c:
runtime/mercury_par_builtin.h:
Conform to changes.
runtime/mercury_threadscope.c:
XXX don't know about this
library/thread.m:
Add hidden predicate `spawn_native' for testing.
The interface is subject to change.
Share much of the code with the high-level C backend.
library/par_builtin.m:
Delete `num_os_threads' as it is unused.
doc/user_guide.texi:
Document MERCURY_OPTIONS `--max-engines' option.
---
doc/user_guide.texi | 7 +
library/par_builtin.m | 22 ---
library/thread.m | 142 ++++++++++++-----
runtime/mercury_context.c | 314 ++++++++++++++++++++++++--------------
runtime/mercury_context.h | 74 ++++-----
runtime/mercury_engine.c | 43 +++---
runtime/mercury_engine.h | 22 ++-
runtime/mercury_memory_handlers.c | 2 +-
runtime/mercury_memory_zones.c | 15 +-
runtime/mercury_par_builtin.h | 6 +-
runtime/mercury_thread.c | 189 ++++++++++++++---------
runtime/mercury_thread.h | 63 +++++---
runtime/mercury_threadscope.c | 2 +-
runtime/mercury_wrapper.c | 58 ++++---
runtime/mercury_wrapper.h | 22 ++-
15 files changed, 618 insertions(+), 363 deletions(-)
diff --git a/doc/user_guide.texi b/doc/user_guide.texi
index cea1f54..84b85f3 100644
--- a/doc/user_guide.texi
+++ b/doc/user_guide.texi
@@ -10257,6 +10257,13 @@ is available from the operating system.
If it cannot or support is unavailable it defaults to @samp{1}.
@sp 1
+@item --max-engines @var{num}
+@findex --max-engines (runtime option)
+Tells the runtime system to allow a maximum of @var{num}
+POSIX threads, each with its own Mercury engine.
+This only affects programs in low-level C parallel grades.
+
+@sp 1
@item --max-contexts-per-thread @var{num}
@findex --max-contexts-per-thread (runtime option)
Tells the runtime system to create at most @var{num} contexts per
diff --git a/library/par_builtin.m b/library/par_builtin.m
index 6c2b1a5..3b30342 100644
--- a/library/par_builtin.m
+++ b/library/par_builtin.m
@@ -124,15 +124,6 @@
%
:- impure pred evaluate_parallelism_condition is semidet.
- % num_os_threads(Num)
- %
- % Num is the number of OS threads the runtime is configured to use, which
- % the runtime records in the variable MR_num_threads. This is the value
- % given by the user as the argument of the -P option in the MERCURY_OPTIONS
- % environment variable.
- %
-:- pred num_os_threads(int::out) is det.
-
% Close the file that was used to log the parallel condition decisions.
%
% The parallel condition stats file is opened the first time it is
@@ -507,19 +498,6 @@ mercury_sys_init_lc_write_out_proc_statics(FILE *deep_fp,
%-----------------------------------------------------------------------------%
:- pragma foreign_proc("C",
- num_os_threads(NThreads::out),
- [will_not_call_mercury, will_not_throw_exception, thread_safe,
- promise_pure],
-"
- /*
- ** MR_num_threads is available in all grades. Although it won't make sense
- ** for non-parallel grades, it will still reflect the value configured by
- ** the user.
- */
- NThreads = MR_num_threads
-").
-
-:- pragma foreign_proc("C",
par_cond_close_stats_file(_IO0::di, _IO::uo),
[will_not_call_mercury, thread_safe, promise_pure, tabled_for_io],
"
diff --git a/library/thread.m b/library/thread.m
index 385bf06..3555d3d 100644
--- a/library/thread.m
+++ b/library/thread.m
@@ -56,6 +56,17 @@
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
+:- interface.
+
+ % spawn_native(Closure, IO0, IO)
+ % Currently only for testing.
+ %
+:- pred spawn_native(pred(io, io)::in(pred(di, uo) is cc_multi),
+ io::di, io::uo) is cc_multi.
+
+%-----------------------------------------------------------------------------%
+%-----------------------------------------------------------------------------%
+
:- implementation.
:- pragma foreign_decl("C", "
@@ -121,9 +132,7 @@
#if !defined(MR_HIGHLEVEL_CODE)
MR_Context *ctxt;
- MR_LOCK(&MR_thread_barrier_lock, ""thread.spawn"");
- MR_thread_barrier_count++;
- MR_UNLOCK(&MR_thread_barrier_lock, ""thread.spawn"");
+ ML_incr_thread_barrier_count();
ctxt = MR_create_context(""spawn"", MR_CONTEXT_SIZE_REGULAR, NULL);
ctxt->MR_ctxt_resume = MR_ENTRY(mercury__thread__spawn_begin_thread);
@@ -141,7 +150,7 @@
#else /* MR_HIGHLEVEL_CODE */
#if defined(MR_THREAD_SAFE)
- ML_create_thread(Goal);
+ ML_create_exclusive_thread(Goal);
#else
MR_fatal_error(""spawn/3 requires a .par grade in high-level C grades."");
#endif
@@ -174,6 +183,23 @@
%-----------------------------------------------------------------------------%
+spawn_native(_Goal, !IO) :-
+ private_builtin.sorry("spawn_native").
+
+:- pragma foreign_proc("C",
+ spawn_native(Goal::(pred(di, uo) is cc_multi), _IO0::di, _IO::uo),
+ [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+#ifdef MR_THREAD_SAFE
+ ML_create_exclusive_thread(Goal);
+#else
+ MR_fatal_error(""spawn_native/3 requires a .par grade."");
+#endif
+").
+
+%-----------------------------------------------------------------------------%
+
:- pragma no_inline(yield/2).
:- pragma foreign_proc("C",
yield(_IO0::di, _IO::uo),
@@ -258,20 +284,7 @@ INIT mercury_sys_init_thread_modules
}
MR_define_label(mercury__thread__spawn_end_thread);
{
- MR_LOCK(&MR_thread_barrier_lock, ""thread__spawn_end_thread"");
- MR_thread_barrier_count--;
- if (MR_thread_barrier_count == 0) {
- /*
- ** If this is the last spawned context to terminate and the
- ** main context was just waiting on us in order to terminate
- ** then reschedule the main context.
- */
- if (MR_thread_barrier_context) {
- MR_schedule_context(MR_thread_barrier_context);
- MR_thread_barrier_context = NULL;
- }
- }
- MR_UNLOCK(&MR_thread_barrier_lock, ""thread__spawn_end_thread"");
+ ML_decr_thread_barrier_count();
MR_save_context(MR_ENGINE(MR_eng_this_context));
MR_release_context(MR_ENGINE(MR_eng_this_context));
@@ -318,27 +331,27 @@ INIT mercury_sys_init_thread_modules
%-----------------------------------------------------------------------------%
%
-% High-level C implementation
+% High-level C and low-level C exclusive threads
%
:- pragma foreign_decl("C", "
-#if defined(MR_HIGHLEVEL_CODE) && defined(MR_THREAD_SAFE)
+#if defined(MR_THREAD_SAFE)
#include <pthread.h>
- int ML_create_thread(MR_Word goal);
- void *ML_thread_wrapper(void *arg);
+ int ML_create_exclusive_thread(MR_Word goal);
+ void *ML_exclusive_thread_wrapper(void *arg);
typedef struct ML_ThreadWrapperArgs ML_ThreadWrapperArgs;
struct ML_ThreadWrapperArgs {
MR_Word goal;
MR_ThreadLocalMuts *thread_local_mutables;
};
-#endif /* MR_HIGHLEVEL_CODE && MR_THREAD_SAFE */
+#endif /* MR_THREAD_SAFE */
").
:- pragma foreign_code("C", "
-#if defined(MR_HIGHLEVEL_CODE) && defined(MR_THREAD_SAFE)
- int ML_create_thread(MR_Word goal)
+#if defined(MR_THREAD_SAFE)
+ int ML_create_exclusive_thread(MR_Word goal)
{
ML_ThreadWrapperArgs *args;
pthread_t thread;
@@ -354,13 +367,11 @@ INIT mercury_sys_init_thread_modules
args->thread_local_mutables =
MR_clone_thread_local_mutables(MR_THREAD_LOCAL_MUTABLES);
- MR_LOCK(&MR_thread_barrier_lock, ""thread.spawn"");
- MR_thread_barrier_count++;
- MR_UNLOCK(&MR_thread_barrier_lock, ""thread.spawn"");
+ ML_incr_thread_barrier_count();
pthread_attr_init(&attrs);
pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
- if (pthread_create(&thread, &attrs, ML_thread_wrapper, args)) {
+ if (pthread_create(&thread, &attrs, ML_exclusive_thread_wrapper, args)) {
MR_fatal_error(""Unable to create thread."");
}
pthread_attr_destroy(&attrs);
@@ -368,7 +379,7 @@ INIT mercury_sys_init_thread_modules
return MR_TRUE;
}
- void *ML_thread_wrapper(void *arg)
+ void *ML_exclusive_thread_wrapper(void *arg)
{
ML_ThreadWrapperArgs *args = arg;
MR_Word goal;
@@ -377,6 +388,18 @@ INIT mercury_sys_init_thread_modules
MR_fatal_error(""Unable to init thread."");
}
+ /*
+ ** Set the context to have the current engine as its exclusive engine.
+ ** MR_ctxt_resume_c_depth must be initialised to the correct value
+ ** immediately, and it must account for MR_call_engine incrementing
+ ** MR_eng_c_depth.
+ */
+ MR_assert(MR_ENGINE(MR_eng_this_context) != NULL);
+ MR_ENGINE(MR_eng_this_context)->MR_ctxt_exclusive_engine =
+ MR_ENGINE(MR_eng_id);
+ MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume_c_depth =
+ MR_ENGINE(MR_eng_c_depth) + 1;
+
MR_assert(MR_THREAD_LOCAL_MUTABLES == NULL);
MR_SET_THREAD_LOCAL_MUTABLES(args->thread_local_mutables);
@@ -385,18 +408,13 @@ INIT mercury_sys_init_thread_modules
ML_call_back_to_mercury_cc_multi(goal);
- MR_finalize_thread_engine();;
+ MR_finalize_thread_engine();
- MR_LOCK(&MR_thread_barrier_lock, ""ML_thread_wrapper"");
- MR_thread_barrier_count--;
- if (MR_thread_barrier_count == 0) {
- MR_SIGNAL(&MR_thread_barrier_cond, ""ML_thread_wrapper"");
- }
- MR_UNLOCK(&MR_thread_barrier_lock, ""ML_thread_wrapper"");
+ ML_decr_thread_barrier_count();
return NULL;
}
-#endif /* MR_HIGHLEVEL_CODE && MR_THREAD_SAFE */
+#endif /* MR_THREAD_SAFE */
").
:- pred call_back_to_mercury(pred(io, io), io, io).
@@ -419,6 +437,54 @@ call_back_to_mercury(Goal, !IO) :-
%-----------------------------------------------------------------------------%
+:- pragma foreign_decl("C",
+"
+#if defined(MR_THREAD_SAFE) || !defined(MR_HIGHLEVEL_CODE)
+ static void ML_incr_thread_barrier_count(void);
+ static void ML_decr_thread_barrier_count(void);
+#endif
+").
+
+:- pragma foreign_code("C",
+"
+#if defined(MR_THREAD_SAFE) || !defined(MR_HIGHLEVEL_CODE)
+
+ static void ML_incr_thread_barrier_count(void)
+ {
+ MR_LOCK(&MR_thread_barrier_lock, ""ML_incr_thread_barrier_count"");
+ MR_thread_barrier_count++;
+ MR_UNLOCK(&MR_thread_barrier_lock, ""ML_incr_thread_barrier_count"");
+ }
+
+ static void ML_decr_thread_barrier_count(void)
+ {
+ MR_LOCK(&MR_thread_barrier_lock, ""ML_decr_thread_barrier_count"");
+ MR_thread_barrier_count--;
+ #ifdef MR_HIGHLEVEL_CODE
+ if (MR_thread_barrier_count == 0) {
+ MR_SIGNAL(&MR_thread_barrier_cond, ""ML_decr_thread_barrier_count"");
+ }
+ #else
+ if (MR_thread_barrier_count == 0) {
+ /*
+ ** If this is the last spawned context to terminate and the
+ ** main context was just waiting on us in order to terminate
+ ** then reschedule the main context.
+ */
+ if (MR_thread_barrier_context) {
+ MR_schedule_context(MR_thread_barrier_context);
+ MR_thread_barrier_context = NULL;
+ }
+ }
+ #endif
+ MR_UNLOCK(&MR_thread_barrier_lock, ""ML_decr_thread_barrier_count"");
+ }
+
+#endif /* MR_THREAD_SAFE || !MR_HIGHLEVEL_CODE */
+").
+
+%-----------------------------------------------------------------------------%
+
:- pragma foreign_code("C#", "
public class MercuryThread {
object[] Goal;
diff --git a/runtime/mercury_context.c b/runtime/mercury_context.c
index db79ee3..382aa7d 100644
--- a/runtime/mercury_context.c
+++ b/runtime/mercury_context.c
@@ -151,6 +151,13 @@ typedef struct {
static
engine_sleep_sync *engine_sleep_sync_data;
+
+static engine_sleep_sync *
+get_engine_sleep_sync_data(MR_EngineId i)
+{
+ MR_assert(i < MR_max_engines);
+ return &engine_sleep_sync_data[i];
+}
#endif /* MR_LL_PARALLEL_CONJ */
@@ -242,16 +249,15 @@ static MR_Context *free_small_context_list = NULL;
#endif
#ifdef MR_LL_PARALLEL_CONJ
-MR_Integer volatile MR_num_idle_engines = 0;
-MR_Unsigned volatile MR_num_exited_engines = 0;
+MR_Integer volatile MR_num_idle_ws_engines = 0;
static MR_Integer volatile MR_num_outstanding_contexts = 0;
-static sem_t shutdown_semaphore;
+static sem_t shutdown_ws_semaphore;
static MercuryLock MR_par_cond_stats_lock;
/*
** The spark deques are kept in engine id order.
**
-** This array will contain MR_num_threads pointers to deques.
+** This array will contain MR_max_engines pointers to deques.
*/
MR_SparkDeque **MR_spark_deques = NULL;
#endif
@@ -342,7 +348,7 @@ MR_init_context_stuff(void)
#ifdef MR_DEBUG_RUNTIME_GRANULARITY_CONTROL
pthread_mutex_init(&MR_par_cond_stats_lock, MR_MUTEX_ATTR);
#endif
- sem_init(&shutdown_semaphore, 0, 0);
+ sem_init(&shutdown_ws_semaphore, 0, 0);
#endif
pthread_mutex_init(&MR_STM_lock, MR_MUTEX_ATTR);
@@ -356,24 +362,27 @@ MR_init_context_stuff(void)
MR_setup_thread_pinning();
#endif
MR_granularity_wsdeque_length =
- MR_granularity_wsdeque_length_factor * MR_num_threads;
+ MR_granularity_wsdeque_length_factor * MR_num_ws_engines;
MR_spark_deques = MR_GC_NEW_ARRAY_ATTRIB(MR_SparkDeque*,
- MR_num_threads, MR_ALLOC_SITE_RUNTIME);
- engine_sleep_sync_data = MR_GC_NEW_ARRAY_ATTRIB(engine_sleep_sync,
- MR_num_threads, MR_ALLOC_SITE_RUNTIME);
- for (i = 0; i < MR_num_threads; i++) {
+ MR_max_engines, MR_ALLOC_SITE_RUNTIME);
+ for (i = 0; i < MR_max_engines; i++) {
MR_spark_deques[i] = NULL;
+ }
+
+ engine_sleep_sync_data = MR_GC_NEW_ARRAY_ATTRIB(engine_sleep_sync,
+ MR_max_engines, MR_ALLOC_SITE_RUNTIME);
+ for (i = 0; i < MR_max_engines; i++) {
+ engine_sleep_sync *esync = get_engine_sleep_sync_data(i);
- sem_init(&(engine_sleep_sync_data[i].d.es_sleep_semaphore), 0, 0);
- pthread_mutex_init(&(engine_sleep_sync_data[i].d.es_wake_lock),
- MR_MUTEX_ATTR);
+ sem_init(&esync->d.es_sleep_semaphore, 0, 0);
+ pthread_mutex_init(&esync->d.es_wake_lock, MR_MUTEX_ATTR);
/*
** All engines are initially working (because telling them to wake up
** before they are started would be useless).
*/
- engine_sleep_sync_data[i].d.es_state = ENGINE_STATE_WORKING;
- engine_sleep_sync_data[i].d.es_action = MR_ENGINE_ACTION_NONE;
+ esync->d.es_state = ENGINE_STATE_WORKING;
+ esync->d.es_action = MR_ENGINE_ACTION_NONE;
}
#endif
#endif /* MR_THREAD_SAFE */
@@ -472,18 +481,21 @@ static void MR_setup_thread_pinning(void)
MR_num_processors = num_processors;
/*
- ** If MR_num_threads is unset, configure it to match number of processors
- ** on the system. If we do this, then we prepare to set processor
- ** affinities later on.
+ ** If MR_num_ws_engines is unset, configure it to match number of
+ ** processors on the system. If we do this, then we prepare to set
+ ** processor affinities later on.
*/
- if (MR_num_threads == 0) {
- MR_num_threads = num_processors;
+ if (MR_num_ws_engines == 0) {
+ MR_num_ws_engines = num_processors;
+ }
+ if (MR_max_engines < MR_num_ws_engines) {
+ MR_max_engines = MR_num_ws_engines;
}
- MR_num_threads_left_to_pin = MR_num_threads;
+ MR_num_threads_left_to_pin = MR_num_ws_engines;
#ifdef MR_DEBUG_THREAD_PINNING
fprintf(stderr, "Detected %d available processors, will use %d threads\n",
- MR_num_processors, MR_num_threads);
+ MR_num_processors, MR_num_ws_engines);
#endif
pthread_mutex_init(&MR_thread_pinning_lock, MR_MUTEX_ATTR);
@@ -738,7 +750,7 @@ MR_finalize_context_stuff(void)
pthread_mutex_destroy(&MR_runqueue_lock);
pthread_mutex_destroy(&free_context_list_lock);
#ifdef MR_LL_PARALLEL_CONJ
- sem_destroy(&shutdown_semaphore);
+ sem_destroy(&shutdown_ws_semaphore);
#endif
#endif
@@ -893,10 +905,11 @@ MR_init_context_maybe_generator(MR_Context *c, const char *id,
c->MR_ctxt_next = NULL;
c->MR_ctxt_resume = NULL;
#ifdef MR_THREAD_SAFE
- c->MR_ctxt_resume_owner_engine = 0;
+ c->MR_ctxt_exclusive_engine = MR_ENGINE_ID_NONE;
+ c->MR_ctxt_resume_engine = 0;
c->MR_ctxt_resume_engine_required = MR_FALSE;
c->MR_ctxt_resume_c_depth = 0;
- c->MR_ctxt_saved_owners = NULL;
+ c->MR_ctxt_resume_stack = NULL;
#endif
#ifndef MR_HIGHLEVEL_CODE
@@ -1161,7 +1174,7 @@ MR_release_context(MR_Context *c)
#endif
#ifdef MR_THREAD_SAFE
- MR_assert(c->MR_ctxt_saved_owners == NULL);
+ MR_assert(c->MR_ctxt_resume_stack == NULL);
#endif
/*
@@ -1169,7 +1182,7 @@ MR_release_context(MR_Context *c)
** retrieve one with a matching engine ID, or give each engine a local
** cache of spare contexts.
#ifdef MR_LL_PARALLEL_CONJ
- c->MR_ctxt_resume_owner_engine = MR_ENGINE(MR_eng_id);
+ c->MR_ctxt_resume_engine = MR_ENGINE(MR_eng_id);
#endif
*/
@@ -1250,6 +1263,9 @@ MR_find_ready_context(void)
preferred_context = NULL;
preferred_context_prev = NULL;
while (cur != NULL) {
+ MR_bool specific_engine_required;
+ MR_EngineId specific_engine;
+
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
fprintf(stderr,
@@ -1258,18 +1274,30 @@ MR_find_ready_context(void)
MR_SELF_THREAD_ID, engine_id, depth, cur);
}
#endif
- if (cur->MR_ctxt_resume_engine_required == MR_TRUE) {
+
+ if (cur->MR_ctxt_exclusive_engine != MR_ENGINE_ID_NONE) {
+ specific_engine_required = MR_TRUE;
+ specific_engine = cur->MR_ctxt_exclusive_engine;
+ } else if (cur->MR_ctxt_resume_engine_required) {
+ specific_engine_required = MR_TRUE;
+ specific_engine = cur->MR_ctxt_resume_engine;
+ } else {
+ specific_engine_required = MR_FALSE;
+ specific_engine = MR_ENGINE_ID_NONE;
+ }
+
+ if (specific_engine_required) {
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
fprintf(stderr,
- "%ld Context requires engine %d and c_depth %"
+ "%ld Context %p requires engine %d and c_depth %"
MR_INTEGER_LENGTH_MODIFIER "u\n",
- MR_SELF_THREAD_ID, cur->MR_ctxt_resume_owner_engine,
+ MR_SELF_THREAD_ID, cur, specific_engine,
cur->MR_ctxt_resume_c_depth);
}
#endif
- if ((cur->MR_ctxt_resume_owner_engine == engine_id) &&
- (cur->MR_ctxt_resume_c_depth == depth))
+ if (specific_engine == engine_id &&
+ cur->MR_ctxt_resume_c_depth == depth)
{
preferred_context = cur;
preferred_context_prev = prev;
@@ -1283,10 +1311,10 @@ MR_find_ready_context(void)
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
fprintf(stderr, "%ld Context prefers engine %d\n",
- MR_SELF_THREAD_ID, cur->MR_ctxt_resume_owner_engine);
+ MR_SELF_THREAD_ID, cur->MR_ctxt_resume_engine);
}
#endif
- if (cur->MR_ctxt_resume_owner_engine == engine_id) {
+ if (cur->MR_ctxt_resume_engine == engine_id) {
/*
** This context prefers to be ran on this engine.
*/
@@ -1339,20 +1367,35 @@ MR_attempt_steal_spark(MR_Spark *spark)
int i;
int offset;
int victim_id;
+ int max_victim_id;
MR_SparkDeque *victim;
int steal_result;
MR_bool result = MR_FALSE;
offset = MR_ENGINE(MR_eng_victim_counter);
- for (i = 0; i < MR_num_threads; i++) {
- victim_id = (i + offset) % MR_num_threads;
+ /*
+ ** This is the highest victim to attempt stealing from. We do not
+ ** steal from exclusive engines, numbered from MR_num_ws_engines up.
+ ** To try that out, set max_victim_id to MR_highest_engine_id and
+ ** change the condition in MR_fork_new_child.
+ */
+ max_victim_id = MR_num_ws_engines;
+
+ for (i = 0; i < max_victim_id; i++) {
+ victim_id = (i + offset) % max_victim_id;
if (victim_id == MR_ENGINE(MR_eng_id)) {
/*
** There's no point in stealing from ourself.
*/
continue;
}
+ /*
+ ** The victim engine may be shutting down as we attempt to steal from
+ ** it. However, the spark deque is allocated separately so that it may
+ ** outlive the engine, and since the spark deque must be empty when the
+ ** engine is destroyed, any attempt to steal from it must fail.
+ */
victim = MR_spark_deques[victim_id];
if (victim != NULL) {
steal_result = MR_wsdeque_steal_top(victim, spark);
@@ -1546,6 +1589,7 @@ MR_schedule_context(MR_Context *ctxt)
{
#ifdef MR_LL_PARALLEL_CONJ
MR_EngineId engine_id;
+ MR_bool engine_required;
union MR_engine_wake_action_data notify_context_data;
engine_sleep_sync *esync;
@@ -1558,15 +1602,22 @@ MR_schedule_context(MR_Context *ctxt)
/*
** Try to give this context straight to the engine that would execute it.
*/
- engine_id = ctxt->MR_ctxt_resume_owner_engine;
- esync = &(engine_sleep_sync_data[engine_id]);
+ if (ctxt->MR_ctxt_exclusive_engine != MR_ENGINE_ID_NONE) {
+ engine_id = ctxt->MR_ctxt_exclusive_engine;
+ engine_required = MR_TRUE;
+ } else {
+ engine_id = ctxt->MR_ctxt_resume_engine;
+ engine_required = ctxt->MR_ctxt_resume_engine_required;
+ }
+ esync = get_engine_sleep_sync_data(engine_id);
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
- fprintf(stderr, "%ld Scheduling context %p desired engine: %d\n",
- MR_SELF_THREAD_ID, ctxt, engine_id);
+ fprintf(stderr,
+ "%ld Scheduling context %p desired engine: %d required: %d\n",
+ MR_SELF_THREAD_ID, ctxt, engine_id, engine_required);
}
#endif
- if (ctxt->MR_ctxt_resume_engine_required == MR_TRUE) {
+ if (engine_required) {
/*
** Only engine_id may execute this context, attempt to wake it.
**
@@ -1628,8 +1679,8 @@ MR_schedule_context(MR_Context *ctxt)
** If there is some idle engine, try to wake it up, starting with the
** preferred engine.
*/
- if (MR_num_idle_engines > 0) {
- if (MR_try_wake_an_engine(engine_id, MR_ENGINE_ACTION_CONTEXT,
+ if (MR_num_idle_ws_engines > 0) {
+ if (MR_try_wake_ws_engine(engine_id, MR_ENGINE_ACTION_CONTEXT,
¬ify_context_data, NULL))
{
/*
@@ -1653,7 +1704,7 @@ MR_schedule_context(MR_Context *ctxt)
MR_UNLOCK(&MR_runqueue_lock, "schedule_context");
#ifdef MR_LL_PARALLEL_CONJ
- if (ctxt->MR_ctxt_resume_engine_required == MR_TRUE) {
+ if (engine_required) {
/*
** The engine is only runnable on a single context, that context was
** busy earlier and couldn't be handed the engine. If that context
@@ -1661,8 +1712,8 @@ MR_schedule_context(MR_Context *ctxt)
** (where we just put the context). Therefore we re-attempt to
** notify the engine to ensure that it re-checks the runqueue.
**
- ** This is only a problem with only a single engine can execute a
- ** context, in any other case the current engine will eventually check
+ ** This is only a problem when only a single engine can execute a
+ ** context. In any other case the current engine will eventually check
** the runqueue.
**
** The updates to the run queue are guaranteed by the compiler and
@@ -1699,10 +1750,10 @@ MR_schedule_context(MR_Context *ctxt)
#ifdef MR_LL_PARALLEL_CONJ
/*
-** Try to wake an engine, starting at the preferred engine.
+** Try to wake a work-stealing engine, starting at the preferred engine.
*/
MR_bool
-MR_try_wake_an_engine(MR_EngineId preferred_engine, int action,
+MR_try_wake_ws_engine(MR_EngineId preferred_engine, int action,
union MR_engine_wake_action_data *action_data, MR_EngineId *target_eng)
{
MR_EngineId current_engine;
@@ -1734,15 +1785,15 @@ MR_try_wake_an_engine(MR_EngineId preferred_engine, int action,
** Right now this algorithm is naive, it searches from the preferred engine
** around the loop until it finds an engine.
*/
- for (i = 0; i < MR_num_threads; i++) {
- current_engine = (i + preferred_engine) % MR_num_threads;
+ for (i = 0; i < MR_num_ws_engines; i++) {
+ current_engine = (i + preferred_engine) % MR_num_ws_engines;
if (current_engine == MR_ENGINE(MR_eng_id)) {
/*
** Don't post superfluous events to ourself.
*/
continue;
}
- state = engine_sleep_sync_data[current_engine].d.es_state;
+ state = get_engine_sleep_sync_data(current_engine)->d.es_state;
if (state & valid_states) {
switch (state) {
case ENGINE_STATE_SLEEPING:
@@ -1778,7 +1829,7 @@ try_wake_engine(MR_EngineId engine_id, int action,
union MR_engine_wake_action_data *action_data)
{
MR_bool success = MR_FALSE;
- engine_sleep_sync *esync = &(engine_sleep_sync_data[engine_id]);
+ engine_sleep_sync *esync = get_engine_sleep_sync_data(engine_id);
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
@@ -1794,14 +1845,17 @@ try_wake_engine(MR_EngineId engine_id, int action,
*/
MR_LOCK(&(esync->d.es_wake_lock), "try_wake_engine, wake_lock");
if (esync->d.es_state == ENGINE_STATE_SLEEPING) {
- MR_atomic_dec_int(&MR_num_idle_engines);
+ /* XXX not sure about this */
+ if (engine_id < MR_num_ws_engines) {
+ MR_atomic_dec_int(&MR_num_idle_ws_engines);
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
- fprintf(stderr, "%ld Decrement MR_num_idle_engines %"
+ fprintf(stderr, "%ld Decrement MR_num_idle_ws_engines %"
MR_INTEGER_LENGTH_MODIFIER "d\n",
- MR_SELF_THREAD_ID, MR_num_idle_engines);
+ MR_SELF_THREAD_ID, MR_num_idle_ws_engines);
}
#endif
+ }
/*
** We now KNOW that the engine is in one of the correct states.
@@ -1836,7 +1890,7 @@ MR_bool
try_notify_engine(MR_EngineId engine_id, int action,
union MR_engine_wake_action_data *action_data, MR_Unsigned engine_state)
{
- engine_sleep_sync *esync = &(engine_sleep_sync_data[engine_id]);
+ engine_sleep_sync *esync = get_engine_sleep_sync_data(engine_id);
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
@@ -1863,12 +1917,13 @@ try_notify_engine(MR_EngineId engine_id, int action,
** The engine was idle if it was in the stealing state.
** It is not idle anymore so fixup the count.
*/
- MR_atomic_dec_int(&MR_num_idle_engines);
+ MR_assert(engine_id < MR_num_ws_engines);
+ MR_atomic_dec_int(&MR_num_idle_ws_engines);
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
- fprintf(stderr, "%ld Decrement MR_num_idle_engines %"
+ fprintf(stderr, "%ld Decrement MR_num_idle_ws_engines %"
MR_INTEGER_LENGTH_MODIFIER "d\n",
- MR_SELF_THREAD_ID, MR_num_idle_engines);
+ MR_SELF_THREAD_ID, MR_num_idle_ws_engines);
}
#endif
}
@@ -1899,18 +1954,18 @@ try_notify_engine(MR_EngineId engine_id, int action,
}
void
-MR_shutdown_all_engines(void)
+MR_shutdown_ws_engines(void)
{
int i;
MR_bool result;
- for (i = 0; i < MR_num_threads; i++) {
- engine_sleep_sync *esync = &(engine_sleep_sync_data[i]);
+ for (i = 0; i < MR_num_ws_engines; i++) {
if (i == MR_ENGINE(MR_eng_id)) {
continue;
}
while (1) {
+ engine_sleep_sync *esync = get_engine_sleep_sync_data(i);
MR_Unsigned state = esync->d.es_state;
/*
@@ -1944,8 +1999,8 @@ MR_shutdown_all_engines(void)
}
}
- for (i = 0; i < (MR_num_threads - 1); i++) {
- MR_SEM_WAIT(&shutdown_semaphore, "MR_shutdown_all_engines");
+ for (i = 0; i < (MR_num_ws_engines - 1); i++) {
+ MR_SEM_WAIT(&shutdown_ws_semaphore, "MR_shutdown_ws_engines");
}
}
@@ -1964,7 +2019,7 @@ MR_shutdown_all_engines(void)
#ifdef MR_THREAD_SAFE
static void
-action_shutdown_engine(void);
+action_shutdown_ws_engine(void);
static MR_Code*
action_worksteal(MR_EngineId victim_engine_id);
@@ -2037,8 +2092,7 @@ MR_define_entry(MR_do_idle);
#ifdef MR_THREAD_SAFE
MR_Code *jump_target;
MR_EngineId engine_id = MR_ENGINE(MR_eng_id);
- engine_sleep_sync *esync =
- &(engine_sleep_sync_data[MR_ENGINE(MR_eng_id)]);
+ engine_sleep_sync *esync = get_engine_sleep_sync_data(engine_id);
/*
** We can set the idle status without a compare and swap. There are no
@@ -2071,7 +2125,12 @@ MR_define_entry(MR_do_idle);
/*
** TODO: Use multiple entry points into a single MODULE structure.
*/
+ if (MR_ENGINE(MR_eng_type) == MR_ENGINE_TYPE_SHARED) {
MR_GOTO(MR_ENTRY(MR_do_idle_worksteal));
+ } else {
+ MR_GOTO(MR_ENTRY(MR_do_sleep));
+ }
+
#else /* !MR_THREAD_SAFE */
/*
** When an engine becomes idle in a non parallel grade, it simply picks up
@@ -2105,8 +2164,10 @@ MR_define_entry(MR_do_idle_worksteal);
{
MR_Code *jump_target;
MR_EngineId engine_id = MR_ENGINE(MR_eng_id);
- engine_sleep_sync *esync =
- &(engine_sleep_sync_data[engine_id]);
+ engine_sleep_sync *esync = get_engine_sleep_sync_data(engine_id);
+
+ /* Only work-stealing engines beyond this point. */
+ MR_assert(MR_ENGINE(MR_eng_type) == MR_ENGINE_TYPE_SHARED);
if (!MR_compare_and_swap_uint(&(esync->d.es_state), ENGINE_STATE_IDLE,
ENGINE_STATE_STEALING)) {
@@ -2119,7 +2180,7 @@ MR_define_entry(MR_do_idle_worksteal);
*/
switch (esync->d.es_action) {
case MR_ENGINE_ACTION_SHUTDOWN:
- action_shutdown_engine();
+ action_shutdown_ws_engine();
case MR_ENGINE_ACTION_CONTEXT_ADVICE:
MR_GOTO(MR_ENTRY(MR_do_idle));
@@ -2151,21 +2212,21 @@ MR_define_entry(MR_do_idle_worksteal);
** The compare and swap must be visible before the increment.
*/
MR_CPU_SFENCE;
- MR_atomic_inc_int(&MR_num_idle_engines);
+ MR_atomic_inc_int(&MR_num_idle_ws_engines);
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
- fprintf(stderr, "%ld Increment MR_num_idle_engines %d\n",
- MR_SELF_THREAD_ID, MR_num_idle_engines);
+ fprintf(stderr, "%ld Increment MR_num_idle_ws_engines %d\n",
+ MR_SELF_THREAD_ID, MR_num_idle_ws_engines);
}
#endif
jump_target = do_work_steal();
if (jump_target != NULL) {
- MR_atomic_dec_int(&MR_num_idle_engines);
+ MR_atomic_dec_int(&MR_num_idle_ws_engines);
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
- fprintf(stderr, "%ld Decrement MR_num_idle_engines %d\n",
- MR_SELF_THREAD_ID, MR_num_idle_engines);
+ fprintf(stderr, "%ld Decrement MR_num_idle_ws_engines %d\n",
+ MR_SELF_THREAD_ID, MR_num_idle_ws_engines);
}
#endif
MR_CPU_SFENCE;
@@ -2194,9 +2255,9 @@ MR_BEGIN_CODE
MR_define_entry(MR_do_sleep);
{
MR_EngineId engine_id = MR_ENGINE(MR_eng_id);
- engine_sleep_sync *esync =
- &(engine_sleep_sync_data[MR_ENGINE(MR_eng_id)]);
+ engine_sleep_sync *esync = get_engine_sleep_sync_data(engine_id);
+ MR_Unsigned in_state;
unsigned action;
int result;
MR_Code *jump_target;
@@ -2206,7 +2267,15 @@ MR_define_entry(MR_do_sleep);
struct timeval tv;
#endif
- if (MR_compare_and_swap_uint(&(esync->d.es_state), ENGINE_STATE_STEALING,
+ /*
+ ** Shared engines and exclusive engines enter via different states.
+ */
+ if (MR_ENGINE(MR_eng_type) == MR_ENGINE_TYPE_SHARED) {
+ in_state = ENGINE_STATE_STEALING;
+ } else {
+ in_state = ENGINE_STATE_IDLE;
+ }
+ if (MR_compare_and_swap_uint(&(esync->d.es_state), in_state,
ENGINE_STATE_SLEEPING)) {
/*
** We have permission to sleep, and must commit to sleeping.
@@ -2237,9 +2306,7 @@ retry_sleep:
}
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
- result = sem_timedwait(
- &(esync->d.es_sleep_semaphore),
- &ts);
+ result = sem_timedwait(&(esync->d.es_sleep_semaphore), &ts);
#else
result = sem_wait(&(esync->d.es_sleep_semaphore));
#endif
@@ -2255,7 +2322,7 @@ retry_sleep:
*/
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
- fprintf(stderr, "%ld Engine sleep interrupted\n",
+ fprintf(stderr, "%ld Engine %d sleep interrupted\n",
MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id));
}
#endif
@@ -2266,7 +2333,7 @@ retry_sleep:
*/
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
- fprintf(stderr, "%ld Engine sleep timed out\n",
+ fprintf(stderr, "%ld Engine %d sleep timed out\n",
MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id));
}
#endif
@@ -2287,11 +2354,12 @@ retry_sleep:
} else {
jump_target = do_work_steal();
if (jump_target != NULL) {
- MR_atomic_dec_int(&MR_num_idle_engines);
+ MR_atomic_dec_int(&MR_num_idle_ws_engines);
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
- fprintf(stderr, "%ld Decrement MR_num_idle_engines %d\n",
- MR_SELF_THREAD_ID, MR_num_idle_engines);
+ fprintf(stderr,
+ "%ld Decrement MR_num_idle_ws_engines %d\n",
+ MR_SELF_THREAD_ID, MR_num_idle_ws_engines);
}
#endif
MR_CPU_SFENCE;
@@ -2337,16 +2405,23 @@ retry_sleep:
switch (action) {
case MR_ENGINE_ACTION_SHUTDOWN:
- action_shutdown_engine();
+ if (MR_ENGINE(MR_eng_type) == MR_ENGINE_TYPE_SHARED) {
+ action_shutdown_ws_engine();
+ } else {
+ fprintf(stderr, "Mercury runtime: Exclusive engine %d "
+ "received shutdown action\n", MR_ENGINE(MR_eng_id));
+ }
+ break;
case MR_ENGINE_ACTION_WORKSTEAL_ADVICE:
+ if (MR_ENGINE(MR_eng_type) == MR_ENGINE_TYPE_SHARED) {
jump_target = action_worksteal(
esync->d.es_action_data.MR_ewa_worksteal_engine);
if (jump_target != NULL) {
MR_GOTO(jump_target);
- } else {
- MR_GOTO(MR_ENTRY(MR_do_idle));
}
+ }
+ MR_GOTO(MR_ENTRY(MR_do_idle));
case MR_ENGINE_ACTION_CONTEXT:
MR_GOTO(action_context(esync->d.es_action_data.MR_ewa_context));
@@ -2357,18 +2432,19 @@ retry_sleep:
case MR_ENGINE_ACTION_NONE:
default:
fprintf(stderr,
- "Mercury runtime: Engine woken with no action\n");
+ "Mercury runtime: Engine %d woken with no action\n",
+ MR_ENGINE(MR_eng_id));
break;
} /* Switch on action */
/*
- ** Each case ends with a GOTO, so execution cannot reach here
+ ** Each valid case ends with a GOTO, so execution cannot reach here
*/
abort();
}
MR_END_MODULE
static void
-action_shutdown_engine(void)
+action_shutdown_ws_engine(void)
{
MR_EngineId engine_id = MR_ENGINE(MR_eng_id);
@@ -2381,10 +2457,12 @@ action_shutdown_engine(void)
/*
** The primordial thread has the responsibility of cleaning
** up the Mercury runtime. It cannot exit by this route.
+ ** Exclusive engines also do not exit by this route.
*/
- MR_assert(engine_id != 0);
- MR_destroy_thread(MR_cur_engine());
- MR_SEM_POST(&shutdown_semaphore, "MR_do_sleep shutdown_sem");
+ assert(engine_id != 0);
+ assert(MR_ENGINE(MR_eng_type) == MR_ENGINE_TYPE_SHARED);
+ MR_finalize_thread_engine();
+ MR_SEM_POST(&shutdown_ws_semaphore, "MR_do_sleep shutdown_sem");
pthread_exit(0);
}
@@ -2394,13 +2472,15 @@ action_worksteal(MR_EngineId victim_engine_id)
MR_SparkDeque *victim;
int steal_result;
MR_Spark spark;
- engine_sleep_sync *esync =
- &(engine_sleep_sync_data[MR_ENGINE(MR_eng_id)]);
+ engine_sleep_sync *esync;
+
+ MR_assert(MR_ENGINE(MR_eng_type) == MR_ENGINE_TYPE_SHARED);
+ esync = get_engine_sleep_sync_data(MR_ENGINE(MR_eng_id));
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
- fprintf(stderr, "%ld Engine %d workstealing\n",
- MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id));
+ fprintf(stderr, "%ld Engine %d workstealing, victim %d\n",
+ MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id), victim_engine_id);
}
#endif
@@ -2447,9 +2527,9 @@ static MR_Code*
action_context(MR_Context *context)
{
MR_Code *resume_point;
- engine_sleep_sync *esync =
- &(engine_sleep_sync_data[MR_ENGINE(MR_eng_id)]);
+ engine_sleep_sync *esync;
+ esync = get_engine_sleep_sync_data(MR_ENGINE(MR_eng_id));
esync->d.es_state = ENGINE_STATE_WORKING;
prepare_engine_for_context(context);
@@ -2508,8 +2588,8 @@ do_get_context(void)
#endif
#ifdef MR_DEBUG_THREADS
if (MR_debug_threads) {
- fprintf(stderr, "%ld Resuming context %p\n",
- MR_SELF_THREAD_ID, ready_context);
+ fprintf(stderr, "%ld Engine %d resuming context %p\n",
+ MR_SELF_THREAD_ID, MR_ENGINE(MR_eng_id), ready_context);
}
#endif
@@ -2524,7 +2604,8 @@ do_get_context(void)
}
static void
-prepare_engine_for_context(MR_Context *context) {
+prepare_engine_for_context(MR_Context *context)
+{
/*
** Discard whatever unused context we may have, and switch to the new one.
*/
@@ -2540,6 +2621,10 @@ prepare_engine_for_context(MR_Context *context) {
MR_save_context(MR_ENGINE(MR_eng_this_context));
MR_release_context(MR_ENGINE(MR_eng_this_context));
}
+
+ MR_assert(context->MR_ctxt_exclusive_engine == MR_ENGINE_ID_NONE
+ || context->MR_ctxt_exclusive_engine == MR_ENGINE(MR_eng_id));
+
MR_ENGINE(MR_eng_this_context) = context;
MR_load_context(context);
#ifdef MR_THREADSCOPE
@@ -2613,14 +2698,14 @@ do_local_spark(MR_Code *join_label)
MR_threadscope_post_looking_for_local_spark();
#endif
- spark = MR_wsdeque_pop_bottom(&MR_ENGINE(MR_eng_spark_deque));
+ spark = MR_wsdeque_pop_bottom(MR_ENGINE(MR_eng_spark_deque));
if (NULL == spark) {
return NULL;
}
/*
** The current context may be dirty and incompatible with this spark, if
- ** so we put the spark back ondo the deque. This test is only
+ ** so we put the spark back onto the deque. This test is only
** applicable when running a local spark.
**
** Our caller will then save the context and look for a different
@@ -2632,7 +2717,7 @@ do_local_spark(MR_Code *join_label)
(spark->MR_spark_sync_term->MR_st_orig_context != this_context))
{
/* The cast discards the volatile qualifier, which is okay */
- MR_wsdeque_putback_bottom(&MR_ENGINE(MR_eng_spark_deque),
+ MR_wsdeque_putback_bottom(MR_ENGINE(MR_eng_spark_deque),
(MR_Spark*) spark);
return NULL;
}
@@ -2651,6 +2736,10 @@ do_work_steal(void)
{
MR_Spark spark;
+ if (MR_ENGINE(MR_eng_type) != MR_ENGINE_TYPE_SHARED) {
+ return NULL;
+ }
+
#ifdef MR_THREADSCOPE
MR_threadscope_post_work_stealing();
#endif
@@ -2683,13 +2772,14 @@ do_work_steal(void)
}
static void
-save_dirty_context(MR_Code *join_label) {
+save_dirty_context(MR_Code *join_label)
+{
MR_Context *this_context = MR_ENGINE(MR_eng_this_context);
#ifdef MR_THREADSCOPE
MR_threadscope_post_stop_context(MR_TS_STOP_REASON_BLOCKED);
#endif
- this_context->MR_ctxt_resume_owner_engine = MR_ENGINE(MR_eng_id);
+ this_context->MR_ctxt_resume_engine = MR_ENGINE(MR_eng_id);
MR_save_context(this_context);
/*
** Make sure the context gets saved before we set the join label,
@@ -2776,7 +2866,7 @@ MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label)
#ifdef MR_THREADSCOPE
MR_threadscope_post_looking_for_local_spark();
#endif
- spark = MR_wsdeque_pop_bottom(&MR_ENGINE(MR_eng_spark_deque));
+ spark = MR_wsdeque_pop_bottom(MR_ENGINE(MR_eng_spark_deque));
if (spark != NULL) {
if ((this_context == jnc_st->MR_st_orig_context) &&
(spark->MR_spark_sync_term != jnc_st)) {
@@ -2794,7 +2884,7 @@ MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label)
** There might be a suspended context. We should try
** to execute that.
*/
- MR_wsdeque_putback_bottom(&MR_ENGINE(MR_eng_spark_deque),
+ MR_wsdeque_putback_bottom(MR_ENGINE(MR_eng_spark_deque),
(MR_Spark*) spark);
return MR_ENTRY(MR_do_idle);
}
diff --git a/runtime/mercury_context.h b/runtime/mercury_context.h
index 47cb294..e033ef4 100644
--- a/runtime/mercury_context.h
+++ b/runtime/mercury_context.h
@@ -113,23 +113,30 @@
** context in the runqueue.
** (Accessed only when directly specifying the context.)
**
+** exclusive_engine
+** Either MR_ENGINE_ID_NONE, or else the exclusive engine
+** that this context belongs to. A context with an exclusive
+** engine may only be run on that engine. This restriction
+** may be relaxed in the future so that it only applies when
+** entering some foreign procs.
+** (Accessed only when directly specifying the context.)
+**
** resume A pointer to the code at which execution should resume
** when this context is next scheduled.
** (Accessed via MR_eng_this_context.)
**
-** resume_owner_engine
-** When resuming a context this is the engine that it
-** prefers to be resumed on, Doing so can avoid cache misses
-** as the engine's cache may already be warm.
+** resume_engine
+** When resuming a context this is the engine that it prefers
+** or is required to be resumed on. Doing so can avoid cache
+** misses as the engine's cache may already be warm.
** (Accessed only when directly specifying the context.)
**
** resume_engine_required
** resume_c_depth
** These fields are used to ensure that when we enter a
** Mercury engine from C, we return to the same engine. If
-** resume_engine_required is MR_FALSE then this context can be
-** executed by any engine and resume_owner_engine is simply a
-** preference. Otherwise the resume_owner_engine and
+** resume_engine_required is MR_FALSE then resume_engine is
+** simply a preference. Otherwise the resume_engine and
** resume_c_depth must match the engine's id and c_depth. See
** the comments in mercury_engine.h. (Both accessed only when
** directly specifying the context.)
@@ -226,16 +233,15 @@ typedef enum {
#endif
#ifdef MR_THREAD_SAFE
-typedef struct MR_SavedOwner_Struct MR_SavedOwner;
+typedef struct MR_ResumeStack_Struct MR_ResumeStack;
-struct MR_SavedOwner_Struct {
- MR_EngineId MR_saved_owner_engine;
- MR_Unsigned MR_saved_owner_c_depth;
- MR_SavedOwner *MR_saved_owner_next;
+struct MR_ResumeStack_Struct {
+ MR_EngineId MR_resume_engine;
+ MR_Unsigned MR_resume_c_depth;
+ MR_ResumeStack *MR_resume_stack_next;
};
#endif
-
#ifdef MR_LL_PARALLEL_CONJ
typedef struct MR_SyncTerm_Struct MR_SyncTerm;
typedef struct MR_Spark_Struct MR_Spark;
@@ -254,6 +260,7 @@ struct MR_Spark_Struct {
MR_Code *MR_spark_resume;
MR_ThreadLocalMuts *MR_spark_thread_local_mutables;
#ifdef MR_THREADSCOPE
+ /* XXX this is not wide enough for higher engine ids */
MR_uint_least32_t MR_spark_id;
#endif
};
@@ -264,7 +271,7 @@ struct MR_Spark_Struct {
struct MR_SparkDeque_Struct {
/*
- ** The top index is modified by theifs, the other fields are modified by
+ ** The top index is modified by thieves; the other fields are modified by
** the owner. Therefore we pad out the structure to reduce false
** sharing.
*/
@@ -292,10 +299,11 @@ struct MR_Context_Struct {
MR_Code *MR_ctxt_resume;
#endif
#ifdef MR_THREAD_SAFE
- MR_EngineId MR_ctxt_resume_owner_engine;
+ MR_EngineId MR_ctxt_exclusive_engine;
+ MR_EngineId MR_ctxt_resume_engine;
MR_bool MR_ctxt_resume_engine_required;
MR_Unsigned MR_ctxt_resume_c_depth;
- MR_SavedOwner *MR_ctxt_saved_owners;
+ MR_ResumeStack *MR_ctxt_resume_stack;
#endif
#ifndef MR_HIGHLEVEL_CODE
@@ -417,20 +425,13 @@ extern MR_PendingContext *MR_pending_contexts;
#ifdef MR_LL_PARALLEL_CONJ
/*
- ** The number of engines waiting for work.
+ ** The number of work-stealing engines waiting for work.
** We don't protect it with a separate lock, but updates to it are made while
** holding the MR_runqueue_lock. Reads are made without the lock.
** XXX We may need to use atomic instructions or memory fences on some
** architectures.
*/
- extern volatile MR_Integer MR_num_idle_engines;
-
- /*
- ** The number of engines that have exited so far. We can spin on this to
- ** make sure that our engines have exited before finalizing some global
- ** resources.
- */
- extern volatile MR_Unsigned MR_num_exited_engines;
+ extern volatile MR_Integer MR_num_idle_ws_engines;
/*
** Spark deques for work stealing, These are made visible so that they can
@@ -500,10 +501,10 @@ extern MR_Unsigned MR_primordial_thread_cpu;
#endif
/*
-** Shutdown all the engines.
+** Shutdown all the work-stealing engines.
*/
extern void
-MR_shutdown_all_engines(void);
+MR_shutdown_ws_engines(void);
#endif
/*
@@ -829,9 +830,6 @@ do { \
MR_IF_THREADSCOPE( \
MR_uint_least32_t id; \
) \
- MR_IF_NOT_WORKSTEAL_POLLING( \
- union MR_engine_wake_action_data action_data; \
- ) \
\
fnc_spark.MR_spark_sync_term = (MR_SyncTerm*) &(sync_term); \
fnc_spark.MR_spark_resume = (child); \
@@ -840,15 +838,18 @@ do { \
id = MR_ENGINE(MR_eng_next_spark_id)++; \
fnc_spark.MR_spark_id = (engine_id << 24)|(id & 0xFFFFFF); \
) \
- fnc_deque = &MR_ENGINE(MR_eng_spark_deque); \
+ fnc_deque = MR_ENGINE(MR_eng_spark_deque); \
MR_wsdeque_push_bottom(fnc_deque, &fnc_spark); \
MR_IF_THREADSCOPE( \
MR_threadscope_post_sparking(&(sync_term), fnc_spark.MR_spark_id); \
) \
MR_IF_NOT_WORKSTEAL_POLLING( \
+ if (MR_ENGINE(MR_eng_this_context)->MR_ctxt_exclusive_engine \
+ == MR_ENGINE_ID_NONE && MR_num_idle_ws_engines > 0) \
+ { \
+ union MR_engine_wake_action_data action_data; \
action_data.MR_ewa_worksteal_engine = MR_ENGINE(MR_eng_id); \
- if (MR_num_idle_engines > 0) { \
- MR_try_wake_an_engine(MR_ENGINE(MR_eng_id), \
+ MR_try_wake_ws_engine(MR_ENGINE(MR_eng_id), \
MR_ENGINE_ACTION_WORKSTEAL_ADVICE, \
&action_data, NULL); \
} \
@@ -873,7 +874,7 @@ do { \
** the first-level cache.
*/
#define MR_par_cond_local_wsdeque_length \
- (MR_wsdeque_length(&MR_ENGINE(MR_eng_spark_deque)) < \
+ (MR_wsdeque_length(MR_ENGINE(MR_eng_spark_deque)) < \
MR_granularity_wsdeque_length)
extern MR_Code*
@@ -922,7 +923,7 @@ union MR_engine_wake_action_data {
};
/*
-** Try to wake a sleeping engine.
+** Try to wake a sleeping work-stealing engine.
**
** preferred_engine - The engine we'd like to wake up, a nearby engine will
** often be chosen so it's okay to name the current engine
@@ -939,8 +940,9 @@ union MR_engine_wake_action_data {
** This returns MR_TRUE if successful MR_FALSE otherwise.
*/
MR_bool
-MR_try_wake_an_engine(MR_EngineId perferred_engine, int action,
+MR_try_wake_ws_engine(MR_EngineId preferred_engine, int action,
union MR_engine_wake_action_data *action_data, MR_EngineId *target_engine);
+
#ifdef MR_DEBUG_RUNTIME_GRANULARITY_CONTROL
/*
diff --git a/runtime/mercury_engine.c b/runtime/mercury_engine.c
index 3aca652..88bf7f1 100644
--- a/runtime/mercury_engine.c
+++ b/runtime/mercury_engine.c
@@ -144,13 +144,16 @@ MR_init_engine(MercuryEngine *eng)
#endif /* !MR_CONSERVATIVE_GC */
#ifdef MR_THREAD_SAFE
+ /* The caller must initialise id and type. */
+ eng->MR_eng_id = MR_ENGINE_ID_NONE;
+ eng->MR_eng_type = MR_ENGINE_TYPE_SHARED;
eng->MR_eng_owner_thread = pthread_self();
eng->MR_eng_c_depth = 0;
#endif
#ifdef MR_LL_PARALLEL_CONJ
- MR_init_wsdeque(&(eng->MR_eng_spark_deque),
- MR_INITIAL_SPARK_DEQUE_SIZE);
+ eng->MR_eng_spark_deque = MR_GC_NEW(MR_SparkDeque);
+ MR_init_wsdeque(eng->MR_eng_spark_deque, MR_INITIAL_SPARK_DEQUE_SIZE);
#endif
/*
@@ -161,6 +164,10 @@ MR_init_engine(MercuryEngine *eng)
/*---------------------------------------------------------------------------*/
+/*
+** The engine must be removed from MR_all_engine_bases BEFORE calling this
+** function.
+*/
void MR_finalize_engine(MercuryEngine *eng)
{
/*
@@ -489,14 +496,14 @@ dummy_label:
MR_ENGINE(MR_eng_c_depth)++;
if (MR_ENGINE(MR_eng_this_context) != NULL) {
- MR_SavedOwner *owner;
+ MR_ResumeStack *elem;
- owner = MR_GC_NEW_ATTRIB(MR_SavedOwner, MR_ALLOC_SITE_RUNTIME);
- owner->MR_saved_owner_engine = MR_ENGINE(MR_eng_id);
- owner->MR_saved_owner_c_depth = MR_ENGINE(MR_eng_c_depth);
- owner->MR_saved_owner_next =
- MR_ENGINE(MR_eng_this_context)->MR_ctxt_saved_owners;
- MR_ENGINE(MR_eng_this_context)->MR_ctxt_saved_owners = owner;
+ elem = MR_GC_NEW_ATTRIB(MR_ResumeStack, MR_ALLOC_SITE_RUNTIME);
+ elem->MR_resume_engine = MR_ENGINE(MR_eng_id);
+ elem->MR_resume_c_depth = MR_ENGINE(MR_eng_c_depth);
+ elem->MR_resume_stack_next =
+ MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume_stack;
+ MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume_stack = elem;
}
#endif
@@ -518,16 +525,16 @@ MR_define_label(engine_done);
#ifdef MR_THREAD_SAFE
{
MR_Context *this_ctxt;
- MR_SavedOwner *owner;
+ MR_ResumeStack *elem;
this_ctxt = MR_ENGINE(MR_eng_this_context);
- owner = this_ctxt->MR_ctxt_saved_owners;
- this_ctxt->MR_ctxt_saved_owners = owner->MR_saved_owner_next;
+ elem = this_ctxt->MR_ctxt_resume_stack;
+ this_ctxt->MR_ctxt_resume_stack = elem->MR_resume_stack_next;
- if ((owner->MR_saved_owner_engine == MR_ENGINE(MR_eng_id)) &&
- owner->MR_saved_owner_c_depth == MR_ENGINE(MR_eng_c_depth))
+ if ((elem->MR_resume_engine == MR_ENGINE(MR_eng_id)) &&
+ elem->MR_resume_c_depth == MR_ENGINE(MR_eng_c_depth))
{
- MR_GC_free_attrib(owner);
+ MR_GC_free_attrib(elem);
MR_GOTO_LABEL(engine_done_2);
}
@@ -536,10 +543,10 @@ MR_define_label(engine_done);
#endif
MR_save_context(this_ctxt);
this_ctxt->MR_ctxt_resume = MR_LABEL(engine_done_2);
- this_ctxt->MR_ctxt_resume_owner_engine = owner->MR_saved_owner_engine;
- this_ctxt->MR_ctxt_resume_c_depth = owner->MR_saved_owner_c_depth;
this_ctxt->MR_ctxt_resume_engine_required = MR_TRUE;
- MR_GC_free_attrib(owner);
+ this_ctxt->MR_ctxt_resume_engine = elem->MR_resume_engine;
+ this_ctxt->MR_ctxt_resume_c_depth = elem->MR_resume_c_depth;
+ MR_GC_free_attrib(elem);
MR_schedule_context(this_ctxt);
MR_ENGINE(MR_eng_this_context) = NULL;
diff --git a/runtime/mercury_engine.h b/runtime/mercury_engine.h
index ac49580..5fcc32a 100644
--- a/runtime/mercury_engine.h
+++ b/runtime/mercury_engine.h
@@ -337,6 +337,11 @@ typedef struct {
** generators to be created without redoing the work required
** to allocate a new context.
**
+** id The ID of this engine. It is used to index into some runtime
+** structures. It is also used for threadscope.
+**
+** type The type of engine this is.
+**
** owner_thread
** c_depth
** These fields are used to ensure that when a thread
@@ -360,8 +365,6 @@ typedef struct {
** ts_buffer
** The buffer object used by threadscope for this engine.
**
-** id The ID of this engine which is used by threadscope.
-**
** next_spark_id
** In threadscope grades sparks are given IDs to help us track
** them. This and MR_eng_id is used to allocate unique IDs.
@@ -424,6 +427,7 @@ typedef struct MR_mercury_engine_struct {
#endif
#ifdef MR_THREAD_SAFE
MR_EngineId MR_eng_id;
+ MR_EngineType MR_eng_type;
MercuryThread MR_eng_owner_thread;
MR_Unsigned MR_eng_c_depth;
#ifdef MR_THREADSCOPE
@@ -437,7 +441,7 @@ typedef struct MR_mercury_engine_struct {
MR_uint_least32_t MR_eng_next_spark_id;
#endif
#ifdef MR_LL_PARALLEL_CONJ
- MR_SparkDeque MR_eng_spark_deque;
+ MR_SparkDeque *MR_eng_spark_deque;
MR_EngineId MR_eng_victim_counter;
#endif
#endif
@@ -509,16 +513,6 @@ typedef struct MR_mercury_engine_struct {
#define MR_cur_engine() ((MercuryEngine *) MR_engine_base)
#define MR_get_engine() ((MercuryEngine *) MR_thread_engine_base)
- #ifndef MR_HIGHLEVEL_CODE
- /*
- ** This points to an array containing MR_num_threads pointers to
- ** Mercury engines. The first item in the array is the primordial thread.
- ** During initialisation, the array may be a null pointer, as may be
- ** any pointer inside.
- */
- extern MercuryEngine **MR_all_engine_bases;
- #endif
-
#else /* !MR_THREAD_SAFE */
extern MercuryEngine MR_engine_base;
@@ -532,6 +526,8 @@ typedef struct MR_mercury_engine_struct {
#define MR_MAYBE_INIT_LOCAL_THREAD_ENGINE_BASE
#endif
+#define MR_ENGINE_ID_NONE ((MR_EngineId) -1)
+
#define MR_CONTEXT(x) (MR_ENGINE(MR_eng_context).x)
#ifndef MR_CONSERVATIVE_GC
diff --git a/runtime/mercury_memory_handlers.c b/runtime/mercury_memory_handlers.c
index 6d455ce..bdf8cea 100644
--- a/runtime/mercury_memory_handlers.c
+++ b/runtime/mercury_memory_handlers.c
@@ -1037,7 +1037,7 @@ leave_signal_handler(int sig)
#if defined(MR_THREAD_SAFE) && defined(MR_THREADSCOPE)
if (MR_all_engine_bases) {
int i;
- for (i = 0; i < MR_num_threads; i++) {
+ for (i = 0; i < MR_max_engines; i++) {
if (MR_all_engine_bases[i] &&
MR_all_engine_bases[i]->MR_eng_ts_buffer)
{
diff --git a/runtime/mercury_memory_zones.c b/runtime/mercury_memory_zones.c
index c2285ac..dfd8475 100644
--- a/runtime/mercury_memory_zones.c
+++ b/runtime/mercury_memory_zones.c
@@ -985,16 +985,21 @@ static MR_bool MR_should_gc_memory_zones(void);
static MR_bool MR_should_stop_gc_memory_zones(void);
/*
-** TODO: These should be controlable via MERCURY_OPTIONS
+** TODO: These should be controllable via MERCURY_OPTIONS
*/
+#if defined(MR_THREAD_SAFE) && !defined(MR_HIGHLEVEL_CODE)
+ #define THREAD_COUNT (MR_num_ws_engines + MR_thread_barrier_count)
+#else
+ #define THREAD_COUNT (1 + MR_thread_barrier_count)
+#endif
/* 16 zones per thread */
-#define MR_FREE_MEMORY_ZONES_NUM_HIGH (16*MR_num_threads)
+#define MR_FREE_MEMORY_ZONES_NUM_HIGH (16*THREAD_COUNT)
/* 4 zones per thread */
-#define MR_FREE_MEMORY_ZONES_NUM_LOW (4*MR_num_threads)
+#define MR_FREE_MEMORY_ZONES_NUM_LOW (4*THREAD_COUNT)
/* 16MB per thread */
-#define MR_FREE_MEMORY_ZONES_PAGES_HIGH (((16*1024*1024)/MR_page_size)*MR_num_threads)
+#define MR_FREE_MEMORY_ZONES_PAGES_HIGH (((16*1024*1024)/MR_page_size)*THREAD_COUNT)
/* 4MB per thread */
-#define MR_FREE_MEMORY_ZONES_PAGES_LOW (((4*1024*1024)/MR_page_size)*MR_num_threads)
+#define MR_FREE_MEMORY_ZONES_PAGES_LOW (((4*1024*1024)/MR_page_size)*THREAD_COUNT)
static MR_MemoryZonesFree * MR_THREADSAFE_VOLATILE
free_memory_zones = NULL;
diff --git a/runtime/mercury_par_builtin.h b/runtime/mercury_par_builtin.h
index a6b86bb..ab46a6b 100644
--- a/runtime/mercury_par_builtin.h
+++ b/runtime/mercury_par_builtin.h
@@ -155,7 +155,7 @@ vim: ft=c ts=4 sw=4 et
\
ctxt->MR_ctxt_resume = \
MR_ENTRY(mercury__par_builtin__wait_resume); \
- ctxt->MR_ctxt_resume_owner_engine = \
+ ctxt->MR_ctxt_resume_engine = \
MR_ENGINE(MR_eng_id); \
ctxt->MR_ctxt_next = Future->MR_fut_suspended; \
Future->MR_fut_suspended = ctxt; \
@@ -392,7 +392,7 @@ extern MR_LoopControl *MR_lc_create(unsigned num_workers);
*/ \
if ((lc)->MR_lc_outstanding_workers != 0) { \
MR_save_context(MR_ENGINE(MR_eng_this_context)); \
- MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume_owner_engine = \
+ MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume_engine = \
MR_ENGINE(MR_eng_id); \
MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume = \
(part2_label); \
@@ -475,7 +475,7 @@ extern MR_Bool MR_lc_try_get_free_slot(MR_LoopControl *lc,
ctxt = MR_ENGINE(MR_eng_this_context); \
MR_save_context(ctxt); \
ctxt->MR_ctxt_resume = retry_label; \
- ctxt->MR_ctxt_resume_owner_engine = MR_ENGINE(MR_eng_id); \
+ ctxt->MR_ctxt_resume_engine = MR_ENGINE(MR_eng_id); \
(lc)->MR_lc_master_context = ctxt; \
MR_CPU_SFENCE; \
MR_US_UNLOCK(&((lc)->MR_lc_master_context_lock)); \
diff --git a/runtime/mercury_thread.c b/runtime/mercury_thread.c
index 99b5f68..33b1873 100644
--- a/runtime/mercury_thread.c
+++ b/runtime/mercury_thread.c
@@ -28,14 +28,11 @@
MercuryThreadKey MR_engine_base_key;
#endif
MercuryLock MR_global_lock;
- #ifndef MR_HIGHLEVEL_CODE
- static MercuryLock MR_next_engine_id_lock;
- static MR_EngineId MR_next_engine_id = 0;
- /*
- ** This array is indexed by engine id. No locking is necessary.
- */
+ #ifndef MR_HIGHLEVEL_CODE
+ static MercuryLock MR_all_engine_bases_lock;
MercuryEngine **MR_all_engine_bases = NULL;
+ static MR_EngineId MR_highest_engine_id;
#endif
#endif
@@ -55,12 +52,18 @@ MR_Integer MR_thread_barrier_count;
#endif
#ifdef MR_THREAD_SAFE
+static void
+MR_setup_engine_for_threads(MercuryEngine *eng, MR_EngineType engine_type);
+static void
+MR_shutdown_engine_for_threads(MercuryEngine *eng);
+#endif
+#ifdef MR_LL_PARALLEL_CONJ
static void *
-MR_create_thread_2(void *goal);
+MR_create_worksteal_thread_2(void *goal);
MercuryThread *
-MR_create_thread(MR_ThreadGoal *goal)
+MR_create_worksteal_thread(void)
{
MercuryThread *thread;
pthread_attr_t attrs;
@@ -76,7 +79,7 @@ MR_create_thread(MR_ThreadGoal *goal)
thread = MR_GC_NEW_ATTRIB(MercuryThread, MR_ALLOC_SITE_RUNTIME);
pthread_attr_init(&attrs);
pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
- err = pthread_create(thread, &attrs, MR_create_thread_2, (void *) goal);
+ err = pthread_create(thread, &attrs, MR_create_worksteal_thread_2, NULL);
pthread_attr_destroy(&attrs);
#if 0
@@ -91,34 +94,30 @@ MR_create_thread(MR_ThreadGoal *goal)
}
static void *
-MR_create_thread_2(void *goal0)
+MR_create_worksteal_thread_2(void *arg)
{
- MR_ThreadGoal *goal;
-
- goal = (MR_ThreadGoal *) goal0;
- if (goal != NULL) {
- MR_init_thread(MR_use_now);
- (goal->func)(goal->arg);
- /* XXX: We should clean up the engine here */
- } else {
- MR_init_thread(MR_use_later);
- }
-
+ #ifdef MR_HAVE_THREAD_PINNING
+ /*
+ ** TODO: We may use the cpu value returned to determine which CPUs
+ ** which engines are on. This can help with some interesting work
+ ** stealing algorithms.
+ */
+ MR_pin_thread();
+ #endif
+ MR_init_thread_inner(MR_use_later, MR_ENGINE_TYPE_SHARED);
return NULL;
}
-#endif /* MR_THREAD_SAFE */
+#endif /* MR_LL_PARALLEL_CONJ */
+/*
+** This interface is used by generated code and thread.m.
+** Internal code should call MR_init_thread_inner.
+*/
MR_bool
MR_init_thread(MR_when_to_use when_to_use)
{
- MercuryEngine *eng;
-
#ifdef MR_THREAD_SAFE
- #if defined(MR_LL_PARALLEL_CONJ) && defined(MR_HAVE_THREAD_PINNING)
- unsigned cpu;
- #endif
-
/*
** Check to see whether there is already an engine that is initialized
** in this thread. If so we just return, there's nothing for us to do.
@@ -126,50 +125,29 @@ MR_init_thread(MR_when_to_use when_to_use)
if (MR_thread_engine_base != NULL) {
return MR_FALSE;
}
- #ifdef MR_LL_PARALLEL_CONJ
- switch (when_to_use) {
- case MR_use_later:
-#ifdef MR_HAVE_THREAD_PINNING
- cpu = MR_pin_thread();
-#endif
- break;
- case MR_use_now:
- /*
- ** Don't pin the primordial thread here, it's already been done.
- */
-#ifdef MR_HAVE_THREAD_PINNING
- cpu = MR_primordial_thread_cpu;
-#endif
- break;
+#endif /* MR_THREAD_SAFE */
+ assert(when_to_use == MR_use_now);
+ return MR_init_thread_inner(when_to_use, MR_ENGINE_TYPE_EXCLUSIVE);
+}
+
/*
- ** TODO: We may use the cpu value here to determine which CPUs which
- ** engines are on. This can help with some interesting work stealing
- ** algorithms.
+** Set up a Mercury engine in the current thread.
*/
- }
- #endif
-#endif
+MR_bool
+MR_init_thread_inner(MR_when_to_use when_to_use, MR_EngineType engine_type)
+{
+ MercuryEngine *eng;
+
eng = MR_create_engine();
#ifdef MR_THREAD_SAFE
+ MR_setup_engine_for_threads(eng, engine_type);
+ assert(MR_thread_engine_base == NULL);
MR_set_thread_engine_base(eng);
MR_restore_registers();
#ifdef MR_ENGINE_BASE_REGISTER
MR_engine_base_word = (MR_Word) eng;
#endif
- #ifndef MR_HIGHLEVEL_CODE
- MR_LOCK(&MR_next_engine_id_lock, "MR_init_thread");
- eng->MR_eng_id = MR_next_engine_id++;
- MR_UNLOCK(&MR_next_engine_id_lock, "MR_init_thread");
-
- eng->MR_eng_victim_counter = (eng->MR_eng_id + 1) % MR_num_threads;
-
- MR_all_engine_bases[eng->MR_eng_id] = eng;
- MR_spark_deques[eng->MR_eng_id] = &(eng->MR_eng_spark_deque);
- #ifdef MR_THREADSCOPE
- MR_threadscope_setup_engine(eng);
- #endif
- #endif
#else
MR_memcpy(&MR_engine_base, eng, sizeof(MercuryEngine));
MR_restore_registers();
@@ -227,9 +205,8 @@ MR_init_thread(MR_when_to_use when_to_use)
}
/*
-** Release resources associated with this thread.
+** Release resources associated with the Mercury engine for this thread.
*/
-
void
MR_finalize_thread_engine(void)
{
@@ -238,20 +215,89 @@ MR_finalize_thread_engine(void)
eng = MR_thread_engine_base;
MR_set_thread_engine_base(NULL);
+ MR_shutdown_engine_for_threads(eng);
MR_destroy_engine(eng);
#endif
}
#ifdef MR_THREAD_SAFE
+/*
+** Additional setup/shutdown of the engine for threads support.
+*/
-void
-MR_destroy_thread(void *eng0)
+static void
+MR_setup_engine_for_threads(MercuryEngine *eng, MR_EngineType engine_type)
{
- MercuryEngine *eng = eng0;
- MR_destroy_engine(eng);
+ #ifndef MR_HIGHLEVEL_CODE
+ MR_EngineId min;
+ MR_EngineId max;
+ MR_EngineId id;
+
+ MR_LOCK(&MR_all_engine_bases_lock, "MR_setup_engine_for_threads");
+
+ /* Allocate an engine id. */
+ if (engine_type == MR_ENGINE_TYPE_SHARED) {
+ min = 0;
+ max = MR_num_ws_engines;
+ } else {
+ min = MR_num_ws_engines;
+ max = MR_max_engines;
+ }
+ for (id = min; id < max; id++) {
+ if (MR_all_engine_bases[id] == NULL) {
+ break;
}
+ }
+ if (id == max) {
+ MR_fatal_error("exhausted engine ids");
+ }
+ if (MR_highest_engine_id < id) {
+ MR_highest_engine_id = id;
+ }
+
+ eng->MR_eng_id = id;
+ eng->MR_eng_type = engine_type;
+ eng->MR_eng_victim_counter = (id + 1) % MR_num_ws_engines;
+
+ MR_all_engine_bases[id] = eng;
+ MR_spark_deques[id] = eng->MR_eng_spark_deque;
+ #ifdef MR_THREADSCOPE
+ MR_threadscope_setup_engine(eng);
+ #endif
+
+ MR_UNLOCK(&MR_all_engine_bases_lock, "MR_setup_engine_for_threads");
#endif
+}
+
+static void
+MR_shutdown_engine_for_threads(MercuryEngine *eng)
+{
+ #ifndef MR_HIGHLEVEL_CODE
+ MR_EngineId id = eng->MR_eng_id;
+
+ MR_LOCK(&MR_all_engine_bases_lock, "MR_shutdown_engine_for_threads");
+
+ assert(MR_all_engine_bases[id] == eng);
+ MR_all_engine_bases[id] = NULL;
+
+ if (MR_highest_engine_id == id) {
+ int i;
+ for (i = id - 1; i >= 0; i--) {
+ if (MR_all_engine_bases[i] != NULL) {
+ MR_highest_engine_id = (MR_EngineId) i;
+ break;
+ }
+ }
+ }
+
+ assert(MR_spark_deques[id] == eng->MR_eng_spark_deque);
+ MR_spark_deques[id] = NULL;
+
+ MR_UNLOCK(&MR_all_engine_bases_lock, "MR_shutdown_engine_for_threads");
+ #endif
+}
+#endif /* MR_THREAD_SAFE */
#if defined(MR_THREAD_SAFE)
/*
@@ -476,9 +522,10 @@ MR_init_thread_stuff(void)
#endif
#ifndef MR_HIGHLEVEL_CODE
- pthread_mutex_init(&MR_next_engine_id_lock, MR_MUTEX_ATTR);
- MR_all_engine_bases = MR_GC_malloc(sizeof(MercuryEngine*)*MR_num_threads);
- for (i = 0; i < MR_num_threads; i++) {
+ pthread_mutex_init(&MR_all_engine_bases_lock, MR_MUTEX_ATTR);
+ MR_all_engine_bases =
+ MR_GC_malloc(sizeof(MercuryEngine *) * MR_max_engines);
+ for (i = 0; i < MR_max_engines; i++) {
MR_all_engine_bases[i] = NULL;
}
#endif
diff --git a/runtime/mercury_thread.h b/runtime/mercury_thread.h
index 92df39d..5b993d9 100644
--- a/runtime/mercury_thread.h
+++ b/runtime/mercury_thread.h
@@ -147,22 +147,11 @@ MR_null_thread(void);
#define MR_GETSPECIFIC(key) pthread_getspecific((key))
#define MR_KEY_CREATE pthread_key_create
- typedef struct {
- void (*func)(void *);
- void *arg;
- } MR_ThreadGoal;
-
/*
- ** create_thread(Goal) creates a new POSIX thread, and creates and
- ** initializes a new Mercury engine to run in that thread. If Goal
- ** is a NULL pointer, that thread will suspend on the global Mercury
- ** runqueue. If Goal is non-NULL, it is a pointer to a MR_ThreadGoal
- ** structure containing a function and an argument. The function will
- ** be called with the given argument in the new thread.
+ ** create_worksteal_thread() creates a new POSIX thread, and creates and
+ ** initializes a work-stealing Mercury engine to run in that thread.
*/
-
- extern MercuryThread *MR_create_thread(MR_ThreadGoal *);
- extern void MR_destroy_thread(void *eng);
+ extern MercuryThread *MR_create_worksteal_thread(void);
/*
** The primordial thread. Currently used for debugging.
@@ -184,9 +173,14 @@ MR_null_thread(void);
#ifndef MR_HIGHLEVEL_CODE
/*
- ** This lock protects writes to the MR_all_engine_bases structure.
+ ** This points to an array containing MR_max_engines pointers to Mercury
+ ** engines. It is indexed by engine id. The first item in the array is the
+ ** primordial thread. A null entry represents an unallocated engine id.
+ ** During initialisation, the pointer may be null.
+ ** This is exported only for leave_signal_handler.
+ ** All other accesses require the MR_all_engine_bases_lock.
*/
- extern MercuryLock MR_init_engine_array_lock;
+ extern struct MR_mercury_engine_struct **MR_all_engine_bases;
#endif
/*
@@ -225,8 +219,8 @@ extern MR_Integer MR_thread_barrier_count;
#endif
/*
-** The following enum is used as the argument to init_thread.
-** MR_use_now should be passed to init_thread to indicate that
+** The following enum is used as the argument to init_thread/init_thread_inner.
+** MR_use_now should be passed to indicate that
** it has been called in a context in which it should initialize
** the current thread's environment and return.
** MR_use_later should be passed to indicate that the thread should
@@ -238,6 +232,37 @@ extern MR_Integer MR_thread_barrier_count;
typedef enum { MR_use_now, MR_use_later } MR_when_to_use;
/*
+** In low-level C parallel grades, there are two types of Mercury
+** engines. "Shared" engines may execute code from any Mercury thread.
+** "Exclusive" engines execute code only for a single Mercury thread.
+** Shared engines may steal work from other shared engines, so are also
+** called work-stealing engines; we do not have shared engines that
+** refrain from work-stealing.
+**
+** In low-level C non-parallel grades, all Mercury threads execute on
+** the same unique Mercury engine. That engine is equivalent to a shared
+** engine.
+**
+** In high-level C parallel grades, all Mercury threads execute in their
+** own POSIX thread. All engines are exclusive engines.
+**
+** In high-level C non-parallel grades, only a single Mercury thread
+** exists, executing in a single Mercury engine. That engine is
+** equivalent to an exclusive engine, or a shared engine with no other
+** engines present.
+*/
+typedef enum {
+ MR_ENGINE_TYPE_SHARED = 1,
+ MR_ENGINE_TYPE_EXCLUSIVE = 2
+} MR_EngineType;
+
+#ifdef MR_HIGHLEVEL_CODE
+ #define MR_PRIMORIDAL_ENGINE_TYPE MR_ENGINE_TYPE_EXCLUSIVE
+#else
+ #define MR_PRIMORIDAL_ENGINE_TYPE MR_ENGINE_TYPE_SHARED
+#endif
+
+/*
** Create and initialize a new Mercury engine running in the current
** POSIX thread.
**
@@ -252,6 +277,8 @@ typedef enum { MR_use_now, MR_use_later } MR_when_to_use;
*/
extern MR_bool
MR_init_thread(MR_when_to_use);
+extern MR_bool
+MR_init_thread_inner(MR_when_to_use, MR_EngineType);
/*
** Finalize the thread engine running in the current POSIX thread.
diff --git a/runtime/mercury_threadscope.c b/runtime/mercury_threadscope.c
index cda565c..8164c2b 100644
--- a/runtime/mercury_threadscope.c
+++ b/runtime/mercury_threadscope.c
@@ -1036,7 +1036,7 @@ MR_setup_threadscope(void)
** Put the startup event in the buffer.
*/
put_event_header(&global_buffer, MR_TS_EVENT_STARTUP, 0);
- put_engine_id(&global_buffer, (MR_EngineId)MR_num_threads);
+ put_engine_id(&global_buffer, (MR_EngineId)MR_num_ws_engines);
flush_event_buffer(&global_buffer);
}
diff --git a/runtime/mercury_wrapper.c b/runtime/mercury_wrapper.c
index f9c09ca..82aa750 100644
--- a/runtime/mercury_wrapper.c
+++ b/runtime/mercury_wrapper.c
@@ -323,15 +323,10 @@ static char *MR_mem_usage_report_prefix = NULL;
static int MR_num_output_args = 0;
-/*
-** This is initialized to zero. If it is still zero after configuration of the
-** runtime but before threads are started, then we set it to the number of
-** processors on the system (if support is available to detect this).
-** Otherwise, we fall back to 1.
-*/
-MR_Unsigned MR_num_threads = 0;
+#ifdef MR_LL_PARALLEL_CONJ
+MR_Unsigned MR_num_ws_engines = 0;
+MR_Unsigned MR_max_engines = 8192;
-#if defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ)
MR_Unsigned MR_granularity_wsdeque_length_factor = 8;
MR_Unsigned MR_granularity_wsdeque_length = 0;
#endif
@@ -641,11 +636,13 @@ mercury_runtime_init(int argc, char **argv)
*/
MR_init_context_stuff();
MR_init_thread_stuff();
- MR_max_outstanding_contexts = MR_max_contexts_per_thread * MR_num_threads;
- MR_num_contexts_per_loop_control =
- MR_num_contexts_per_loop_control_per_thread * MR_num_threads;
#ifdef MR_LL_PARALLEL_CONJ
- MR_granularity_wsdeque_length = MR_granularity_wsdeque_length_factor * MR_num_threads;
+ MR_max_outstanding_contexts =
+ MR_max_contexts_per_thread * MR_num_ws_engines;
+ MR_num_contexts_per_loop_control =
+ MR_num_contexts_per_loop_control_per_thread * MR_num_ws_engines;
+ MR_granularity_wsdeque_length =
+ MR_granularity_wsdeque_length_factor * MR_num_ws_engines;
#endif
MR_primordial_thread = pthread_self();
#endif
@@ -697,28 +694,32 @@ mercury_runtime_init(int argc, char **argv)
** Start up the Mercury engine. We don't yet know how many slots will be
** needed for thread-local mutable values so allocate the maximum number.
*/
- MR_init_thread(MR_use_now);
+ MR_init_thread_inner(MR_use_now, MR_PRIMORIDAL_ENGINE_TYPE);
MR_SET_THREAD_LOCAL_MUTABLES(
MR_create_thread_local_mutables(MR_MAX_THREAD_LOCAL_MUTABLES));
+ /*
+ ** Start up additional work-stealing Mercury engines.
+ */
#ifdef MR_LL_PARALLEL_CONJ
{
int i;
- for (i = 1; i < MR_num_threads; i++) {
- MR_create_thread(NULL);
+ for (i = 1; i < MR_num_ws_engines; i++) {
+ MR_create_worksteal_thread();
}
#ifdef MR_THREADSCOPE
/*
** TSC Synchronization is not used, support is commented out.
** See runtime/mercury_threadscope.h for an explanation.
- **
+ **/
+ /*
for (i = 1; i < MR_num_threads; i++) {
MR_threadscope_sync_tsc_master();
}
*/
#endif
- while (MR_num_idle_engines < MR_num_threads-1) {
+ while (MR_num_idle_ws_engines < MR_num_ws_engines-1) {
/* busy wait until the worker threads are ready */
MR_ATOMIC_PAUSE;
}
@@ -1314,6 +1315,7 @@ enum MR_long_option {
MR_GEN_DETSTACK_REDZONE_SIZE_KWORDS,
MR_GEN_NONDETSTACK_REDZONE_SIZE,
MR_GEN_NONDETSTACK_REDZONE_SIZE_KWORDS,
+ MR_MAX_ENGINES,
MR_MAX_CONTEXTS_PER_THREAD,
MR_NUM_CONTEXTS_PER_LC_PER_THREAD,
MR_RUNTIME_GRANULAITY_WSDEQUE_LENGTH_FACTOR,
@@ -1417,6 +1419,7 @@ struct MR_option MR_long_opts[] = {
1, 0, MR_GEN_NONDETSTACK_REDZONE_SIZE },
{ "gen-nondetstack-zone-size-kwords",
1, 0, MR_GEN_NONDETSTACK_REDZONE_SIZE_KWORDS },
+ { "max-engines", 1, 0, MR_MAX_ENGINES },
{ "max-contexts-per-thread", 1, 0, MR_MAX_CONTEXTS_PER_THREAD },
{ "num-contexts-per-lc-per-thread", 1, 0, MR_NUM_CONTEXTS_PER_LC_PER_THREAD },
{ "runtime-granularity-wsdeque-length-factor", 1, 0,
@@ -1835,6 +1838,19 @@ MR_process_options(int argc, char **argv)
MR_gen_nondetstack_zone_size = size * sizeof(MR_Word);
break;
+ case MR_MAX_ENGINES:
+#ifdef MR_LL_PARALLEL_CONJ
+ if (sscanf(MR_optarg, "%lu", &size) != 1) {
+ MR_usage();
+ }
+
+ if (size < 1) {
+ MR_usage();
+ }
+ MR_max_engines = MR_min(size, MR_ENGINE_ID_NONE);
+#endif
+ break;
+
case MR_MAX_CONTEXTS_PER_THREAD:
if (sscanf(MR_optarg, "%lu", &size) != 1) {
MR_usage();
@@ -2291,13 +2307,13 @@ MR_process_options(int argc, char **argv)
break;
case 'P':
-#ifdef MR_THREAD_SAFE
+#ifdef MR_LL_PARALLEL_CONJ
if (sscanf(MR_optarg, "%"MR_INTEGER_LENGTH_MODIFIER"u",
- &MR_num_threads) != 1) {
+ &MR_num_ws_engines) != 1) {
MR_usage();
}
- if (MR_num_threads < 1) {
+ if (MR_num_ws_engines < 1) {
MR_usage();
}
#endif
@@ -3105,7 +3121,7 @@ mercury_runtime_terminate(void)
}
#if !defined(MR_HIGHLEVEL_CODE) && defined(MR_THREAD_SAFE)
- MR_shutdown_all_engines();
+ MR_shutdown_ws_engines();
#ifdef MR_THREADSCOPE
if (MR_ENGINE(MR_eng_ts_buffer)) {
diff --git a/runtime/mercury_wrapper.h b/runtime/mercury_wrapper.h
index 4f7ce48..3a4a50c 100644
--- a/runtime/mercury_wrapper.h
+++ b/runtime/mercury_wrapper.h
@@ -258,7 +258,7 @@ extern MR_Unsigned MR_contexts_per_thread;
/*
** The number of outstanding contexts we can create
-** (MR_contexts_per_thread * MR_num_threads).
+** (MR_contexts_per_thread * MR_num_ws_engines).
*/
extern MR_Unsigned MR_max_outstanding_contexts;
@@ -267,12 +267,26 @@ extern MR_Unsigned MR_max_outstanding_contexts;
*/
extern MR_Unsigned MR_num_contexts_per_loop_control;
-extern MR_Unsigned MR_num_threads;
+#ifdef MR_LL_PARALLEL_CONJ
+/*
+** MR_num_ws_engines is the number of work-stealing Mercury engines.
+** This is initialized to zero. If it is still zero after configuration of the
+** runtime but before threads are started, then we set it to the number of
+** processors on the system (if support is available to detect this).
+** Otherwise, we fall back to 1.
+** After startup, the number of work-stealing Mercury engines is fixed.
+*/
+extern MR_Unsigned MR_num_ws_engines;
+
+/*
+** MR_max_engines is the maximum number of total engines we can create.
+** MR_num_ws_engines <= MR_max_engines < MR_ENGINE_ID_NONE
+*/
+extern MR_Unsigned MR_max_engines;
-#if defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ)
/*
** This is used to set MR_granularity_wsdeque_length based on the value of
-** MR_num_threads. A value of 2 says, allow twice as many threads in a
+** MR_num_ws_engines. A value of 2 says, allow twice as many threads in a
** context's wsdeque than mercury engines before granularity control has an
** effect.
*/
--
1.8.4