[m-rev.] for review: Re-factor the MR_join_and_continue macro.

Paul Bone pbone at csse.unimelb.edu.au
Fri Nov 27 14:50:50 AEDT 2009


On Mon, Nov 23, 2009 at 02:09:01PM +1100, Paul Bone wrote:
> 
> For review by someone familiar with the low-level C backend, probably Zoltan.
> 
>  - Thanks, Paul.
> 
> Re-factor the MR_join_and_continue macro.
> 

This alternative change is much simplier in that it doesn't require
modifications to the compiler.

For post commit review.

Re-factor the MR_join_and_continue macro.

This change replaces the MR_join_and_continue macro with a C procedure.  A
smaller macro named MR_join_and_continue wraps the new C procedure and provides
a trampoline to prevent C stack leaks.  MR_join_and_continue will now have the
additional cost of a C procedure call rather than always being inlined.  This
code is only used in the implementation of parallel conjunctions in the low
level C grades, it does not affect other grades.

An earlier revision of this code was causing deadlocks, to debug them support
was added to the MR_SIGNAL MR_BROADCAST and MR_WAIT macros to enable better
logging of the use of condition variables when MR_DEBUG_THREADS is defined at
compile time.

This change passes bootcheck and the test suite in the asm_fast.gc.par grade.

runtime/mercury_context.h:
runtime/mercury_context.c:
    Created MR_do_join_and_continue procedure from old MR_join_and_continue
    macro.
    Added additional comments to this procedure, describing how it works.
    Created a new macro MR_join_and_continue that wraps the new procedure.
    Conform to changes in the MR_WAIT, MR_SIGNAL and MR_BROADCAST macros. 

runtime/mercury_thread.h:
    Added a from parameter to the MR_WAIT, MR_SIGNAL and MR_BROADCAST macros.
    Added a from parameter to the C procedures' declarations that implement the
    debugging versions of the condition operations above.
    Adjusted the formatting of these declarations to match the C style used in
    the project.

runtime/mercury_thread.c:
    The C procedures implementing the debugging versions of the condition
    operations now print out their from parameter.
    MR_cond_broadcast now uses "broadcast" in it's log message rather than
    "signal"
    MR_cond_wait's log message now more clearly specifies which argument is the
    lock and which is the condition variable.

Index: runtime/mercury_context.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.c,v
retrieving revision 1.70
diff -u -p -b -r1.70 mercury_context.c
--- runtime/mercury_context.c	6 Nov 2009 05:40:24 -0000	1.70
+++ runtime/mercury_context.c	27 Nov 2009 00:36:45 -0000
@@ -787,9 +787,9 @@ MR_schedule_context(MR_Context *ctxt)
     ** we wake up all the waiting threads.
     */
     if (ctxt->MR_ctxt_resume_owner_thread == (MercuryThread) NULL) {
-        MR_SIGNAL(&MR_runqueue_cond);
+        MR_SIGNAL(&MR_runqueue_cond, "schedule_context");
     } else {
-        MR_BROADCAST(&MR_runqueue_cond);
+        MR_BROADCAST(&MR_runqueue_cond, "schedule_context");
     }
 #endif
     MR_UNLOCK(&MR_runqueue_lock, "schedule_context");
@@ -803,7 +803,7 @@ MR_schedule_spark_globally(const MR_Spar
     MR_wsdeque_push_bottom(&MR_spark_queue, proto_spark);
     MR_num_outstanding_contexts_and_global_sparks++;
     MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks);
-    MR_SIGNAL(&MR_runqueue_cond);
+    MR_SIGNAL(&MR_runqueue_cond, "schedule_spark_globally");
     MR_UNLOCK(&MR_runqueue_lock, "schedule_spark_globally");
 }
 #endif /* !MR_LL_PARALLEL_CONJ */
@@ -825,6 +825,7 @@ MR_define_entry(MR_do_runnext);
     MR_Spark        spark;
     unsigned        depth;
     MercuryThread   thd;
+    int             wait_result;
 
 #ifdef MR_PROFILE_PARALLEL_EXECUTION_SUPPORT
     MR_Timer        runnext_timer;
@@ -903,8 +904,10 @@ MR_define_entry(MR_do_runnext);
                     &MR_profile_parallel_executed_nothing);
         }
 #endif
