[m-rev.] for review: coroutining for parallel conjunctions

Peter Wang wangp at students.cs.mu.OZ.AU
Tue Jul 4 23:34:04 AEST 2006


On 2006-07-03, Julien Fischer <juliensf at cs.mu.OZ.AU> wrote:
> 
> On Fri, 30 Jun 2006, Peter Wang wrote:
> 
> > By the way, is anyone using asm_fast.par.gc on saturn and earth?
> > Otherwise I'll change those installations to asm_jump.par.gc, which
> > don't suffer from the random errors (somehow related to gcc global
> > registers, GC and whether you link with pthreads statically or not
> > -- wtf?).
> 
> Have you tried this on anything other than Linux?

No.  I'm having enough trouble coping with all these grades ;-)

> > +        /*
> > +        ** Save this context and put it on the list of suspended contexts for
> > +        ** this future.
> > +        */
> > +        MR_save_context(MR_ENGINE(MR_eng_this_context));
> > +        MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume = &&wait_resume;
> 
> Does taking the address of a label work in the none grades? (and the reg
> ones for that matter).

Oops, thanks for catching it.  I just copied it from
extras/concurrency/semaphore.m.  Here's another attempt: (trimmed)


Add coroutining support for dependent parallel conjunctions in lowlevel
parallel grades.

library/par_builtin.m:
	Change the definitions of the synchronisation primitives so that
	waiting on a future that has not yet been signalled suspends the
	current context.  Signalling a future schedules all the contexts
	waiting on it.

runtime/mercury_context.c:
runtime/mercury_thread.c:
runtime/mercury_thread.h:
runtime/mercury_wrapper.c:
	Add a global `MR_primordial_thread' to hold the thread id of the
	primordial thread.

	Add sanity checks; in particular, assert that the primordial thread
	does not exit the way other threads do, since it must clean up the
	Mercury runtime.

tests/par_conj/Mmakefile:
	Run the dependent parallel conjunction tests, which should no longer
	deadlock.

tests/par_conj/*.exp:
	Add expected outputs for test cases which didn't have them.
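
For readers skimming the log, here is a rough single-threaded sketch of the
wait/signal behaviour described for library/par_builtin.m.  It is not the
runtime code: `Context', `Future', `schedule', `future_wait', `future_signal',
`run_all' and `demo' are hypothetical stand-ins for MR_Context, MR_Future,
MR_schedule and friends, and the lock is left out because nothing here runs
concurrently.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for MR_Context and MR_Future. */
typedef struct Context Context;
struct Context {
    int      id;
    long     result;    /* value of the future as seen by this context */
    int      resumed;   /* set when the context was woken via the run queue */
    Context *next;      /* link shared by the suspended list and run queue */
};

typedef struct {
    int      signalled;
    Context *suspended; /* contexts blocked on this future */
    long     value;
} Future;

Context *runqueue_head = NULL;
Context *runqueue_tail = NULL;

/* Append a context to the run queue.  Note that this overwrites ctxt->next. */
void schedule(Context *ctxt)
{
    ctxt->next = NULL;
    if (runqueue_tail != NULL) {
        runqueue_tail->next = ctxt;
    } else {
        runqueue_head = ctxt;
    }
    runqueue_tail = ctxt;
}

/* Return 1 if the value was already available, 0 if ctxt was suspended. */
int future_wait(Future *fut, Context *ctxt)
{
    if (fut->signalled) {
        ctxt->result = fut->value;
        return 1;
    }
    ctxt->next = fut->suspended;
    fut->suspended = ctxt;
    return 0;
}

/* Publish the value and hand every suspended context to the scheduler. */
void future_signal(Future *fut, long value)
{
    Context *ctxt;
    Context *next;

    assert(!fut->signalled);
    fut->value = value;
    fut->signalled = 1;

    ctxt = fut->suspended;
    fut->suspended = NULL;
    while (ctxt != NULL) {
        next = ctxt->next;  /* save the link before schedule() reuses it */
        schedule(ctxt);
        ctxt = next;
    }
}

/* Drain the run queue; stands in for MR_runnext plus wait_resume. */
int run_all(Future *fut)
{
    int count = 0;

    while (runqueue_head != NULL) {
        Context *ctxt = runqueue_head;
        runqueue_head = ctxt->next;
        if (runqueue_head == NULL) {
            runqueue_tail = NULL;
        }
        ctxt->next = NULL;
        ctxt->result = fut->value;
        ctxt->resumed = 1;
        count++;
    }
    return count;
}

/* Two contexts wait before the signal, one waits after it. */
long demo(void)
{
    Future  fut = { 0, NULL, 0 };
    Context a = { 1, 0, 0, NULL };
    Context b = { 2, 0, 0, NULL };
    Context c = { 3, 0, 0, NULL };
    int     woken;

    future_wait(&fut, &a);          /* suspended */
    future_wait(&fut, &b);          /* suspended */
    future_signal(&fut, 42);
    woken = run_all(&fut);
    future_wait(&fut, &c);          /* value available at once */

    if (woken != 2 || !a.resumed || !b.resumed || c.resumed) {
        return -1;
    }
    return a.result + b.result + c.result;
}
```

In this sketch the signalling loop saves the next pointer before calling
schedule(), because schedule() reuses the same link field for the run queue.
Whether MR_schedule treats MR_ctxt_next the same way is not visible in this
patch, so that is worth checking against the loop in par_builtin.signal.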


Index: library/par_builtin.m
===================================================================
RCS file: /home/mercury1/repository/mercury/library/par_builtin.m,v
retrieving revision 1.1
diff -u -r1.1 par_builtin.m
--- library/par_builtin.m	28 Jun 2006 04:46:19 -0000	1.1
+++ library/par_builtin.m	4 Jul 2006 13:16:12 -0000
@@ -11,8 +11,8 @@
 % Stability: low.
 
 % This file is automatically imported, as if via `use_module', into every
-% module in parallel grades.  It is intended for the builtin procedures that
-% the compiler generates implicit calls to when implementing parallel
+% module in lowlevel parallel grades.  It is intended for the builtin procedures
+% that the compiler generates implicit calls to when implementing parallel
 % conjunctions.
 %
 % This module is a private part of the Mercury implementation; user modules
@@ -55,27 +55,21 @@
 
 :- pragma foreign_decl("C",
 "
+    #include ""mercury_context.h""
+    #include ""mercury_thread.h""
+
     typedef struct MR_Future MR_Future;
 
 #ifdef MR_THREAD_SAFE
-# ifdef MR_HAVE_SEMAPHORE_H
-    /* POSIX 1003.1b semaphores available. */
-    #include <semaphore.h>
-
     struct MR_Future {
-        sem_t   semaphore;
+        MercuryLock lock;
+            /* lock preventing concurrent accesses */
+        int signalled;
+            /* whether this future has been signalled yet */
+        MR_Context *suspended;
+            /* linked list of all the contexts blocked on this future */
         MR_Word value;
     };
-# else /* !MR_HAVE_SEMAPHORE_H */
-    /* Use POSIX thread mutexes and condition variables. */
-    #include <pthread.h>
-
-    struct MR_Future {
-        pthread_mutex_t mutex;
-        pthread_cond_t cond;
-        MR_Word value;
-    };
-# endif /* !MR_HAVE_SEMAPHORE_H */
 #else /* !MR_THREAD_SAFE */
     struct MR_Future {
     };
@@ -92,17 +86,53 @@
     new_future(Future::uo),
     [will_not_call_mercury, promise_pure, thread_safe, will_not_modify_trail],
 "
-#ifdef MR_THREAD_SAFE
-# ifdef MR_HAVE_SEMAPHORE_H
-    Future = MR_GC_NEW(MR_Future);
-    sem_init(&Future->semaphore, MR_NO, 0);
-    Future->value = 0;
-# else
-    Future = MR_GC_NEW(MR_Future);
-    pthread_mutex_init(&Future->mutex, NULL);
-    pthread_cond_init(&Future->cond, NULL);
+#if (!defined MR_HIGHLEVEL_CODE) && (defined MR_THREAD_SAFE)
+
+    MR_Word fut_addr;
+
+    MR_incr_hp(fut_addr, MR_round_up(sizeof(MR_Future), sizeof(MR_Word)));
+    Future = (MR_Future *) fut_addr;
+
+    pthread_mutex_init(&(Future->lock), MR_MUTEX_ATTR);
+
+    /*
+    ** The mutex needs to be destroyed when the future is garbage collected.
+    ** For efficiency we might want to ignore this altogether, e.g. on Linux
+    ** pthread_mutex_destroy() only checks that the mutex is unlocked.
+    */
+  #ifdef MR_CONSERVATIVE_GC
+    GC_REGISTER_FINALIZER(Future, MR_finalize_future, NULL, NULL, NULL);
+  #endif
+
+    Future->signalled = 0;
+    Future->suspended = NULL;
     Future->value = 0;
-# endif
+
+#else
+
+    MR_fatal_error(""internal error: par_builtin should only be used by ""
+        ""lowlevel parallel grades"");
+
+#endif
+").
+
+:- pragma foreign_decl("C", "
+#ifdef MR_CONSERVATIVE_GC
+    void MR_finalize_future(GC_PTR obj, GC_PTR cd);
+#endif
+").
+
+:- pragma foreign_code("C", "
+#ifdef MR_CONSERVATIVE_GC
+    void
+    MR_finalize_future(GC_PTR obj, GC_PTR cd)
+    {
+        MR_Future *fut = (MR_Future *) obj;
+
+      #ifdef MR_THREAD_SAFE
+        pthread_mutex_destroy(&(fut->lock));
+      #endif
+    }
 #endif
 ").
 
@@ -110,19 +140,119 @@
     wait(Future::in, Value::out),
     [will_not_call_mercury, promise_pure, thread_safe, will_not_modify_trail],
 "
-#ifdef MR_THREAD_SAFE
-# ifdef MR_HAVE_SEMAPHORE_H
-    sem_wait(&Future->semaphore);
-    sem_post(&Future->semaphore);
-    Value = Future->value;
-# else
-    pthread_mutex_lock(&Future->mutex);
-    while (!Future->pass) {
-        pthread_cond_wait(&Future->cond, &Future->mutex);
+#if (!defined MR_HIGHLEVEL_CODE) && (defined MR_THREAD_SAFE)
+
+    MR_LOCK(&(Future->lock), ""future.wait"");
+
+    if (Future->signalled) {
+        Value = Future->value;
+        MR_UNLOCK(&(Future->lock), ""future.wait"");
+    } else {
+        MR_Context *ctxt;
+        MercuryThreadList *new_element;
+
+        /*
+        ** The address of the future can be lost when we resume so save it on
+        ** top of the stack.
+        */
+        MR_incr_sp(1);
+        MR_sv(1) = (MR_Word) Future;
+
+        ctxt = MR_ENGINE(MR_eng_this_context);
+
+        /*
+        ** Mark the current context as being owned by this thread to prevent it
+        ** from being resumed by another thread. Specifically we don't want the
+        ** 'main' context to be resumed by any thread other than the primordial
+        ** thread, because after the primordial thread finishes executing the
+        ** main program it has to clean up the Mercury runtime.
+        **
+        ** XXX this solution seems too heavy for the problem at hand
+        */
+        MR_ENGINE(MR_eng_c_depth)++;
+
+        new_element = MR_GC_NEW(MercuryThreadList);
+        new_element->thread = ctxt->MR_ctxt_owner_thread;
+        new_element->next = MR_ENGINE(MR_eng_saved_owners);
+        MR_ENGINE(MR_eng_saved_owners) = new_element;
+
+        ctxt->MR_ctxt_owner_thread = MR_ENGINE(MR_eng_owner_thread);
+
+        /*
+        ** Save this context and put it on the list of suspended contexts for
+        ** this future.
+        */
+        MR_save_context(ctxt);
+        ctxt->MR_ctxt_resume = MR_ENTRY_AP(par_builtin__wait_resume);
+        ctxt->MR_ctxt_next = Future->suspended;
+        Future->suspended = ctxt;
+
+        MR_UNLOCK(&(Future->lock), ""future.wait"");
+        MR_runnext();
+
+        assert(0);
     }
-    Value = Future->value;
-    pthread_mutex_unlock(&Future->mutex);
-# endif
+
+#else
+
+    MR_fatal_error(""internal error: par_builtin.wait"");
+    Value = -1;
+
+#endif
+").
+
+    % `wait_resume' is the piece of code we jump to when a thread suspended
+    % on a future resumes after the future is signalled.
+    % 
+:- pragma foreign_decl("C",
+"
+#if (!defined MR_HIGHLEVEL_CODE) && (defined MR_THREAD_SAFE)
+    MR_declare_entry(mercury__par_builtin__wait_resume);
+#endif
+").
+
+:- pragma foreign_code("C",
+"
+#if (!defined MR_HIGHLEVEL_CODE) && (defined MR_THREAD_SAFE)
+
+    MR_BEGIN_MODULE(par_builtin_wait_resume)
+    MR_BEGIN_CODE
+    MR_define_entry(mercury__par_builtin__wait_resume);
+    {
+        MR_Future *Future;
+
+        /* Restore the address of the future after resuming. */
+        Future = (MR_Future *) MR_sv(1);
+        MR_decr_sp(1);
+
+        assert(Future->signalled == 1);
+
+        /* Restore the owning thread in the current context. */
+        assert(MR_ENGINE(MR_eng_this_context)->MR_ctxt_owner_thread
+            == MR_ENGINE(MR_eng_owner_thread));
+        MR_ENGINE(MR_eng_c_depth)--;
+        {
+            MercuryThreadList *tmp;
+            MercuryThread val;
+
+            tmp = MR_ENGINE(MR_eng_saved_owners);
+            if (tmp != NULL)
+            {
+                val = tmp->thread;
+                MR_ENGINE(MR_eng_saved_owners) = tmp->next;
+                MR_GC_free(tmp);
+            } else {
+                val = 0;
+            }
+            MR_ENGINE(MR_eng_this_context)->MR_ctxt_owner_thread = val;
+        }
+
+        /* Return to the caller of par_builtin.wait. */
+        MR_r1 = Future->value;
+        MR_proceed();
+    }
+    MR_END_MODULE
+
 #endif
 ").
 
@@ -130,16 +260,32 @@
     signal(Future::in, Value::in),
     [will_not_call_mercury, thread_safe, will_not_modify_trail],
 "
-#ifdef MR_THREAD_SAFE
-# ifdef MR_HAVE_SEMAPHORE_H
+#if (!defined MR_HIGHLEVEL_CODE) && (defined MR_THREAD_SAFE)
+
+    MR_Context *ctxt;
+
+    MR_LOCK(&(Future->lock), ""future.signal"");
+
+    assert(Future->signalled == 0);
+    Future->signalled++;
     Future->value = Value;
-    sem_post(&Future->semaphore);
-# else
-    pthread_mutex_lock(&Future->mutex);
-    Value = Future->value;
-    pthread_cond_broadcast(&Future->cond);
-    pthread_mutex_unlock(&Future->mutex);
-# endif
+
+    /* Schedule all the contexts which are blocking on this future. */
+    ctxt = Future->suspended;
+    while (ctxt != NULL) {
+        MR_schedule(ctxt);
+        ctxt = ctxt->MR_ctxt_next;
+    }
+    Future->suspended = NULL;
+
+    MR_UNLOCK(&(Future->lock), ""future.signal"");
+
+    assert(Future->signalled == 1);
+
+#else
+
+    MR_fatal_error(""internal error: par_builtin.signal"");
+
 #endif
 ").
 
Index: runtime/mercury_context.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.c,v
retrieving revision 1.46
diff -u -r1.46 mercury_context.c
--- runtime/mercury_context.c	4 Jul 2006 04:46:38 -0000	1.46
+++ runtime/mercury_context.c	4 Jul 2006 10:21:27 -0000
@@ -387,6 +387,11 @@
 
     while (1) {
         if (MR_exit_now == MR_TRUE) {
+            /*
+            ** The primordial thread has the responsibility of cleaning
+            ** up the Mercury runtime.  It cannot exit by this route.
+            */
+            assert(thd != MR_primordial_thread);
             MR_UNLOCK(&MR_runqueue_lock, "MR_do_runnext (ii)");
             MR_destroy_thread(MR_cur_engine());
         }
Index: runtime/mercury_thread.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.c,v
retrieving revision 1.24
diff -u -r1.24 mercury_thread.c
--- runtime/mercury_thread.c	4 Jul 2006 04:46:38 -0000	1.24
+++ runtime/mercury_thread.c	4 Jul 2006 10:31:12 -0000
@@ -18,6 +18,8 @@
 #include <errno.h>
 
 #ifdef  MR_THREAD_SAFE
+  MercuryThread     MR_primordial_thread;
+
   MercuryThreadKey  MR_exception_handler_key;
   #ifdef MR_THREAD_LOCAL_STORAGE
     __thread MercuryEngine *MR_thread_engine_base;
@@ -41,6 +43,8 @@
     pthread_attr_t  attrs;
     int             err;
 
+    assert(MR_primordial_thread != (MercuryThread) 0);
+
     thread = MR_GC_NEW(MercuryThread);
     pthread_attr_init(&attrs);
     err = pthread_create(thread, &attrs, MR_create_thread_2, (void *) goal);
Index: runtime/mercury_thread.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_thread.h,v
retrieving revision 1.18
diff -u -r1.18 mercury_thread.h
--- runtime/mercury_thread.h	29 Jun 2006 03:53:34 -0000	1.18
+++ runtime/mercury_thread.h	4 Jul 2006 10:22:43 -0000
@@ -122,6 +122,11 @@
   extern MR_bool	MR_exit_now;
 
   /*
+  ** The primordial thread. Currently used for debugging.
+  */
+  extern MercuryThread MR_primordial_thread;
+
+  /*
   ** MR_global_lock is a mutex for ensuring that only one non-threadsafe
   ** piece of pragma c code executes at a time. If `not_threadsafe' is
   ** given or `threadsafe' is not given in the attributes of a pragma
Index: runtime/mercury_wrapper.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wrapper.c,v
retrieving revision 1.161
diff -u -r1.161 mercury_wrapper.c
--- runtime/mercury_wrapper.c	20 Mar 2006 23:43:17 -0000	1.161
+++ runtime/mercury_wrapper.c	4 Jul 2006 10:52:33 -0000
@@ -532,6 +532,7 @@
 #ifdef MR_THREAD_SAFE
     /* MR_init_thread_stuff() must be called prior to MR_init_memory() */
     MR_init_thread_stuff();
+    MR_primordial_thread = pthread_self();
 #endif
 
     /*
@@ -2287,6 +2288,12 @@
   #ifdef MR_THREAD_SAFE
     MR_exit_now = MR_TRUE;
     pthread_cond_broadcast(&MR_runqueue_cond);
+
+    assert(MR_primordial_thread == pthread_self());
+    MR_primordial_thread = (MercuryThread) 0;
+
+    /* XXX seems to be needed or short programs may have no output */
+    fflush(stdout);
   #endif
 #endif
 