[m-rev.] for review: Re-factor the MR_join_and_continue macro.
Paul Bone
pbone at csse.unimelb.edu.au
Fri Nov 27 14:50:50 AEDT 2009
On Mon, Nov 23, 2009 at 02:09:01PM +1100, Paul Bone wrote:
>
> For review by someone familiar with the low-level C backend, probably Zoltan.
>
> - Thanks, Paul.
>
> Re-factor the MR_join_and_continue macro.
>
This alternative change is much simpler in that it doesn't require
modifications to the compiler.
For post commit review.
Re-factor the MR_join_and_continue macro.
This change replaces the MR_join_and_continue macro with a C procedure. A
smaller macro named MR_join_and_continue wraps the new C procedure and provides
a trampoline to prevent C stack leaks. MR_join_and_continue will now have the
additional cost of a C procedure call rather than always being inlined. This
code is only used in the implementation of parallel conjunctions in the low
level C grades; it does not affect other grades.
An earlier revision of this code was causing deadlocks. To debug them, support
was added to the MR_SIGNAL, MR_BROADCAST and MR_WAIT macros to enable better
logging of the use of condition variables when MR_DEBUG_THREADS is defined at
compile time.
This change passes bootcheck and the test suite in the asm_fast.gc.par grade.
runtime/mercury_context.h:
runtime/mercury_context.c:
Created MR_do_join_and_continue procedure from old MR_join_and_continue
macro.
Added additional comments to this procedure, describing how it works.
Created a new macro MR_join_and_continue that wraps the new procedure.
Conform to changes in the MR_WAIT, MR_SIGNAL and MR_BROADCAST macros.
runtime/mercury_thread.h:
Added a from parameter to the MR_WAIT, MR_SIGNAL and MR_BROADCAST macros.
Added a from parameter to the C procedures' declarations that implement the
debugging versions of the condition operations above.
Adjusted the formatting of these declarations to match the C style used in
the project.
runtime/mercury_thread.c:
The C procedures implementing the debugging versions of the condition
operations now print out their from parameter.
MR_cond_broadcast now uses "broadcast" in its log message rather than
"signal".
MR_cond_wait's log message now more clearly specifies which argument is the
lock and which is the condition variable.
Index: runtime/mercury_context.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.c,v
retrieving revision 1.70
diff -u -p -b -r1.70 mercury_context.c
--- runtime/mercury_context.c 6 Nov 2009 05:40:24 -0000 1.70
+++ runtime/mercury_context.c 27 Nov 2009 00:36:45 -0000
@@ -787,9 +787,9 @@ MR_schedule_context(MR_Context *ctxt)
** we wake up all the waiting threads.
*/
if (ctxt->MR_ctxt_resume_owner_thread == (MercuryThread) NULL) {
- MR_SIGNAL(&MR_runqueue_cond);
+ MR_SIGNAL(&MR_runqueue_cond, "schedule_context");
} else {
- MR_BROADCAST(&MR_runqueue_cond);
+ MR_BROADCAST(&MR_runqueue_cond, "schedule_context");
}
#endif
MR_UNLOCK(&MR_runqueue_lock, "schedule_context");
@@ -803,7 +803,7 @@ MR_schedule_spark_globally(const MR_Spar
MR_wsdeque_push_bottom(&MR_spark_queue, proto_spark);
MR_num_outstanding_contexts_and_global_sparks++;
MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks);
- MR_SIGNAL(&MR_runqueue_cond);
+ MR_SIGNAL(&MR_runqueue_cond, "schedule_spark_globally");
MR_UNLOCK(&MR_runqueue_lock, "schedule_spark_globally");
}
#endif /* !MR_LL_PARALLEL_CONJ */
@@ -825,6 +825,7 @@ MR_define_entry(MR_do_runnext);
MR_Spark spark;
unsigned depth;
MercuryThread thd;
+ int wait_result;
#ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
MR_Timer runnext_timer;
@@ -903,8 +904,10 @@ MR_define_entry(MR_do_runnext);
&MR_profile_parallel_executed_nothing);
}
#endif
- while (MR_WAIT(&MR_runqueue_cond, &MR_runqueue_lock) != 0) {
- }
+ do {
+ wait_result =
+ MR_WAIT(&MR_runqueue_cond, &MR_runqueue_lock, "do_runnext");
+ } while (wait_result != 0);
}
ReadyContext:
@@ -986,6 +989,126 @@ MR_END_MODULE
#endif /* !MR_HIGHLEVEL_CODE */
#ifdef MR_LL_PARALLEL_CONJ
+/*
+** Decrement the sync term's count and return the label to continue from.
+*/
+MR_Code*
+MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label)
+{
+ if (!jnc_st->MR_st_is_shared) {
+ /* This parallel conjunction has only executed sequentially. */
+ if (--jnc_st->MR_st_count == 0) {
+ /* It has finished executing, so continue from the join label. */
+ return join_label;
+ } else {
+ /*
+ ** It has not finished executing. Try to finish it by executing
+ ** our sparks.
+ ** This code was formerly MR_join_and_continue_1()
+ */
+ MR_Context *jnc_ctxt;
+ MR_bool jnc_popped;
+ MR_Spark jnc_spark;
+
+ jnc_ctxt = MR_ENGINE(MR_eng_this_context);
+ jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
+ &jnc_spark);
+ if (jnc_popped) {
+ MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
+ return jnc_spark.MR_spark_resume;
+ } else {
+ /*
+ ** Someone's stolen our sparks, we should try to execute
+ ** something that's been scheduled globally. XXX: We don't yet
+ ** have work stealing so how can this happen!?
+ */
+ fprintf(stderr, "My sparks have been stolen!! %ld\n", (long) pthread_self());
+ return MR_ENTRY(MR_do_runnext);
+ }
+ }
+ } else {
+ /* This parallel conjunction may be executing in parallel. */
+ MR_LOCK(&MR_sync_term_lock, "continue");
+ if (--jnc_st->MR_st_count == 0) {
+ /* This parallel conjunction has finished. */
+ if (MR_ENGINE(MR_eng_this_context) == jnc_st->MR_st_orig_context) {
+ /*
+ ** This context originated this parallel conjunction and all
+ ** the branches have finished so jump to the join label.
+ */
+ MR_UNLOCK(&MR_sync_term_lock, "continue i");
+ return join_label;
+ } else {
+ /*
+ ** This context didn't originate this parallel conjunction and
+ ** we're the last branch to finish. The originating context
+ ** should be suspended waiting for us to finish, so wake it up.
+ */
+ jnc_st->MR_st_orig_context->MR_ctxt_resume = join_label;
+ MR_schedule_context(jnc_st->MR_st_orig_context);
+ MR_UNLOCK(&MR_sync_term_lock, "continue ii");
+ return MR_ENTRY(MR_do_runnext);
+ }
+ } else {
+ /*
+ ** The parallel conjunction is being executed in parallel but it is
+ ** not yet finished. This code was formerly
+ ** MR_join_and_continue_2()
+ */
+ MR_Context *jnc_ctxt;
+ MR_bool jnc_popped;
+ MR_Spark jnc_spark;
+
+ jnc_ctxt = MR_ENGINE(MR_eng_this_context);
+ jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
+ &jnc_spark);
+ if (jnc_popped && (jnc_spark.MR_spark_parent_sp == MR_parent_sp)) {
+ /*
+ ** The spark at the top of the stack is from the same parallel
+ ** conjunction that we've just been executing. We can immediately
+ ** execute the next branch of the same parallel conjunction in
+ ** the current context.
+ */
+ MR_UNLOCK(&MR_sync_term_lock, "continue_2 i");
+ MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
+ return jnc_spark.MR_spark_resume;
+ } else {
+ /*
+ ** The spark stack is empty or the next spark is from a different
+ ** parallel conjunction to the one we've been executing. Either
+ ** way, there's nothing more we can do with this context right
+ ** now. Put back the spark we won't be using.
+ */
+ if (jnc_popped) {
+ MR_wsdeque_putback_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
+ &jnc_spark);
+ }
+ /*
+ ** If this context originated the parallel conjunction we've been
+ ** executing, the rest of the parallel conjunction must have been
+ ** put on the global spark queue to be executed in other
+ ** contexts. This context will need to be resumed once the
+ ** parallel conjunction is completed, so suspend the context.
+ **
+ ** What if the other conjuncts were put on the global queue
+ ** but haven't yet been taken by other threads? Then this step
+ ** is redundant. It's not worth fixing, this problem will go
+ ** away once we enable work-stealing. - pbone.
+ */
+ if (jnc_ctxt == jnc_st->MR_st_orig_context) {
+ MR_save_context(jnc_ctxt);
+ MR_ENGINE(MR_eng_this_context) = NULL;
+ }
+ /* Finally look for other work. */
+ MR_UNLOCK(&MR_sync_term_lock, "continue_2 ii");
+ return MR_ENTRY(MR_do_runnext);
+ }
+ }
+ }
+}
+#endif
+
+#ifdef MR_LL_PARALLEL_CONJ
/*
* Debugging functions for runtime granularity control.
Index: runtime/mercury_context.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.h,v
retrieving revision 1.55
diff -u -p -b -r1.55 mercury_context.h
--- runtime/mercury_context.h 17 Nov 2009 06:30:26 -0000 1.55
+++ runtime/mercury_context.h 27 Nov 2009 00:49:46 -0000
@@ -811,111 +811,15 @@ extern void MR_schedule_context(
MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks); \
} while (0)
- #define MR_join_and_continue(sync_term, join_label) \
- do { \
- MR_SyncTerm *jnc_st = (MR_SyncTerm *) &sync_term; \
- \
- if (!jnc_st->MR_st_is_shared) { \
- /* This parallel conjunction has only executed sequentially. */ \
- if (--jnc_st->MR_st_count == 0) { \
- MR_GOTO(join_label); \
- } else { \
- MR_join_and_continue_1(); \
- } \
- } else { \
- /* This parallel conjunction may be executing in parallel. */ \
- MR_LOCK(&MR_sync_term_lock, "continue"); \
- if (--jnc_st->MR_st_count == 0) { \
- if (MR_ENGINE(MR_eng_this_context) \
- == jnc_st->MR_st_orig_context) \
- { \
- /* \
- ** This context originated this parallel conjunction and \
- ** all the branches have finished so jump to the join \
- ** label. \
- */ \
- MR_UNLOCK(&MR_sync_term_lock, "continue i"); \
- MR_GOTO(join_label); \
- } else { \
- /* \
- ** This context didn't originate this parallel \
- ** conjunction and we're the last branch to finish. The \
- ** originating context should be suspended waiting for us \
- ** to finish, so wake it up. \
- */ \
- jnc_st->MR_st_orig_context->MR_ctxt_resume = join_label; \
- MR_schedule_context(jnc_st->MR_st_orig_context); \
- MR_UNLOCK(&MR_sync_term_lock, "continue ii"); \
- MR_runnext(); \
- } \
- } else { \
- MR_join_and_continue_2(); \
- } \
- } \
- } while (0)
+extern MR_Code*
+MR_do_join_and_continue(MR_SyncTerm *sync_term, MR_Code *join_label);
- #define MR_join_and_continue_1() \
- do { \
- MR_Context *jnc_ctxt; \
- MR_bool jnc_popped; \
- MR_Spark jnc_spark; \
- \
- jnc_ctxt = MR_ENGINE(MR_eng_this_context); \
- jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque, \
- &jnc_spark); \
- if (jnc_popped) { \
- MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks); \
- MR_GOTO(jnc_spark.MR_spark_resume); \
- } else { \
- MR_runnext(); \
- } \
- } while (0)
-
- #define MR_join_and_continue_2() \
+ #define MR_join_and_continue(sync_term, join_label) \
do { \
- MR_Context *jnc_ctxt; \
- MR_bool jnc_popped; \
- MR_Spark jnc_spark; \
- \
- jnc_ctxt = MR_ENGINE(MR_eng_this_context); \
- jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque, \
- &jnc_spark); \
- if (jnc_popped && (jnc_spark.MR_spark_parent_sp == MR_parent_sp)) { \
- /* \
- ** The spark at the top of the stack is due to the same parallel \
- ** conjunction that we've just been executing. We can immediately \
- ** execute the next branch of the same parallel conjunction in \
- ** the current context. \
- */ \
- MR_UNLOCK(&MR_sync_term_lock, "continue_2 i"); \
- MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks); \
- MR_GOTO(jnc_spark.MR_spark_resume); \
- } else { \
- /* \
- ** The spark stack is empty or the next spark is from a different \
- ** parallel conjunction to the one we've been executing. Either \
- ** way, there's nothing more we can do with this context right \
- ** now. Put back the spark we won't be using. \
- */ \
- if (jnc_popped) { \
- MR_wsdeque_putback_bottom(&jnc_ctxt->MR_ctxt_spark_deque, \
- &jnc_spark); \
- } \
- /* \
- ** If this context originated the parallel conjunction we've been \
- ** executing, the rest of the parallel conjunction must have been \
- ** put on the global spark queue to be executed in other \
- ** contexts. This context will need to be resumed once the \
- ** parallel conjunction is completed, so suspend the context. \
- */ \
- if (jnc_ctxt == jnc_st->MR_st_orig_context) { \
- MR_save_context(jnc_ctxt); \
- MR_ENGINE(MR_eng_this_context) = NULL; \
- } \
- /* Finally look for other work. */ \
- MR_UNLOCK(&MR_sync_term_lock, "continue_2 ii"); \
- MR_runnext(); \
- } \
+ MR_Code *jump_target; \
+ jump_target = \
+ MR_do_join_and_continue((MR_SyncTerm*) &(sync_term), join_label); \
+ MR_GOTO(jump_target); \
} while (0)
/* This needs to come after the definition of MR_SparkDeque_Struct. */
Index: runtime/mercury_thread.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.c,v
retrieving revision 1.35
diff -u -p -b -r1.35 mercury_thread.c
--- runtime/mercury_thread.c 23 Aug 2009 22:52:35 -0000 1.35
+++ runtime/mercury_thread.c 26 Nov 2009 23:25:34 -0000
@@ -221,34 +221,36 @@ MR_mutex_unlock(MercuryLock *lock, const
}
int
-MR_cond_signal(MercuryCond *cond)
+MR_cond_signal(MercuryCond *cond, const char *from)
{
int err;
- fprintf(stderr, "%ld signaling %p\n", (long) pthread_self(), cond);
+ fprintf(stderr, "%ld signaling %p (%s)\n",
+ (long) pthread_self(), cond, from);
err = pthread_cond_signal(cond);
assert(err == 0);
return err;
}
int
-MR_cond_broadcast(MercuryCond *cond)
+MR_cond_broadcast(MercuryCond *cond, const char *from)
{
int err;
- fprintf(stderr, "%ld signaling %p\n", (long) pthread_self(), cond);
+ fprintf(stderr, "%ld broadcasting %p (%s)\n",
+ (long) pthread_self(), cond, from);
err = pthread_cond_broadcast(cond);
assert(err == 0);
return err;
}
int
-MR_cond_wait(MercuryCond *cond, MercuryLock *lock)
+MR_cond_wait(MercuryCond *cond, MercuryLock *lock, const char *from)
{
int err;
- fprintf(stderr, "%ld waiting on %p (%p)\n", (long) pthread_self(),
- cond, lock);
+ fprintf(stderr, "%ld waiting on cond: %p lock: %p (%s)\n",
+ (long) pthread_self(), cond, lock, from);
err = pthread_cond_wait(cond, lock);
assert(err == 0);
return err;
Index: runtime/mercury_thread.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.h,v
retrieving revision 1.27
diff -u -p -b -r1.27 mercury_thread.h
--- runtime/mercury_thread.h 4 Jun 2009 08:07:10 -0000 1.27
+++ runtime/mercury_thread.h 27 Nov 2009 01:41:46 -0000
@@ -31,11 +31,16 @@
typedef pthread_mutex_t MercuryLock;
typedef pthread_cond_t MercuryCond;
- extern int MR_mutex_lock(MercuryLock *lock, const char *from);
- extern int MR_mutex_unlock(MercuryLock *lock, const char *from);
- extern int MR_cond_signal(MercuryCond *cond);
- extern int MR_cond_broadcast(MercuryCond *cond);
- extern int MR_cond_wait(MercuryCond *cond, MercuryLock *lock);
+extern int
+MR_mutex_lock(MercuryLock *lock, const char *from);
+extern int
+MR_mutex_unlock(MercuryLock *lock, const char *from);
+extern int
+MR_cond_signal(MercuryCond *cond, const char *from);
+extern int
+MR_cond_broadcast(MercuryCond *cond, const char *from);
+extern int
+MR_cond_wait(MercuryCond *cond, MercuryLock *lock, const char *from);
extern MR_bool MR_debug_threads;
@@ -48,9 +53,9 @@
#define MR_LOCK(lck, from) pthread_mutex_lock((lck))
#define MR_UNLOCK(lck, from) pthread_mutex_unlock((lck))
- #define MR_SIGNAL(cnd) pthread_cond_signal((cnd))
- #define MR_BROADCAST(cnd) pthread_cond_broadcast((cnd))
- #define MR_WAIT(cnd, mtx) pthread_cond_wait((cnd), (mtx))
+ #define MR_SIGNAL(cnd, from) pthread_cond_signal((cnd))
+ #define MR_BROADCAST(cnd, from) pthread_cond_broadcast((cnd))
+ #define MR_WAIT(cnd, mtx, from) pthread_cond_wait((cnd), (mtx))
#else
#define MR_LOCK(lck, from) \
( MR_debug_threads ? \
@@ -65,21 +70,21 @@
pthread_mutex_unlock((lck)) \
)
- #define MR_SIGNAL(cnd) \
+ #define MR_SIGNAL(cnd, from) \
( MR_debug_threads ? \
- MR_cond_signal((cnd)) \
+ MR_cond_signal((cnd), (from)) \
: \
pthread_cond_signal((cnd)) \
)
- #define MR_BROADCAST(cnd) \
+ #define MR_BROADCAST(cnd, from) \
( MR_debug_threads ? \
- MR_cond_broadcast((cnd)) \
+ MR_cond_broadcast((cnd), (from)) \
: \
pthread_cond_broadcast((cnd)) \
)
- #define MR_WAIT(cnd, mtx) \
+ #define MR_WAIT(cnd, mtx, from) \
( MR_debug_threads ? \
- MR_cond_wait((cnd), (mtx)) \
+ MR_cond_wait((cnd), (mtx), (from)) \
: \
pthread_cond_wait((cnd), (mtx)) \
)
@@ -155,9 +160,9 @@
#define MR_LOCK(nothing, from) do { } while (0)
#define MR_UNLOCK(nothing, from) do { } while (0)
- #define MR_SIGNAL(nothing) do { } while (0)
- #define MR_BROADCAST(nothing) do { } while (0)
- #define MR_WAIT(no, thing) (0)
+ #define MR_SIGNAL(nothing, from) do { } while (0)
+ #define MR_BROADCAST(nothing, from) do { } while (0)
+ #define MR_WAIT(no, thing, from) (0)
#define MR_OBTAIN_GLOBAL_LOCK(where) do { } while (0)
#define MR_RELEASE_GLOBAL_LOCK(where) do { } while (0)
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 489 bytes
Desc: Digital signature
URL: <http://lists.mercurylang.org/archives/reviews/attachments/20091127/05a6f25d/attachment.sig>
More information about the reviews
mailing list