-        while (MR_WAIT(&MR_runqueue_cond, &MR_runqueue_lock) != 0) {
-        }
+        do {
+            wait_result = 
+                MR_WAIT(&MR_runqueue_cond, &MR_runqueue_lock, "do_runnext");
+        } while (wait_result != 0);
     }
 
   ReadyContext:
@@ -986,6 +989,126 @@ MR_END_MODULE
 #endif /* !MR_HIGHLEVEL_CODE */
 
 #ifdef MR_LL_PARALLEL_CONJ
+MR_Code*
+MR_do_join_and_continue(MR_SyncTerm *jnc_st, MR_Code *join_label) 
+{
+    if (!jnc_st->MR_st_is_shared) {
+        /* This parallel conjunction has only executed sequentially. */
+        if (--jnc_st->MR_st_count == 0) {
+            /*
+            ** It has finished executing, continue execution from the join
+            ** label.
+            */
+            return join_label;
+        } else {
+            /*
+            ** It has not finished executing.  Try to finish it by executing
+            ** our sparks.
+            ** This code was formerly MR_join_and_continue_1() 
+            */
+            MR_Context  *jnc_ctxt;
+            MR_bool     jnc_popped;
+            MR_Spark    jnc_spark;
+
+            jnc_ctxt = MR_ENGINE(MR_eng_this_context);
+            jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
+                &jnc_spark);
+            if (jnc_popped) {
+                MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
+                return jnc_spark.MR_spark_resume;
+            } else {
+                /*
+                ** Someone's stolen our sparks, we should try to execute
+                ** something that's been scheduled globally.  XXX: We don't yet
+                ** have work stealing so how can this happen!?
+                */
+                fprintf(stderr, "My sparks have been stolen!! %lp", pthread_self());
+                return MR_ENTRY(MR_do_runnext);
+            }
+        }
+    } else {
+        /* This parallel conjunction may be executing in parallel. */
+        MR_LOCK(&MR_sync_term_lock, "continue");
+        if (--jnc_st->MR_st_count == 0) {
+            /* This parallel conjunction has finished. */
+            if (MR_ENGINE(MR_eng_this_context) == jnc_st->MR_st_orig_context) {
+                /*
+                ** This context originated this parallel conjunction and all
+                ** the branches have finished so jump to the join label.
+                */
+                MR_UNLOCK(&MR_sync_term_lock, "continue i");
+                return join_label;
+            } else {
+                /*
+                ** This context didn't originate this parallel conjunction and
+                ** we're the last branch to finish.  The originating context
+                ** should be suspended waiting for us to finish, so wake it up.
+                */
+                jnc_st->MR_st_orig_context->MR_ctxt_resume = join_label;
+                MR_schedule_context(jnc_st->MR_st_orig_context);
+                MR_UNLOCK(&MR_sync_term_lock, "continue ii");
+                return MR_ENTRY(MR_do_runnext);
+            }
+        } else {
+            /*
+            ** The parallel conjunction is being executed in parallel but it is
+            ** not yet finished.  This code was Formerly
+            ** MR_join_and_continue_2() 
+            */
+            MR_Context  *jnc_ctxt;
+            MR_bool     jnc_popped;
+            MR_Spark    jnc_spark;
+
+            jnc_ctxt = MR_ENGINE(MR_eng_this_context);
+            jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
+                &jnc_spark);
+            if (jnc_popped && (jnc_spark.MR_spark_parent_sp == MR_parent_sp)) {
+                /*
+                ** The spark at the top of the stack is from to the same parallel
+                ** conjunction that we've just been executing. We can immediately
+                ** execute the next branch of the same parallel conjunction in
+                ** the current context.
+                */
+                MR_UNLOCK(&MR_sync_term_lock, "continue_2 i");
+                MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
+                return jnc_spark.MR_spark_resume;
+            } else {
+                /*
+                ** The spark stack is empty or the next spark is from a different
+                ** parallel conjunction to the one we've been executing.  Either
+                ** way, there's nothing more we can do with this context right
+                ** now.  Put back the spark we won't be using.
+                */
+                if (jnc_popped) {
+                    MR_wsdeque_putback_bottom(&jnc_ctxt->MR_ctxt_spark_deque,
+                        &jnc_spark);
+                }
+                /*
+                ** If this context originated the parallel conjunction we've been
+                ** executing, the rest of the parallel conjunction must have been
+                ** put on the global spark queue to be executed in other
+                ** contexts.  This context will need to be resumed once the
+                ** parallel conjunction is completed, so suspend the context.
+                **
+                ** What if the other conjuncts where put on the global queue
+                ** but haven't yet been taken by other threads?  Then this step
+                ** is redundant.  It's not worth fixing, this problem will go
+                ** away once we enable work-stealing. - pbone. 
+                */
+                if (jnc_ctxt == jnc_st->MR_st_orig_context) {
+                    MR_save_context(jnc_ctxt);
+                    MR_ENGINE(MR_eng_this_context) = NULL;
+                }
+                /* Finally look for other work. */
+                MR_UNLOCK(&MR_sync_term_lock, "continue_2 ii");
+                return MR_ENTRY(MR_do_runnext);
+            }
+        }
+    }
+}
+#endif
+
+#ifdef MR_LL_PARALLEL_CONJ
 
 /*
  * Debugging functions for runtime granularity control.
Index: runtime/mercury_context.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.h,v
retrieving revision 1.55
diff -u -p -b -r1.55 mercury_context.h
--- runtime/mercury_context.h	17 Nov 2009 06:30:26 -0000	1.55
+++ runtime/mercury_context.h	27 Nov 2009 00:49:46 -0000
@@ -811,111 +811,15 @@ extern  void        MR_schedule_context(
         MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks);       \
     } while (0)
 
-  #define MR_join_and_continue(sync_term, join_label)                         \
-    do {                                                                      \
-        MR_SyncTerm *jnc_st = (MR_SyncTerm *) &sync_term;                     \
-                                                                              \
-        if (!jnc_st->MR_st_is_shared) {                                       \
-            /* This parallel conjunction has only executed sequentially. */   \
-            if (--jnc_st->MR_st_count == 0) {                                 \
-                MR_GOTO(join_label);                                          \
-            } else {                                                          \
-                MR_join_and_continue_1();                                     \
-            }                                                                 \
-        } else {                                                              \
-            /* This parallel conjunction may be executing in parallel. */     \
-            MR_LOCK(&MR_sync_term_lock, "continue");                          \
-            if (--jnc_st->MR_st_count == 0) {                                 \
-                if (MR_ENGINE(MR_eng_this_context)                            \
-                    == jnc_st->MR_st_orig_context)                            \
-                {                                                             \
-                    /*                                                        \
-                    ** This context originated this parallel conjunction and  \
-                    ** all the branches have finished so jump to the join     \
-                    ** label.                                                 \
-                    */                                                        \
-                    MR_UNLOCK(&MR_sync_term_lock, "continue i");              \
-                    MR_GOTO(join_label);                                      \
-                } else {                                                      \
-                    /*                                                        \
-                    ** This context didn't originate this parallel            \
-                    ** conjunction and we're the last branch to finish.  The  \
-                    ** originating context should be suspended waiting for us \
-                    ** to finish, so wake it up.                              \
-                    */                                                        \
-                    jnc_st->MR_st_orig_context->MR_ctxt_resume = join_label;  \
-                    MR_schedule_context(jnc_st->MR_st_orig_context);          \
-                    MR_UNLOCK(&MR_sync_term_lock, "continue ii");             \
-                    MR_runnext();                                             \
-                }                                                             \
-            } else {                                                          \
-                MR_join_and_continue_2();                                     \
-            }                                                                 \
-        }                                                                     \
-    } while (0)
+extern MR_Code* 
+MR_do_join_and_continue(MR_SyncTerm *sync_term, MR_Code *join_label);
 
-  #define MR_join_and_continue_1()                                            \
-    do {                                                                      \
-        MR_Context  *jnc_ctxt;                                                \
-        MR_bool     jnc_popped;                                               \
-        MR_Spark    jnc_spark;                                                \
-                                                                              \
-        jnc_ctxt = MR_ENGINE(MR_eng_this_context);                            \
-        jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,    \
-            &jnc_spark);                                                      \
-        if (jnc_popped) {                                                     \
-            MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);   \
-            MR_GOTO(jnc_spark.MR_spark_resume);                               \
-        } else {                                                              \
-            MR_runnext();                                                     \
-        }                                                                     \
-    } while (0)
-
-  #define MR_join_and_continue_2()                                            \
+  #define MR_join_and_continue(sync_term, join_label)                         \
     do {                                                                      \
-        MR_Context  *jnc_ctxt;                                                \
-        MR_bool     jnc_popped;                                               \
-        MR_Spark    jnc_spark;                                                \
-                                                                              \
-        jnc_ctxt = MR_ENGINE(MR_eng_this_context);                            \
-        jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,    \
-            &jnc_spark);                                                      \
-        if (jnc_popped && (jnc_spark.MR_spark_parent_sp == MR_parent_sp)) {   \
-            /*                                                                \
-            ** The spark at the top of the stack is due to the same parallel  \
-            ** conjunction that we've just been executing. We can immediately \
-            ** execute the next branch of the same parallel conjunction in    \
-            ** the current context.                                           \
-            */                                                                \
-            MR_UNLOCK(&MR_sync_term_lock, "continue_2 i");                    \
-            MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);   \
-            MR_GOTO(jnc_spark.MR_spark_resume);                               \
-        } else {                                                              \
-            /*                                                                \
-            ** The spark stack is empty or the next spark is from a different \
-            ** parallel conjunction to the one we've been executing.  Either  \
-            ** way, there's nothing more we can do with this context right    \
-            ** now.  Put back the spark we won't be using.                    \
-            */                                                                \
-            if (jnc_popped) {                                                 \
-                MR_wsdeque_putback_bottom(&jnc_ctxt->MR_ctxt_spark_deque,     \
-                    &jnc_spark);                                              \
-            }                                                                 \
-            /*                                                                \
-            ** If this context originated the parallel conjunction we've been \
-            ** executing, the rest of the parallel conjunction must have been \
-            ** put on the global spark queue to be executed in other          \
-            ** contexts.  This context will need to be resumed once the       \
-            ** parallel conjunction is completed, so suspend the context.     \
-            */                                                                \
-            if (jnc_ctxt == jnc_st->MR_st_orig_context) {                     \
-                MR_save_context(jnc_ctxt);                                    \
-                MR_ENGINE(MR_eng_this_context) = NULL;                        \
-            }                                                                 \
-            /* Finally look for other work. */                                \
-            MR_UNLOCK(&MR_sync_term_lock, "continue_2 ii");                   \
-            MR_runnext();                                                     \
-        }                                                                     \
+        MR_Code     *jump_target;                                             \
+        jump_target =                                                         \
+            MR_do_join_and_continue((MR_SyncTerm*) &(sync_term), join_label); \
+        MR_GOTO(jump_target);                                                 \
     } while (0)
 
   /* This needs to come after the definition of MR_SparkDeque_Struct. */
Index: runtime/mercury_thread.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.c,v
retrieving revision 1.35
diff -u -p -b -r1.35 mercury_thread.c
--- runtime/mercury_thread.c	23 Aug 2009 22:52:35 -0000	1.35
+++ runtime/mercury_thread.c	26 Nov 2009 23:25:34 -0000
@@ -221,34 +221,36 @@ MR_mutex_unlock(MercuryLock *lock, const
 }
 
 int
-MR_cond_signal(MercuryCond *cond)
+MR_cond_signal(MercuryCond *cond, const char *from)
 {
     int err;
 
-    fprintf(stderr, "%ld signaling %p\n", (long) pthread_self(), cond);
+    fprintf(stderr, "%ld signaling %p (%s)\n", 
+        (long) pthread_self(), cond, from);
     err = pthread_cond_signal(cond);
     assert(err == 0);
     return err;
 }
 
 int
-MR_cond_broadcast(MercuryCond *cond)
+MR_cond_broadcast(MercuryCond *cond, const char *from)
 {
     int err;
 
-    fprintf(stderr, "%ld signaling %p\n", (long) pthread_self(), cond);
+    fprintf(stderr, "%ld broadcasting %p (%s)\n", 
+        (long) pthread_self(), cond, from);
     err = pthread_cond_broadcast(cond);
     assert(err == 0);
     return err;
 }
 
 int
-MR_cond_wait(MercuryCond *cond, MercuryLock *lock)
+MR_cond_wait(MercuryCond *cond, MercuryLock *lock, const char *from)
 {
     int err;
 
-    fprintf(stderr, "%ld waiting on %p (%p)\n", (long) pthread_self(),
-        cond, lock);
+    fprintf(stderr, "%ld waiting on cond: %p lock: %p (%s)\n", 
+        (long) pthread_self(), cond, lock, from);
     err = pthread_cond_wait(cond, lock);
     assert(err == 0);
     return err;
Index: runtime/mercury_thread.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.h,v
retrieving revision 1.27
diff -u -p -b -r1.27 mercury_thread.h
--- runtime/mercury_thread.h	4 Jun 2009 08:07:10 -0000	1.27
+++ runtime/mercury_thread.h	27 Nov 2009 01:41:46 -0000
@@ -31,11 +31,16 @@
   typedef pthread_mutex_t   MercuryLock;
   typedef pthread_cond_t    MercuryCond;
 
-  extern int        MR_mutex_lock(MercuryLock *lock, const char *from);
-  extern int        MR_mutex_unlock(MercuryLock *lock, const char *from);
-  extern int        MR_cond_signal(MercuryCond *cond);
-  extern int        MR_cond_broadcast(MercuryCond *cond);
-  extern int        MR_cond_wait(MercuryCond *cond, MercuryLock *lock);
+extern int
+MR_mutex_lock(MercuryLock *lock, const char *from);
+extern int
+MR_mutex_unlock(MercuryLock *lock, const char *from);
+extern int
+MR_cond_signal(MercuryCond *cond, const char *from);
+extern int
+MR_cond_broadcast(MercuryCond *cond, const char *from);
+extern int
+MR_cond_wait(MercuryCond *cond, MercuryLock *lock, const char *from);
 
   extern MR_bool    MR_debug_threads;
 
@@ -48,9 +53,9 @@
     #define MR_LOCK(lck, from)      pthread_mutex_lock((lck))
     #define MR_UNLOCK(lck, from)    pthread_mutex_unlock((lck))
 
-    #define MR_SIGNAL(cnd)          pthread_cond_signal((cnd))
-    #define MR_BROADCAST(cnd)       pthread_cond_broadcast((cnd))
-    #define MR_WAIT(cnd, mtx)       pthread_cond_wait((cnd), (mtx))
+    #define MR_SIGNAL(cnd, from)    pthread_cond_signal((cnd))
+    #define MR_BROADCAST(cnd, from) pthread_cond_broadcast((cnd))
+    #define MR_WAIT(cnd, mtx, from) pthread_cond_wait((cnd), (mtx))
   #else
     #define MR_LOCK(lck, from)                          \
                 ( MR_debug_threads ?                    \
@@ -65,21 +70,21 @@
                     pthread_mutex_unlock((lck))         \
                 )
 
-    #define MR_SIGNAL(cnd)                              \
+    #define MR_SIGNAL(cnd, from)                        \
                 ( MR_debug_threads ?                    \
-                    MR_cond_signal((cnd))               \
+                    MR_cond_signal((cnd), (from))       \
                 :                                       \
                     pthread_cond_signal((cnd))          \
                 )
-    #define MR_BROADCAST(cnd)                           \
+    #define MR_BROADCAST(cnd, from)                     \
                 ( MR_debug_threads ?                    \
-                    MR_cond_broadcast((cnd))            \
+                    MR_cond_broadcast((cnd), (from))    \
                 :                                       \
                     pthread_cond_broadcast((cnd))       \
                 )
-    #define MR_WAIT(cnd, mtx)                           \
+    #define MR_WAIT(cnd, mtx, from)                     \
                 ( MR_debug_threads ?                    \
-                    MR_cond_wait((cnd), (mtx))          \
+                    MR_cond_wait((cnd), (mtx), (from))  \
                 :                                       \
                     pthread_cond_wait((cnd), (mtx))     \
                 )
@@ -155,9 +160,9 @@
   #define MR_LOCK(nothing, from)        do { } while (0)
   #define MR_UNLOCK(nothing, from)      do { } while (0)
 
-  #define MR_SIGNAL(nothing)            do { } while (0)
-  #define MR_BROADCAST(nothing)         do { } while (0)
-  #define MR_WAIT(no, thing)            (0)
+  #define MR_SIGNAL(nothing, from)      do { } while (0)
+  #define MR_BROADCAST(nothing, from)   do { } while (0)
+  #define MR_WAIT(no, thing, from)      (0)
 
   #define MR_OBTAIN_GLOBAL_LOCK(where)  do { } while (0)
   #define MR_RELEASE_GLOBAL_LOCK(where) do { } while (0)
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 489 bytes
Desc: Digital signature
URL: <http://lists.mercurylang.org/archives/reviews/attachments/20091127/05a6f25d/attachment.sig>


More information about the reviews mailing list