[m-rev.] for review: don't terminate process until all threads done

Peter Wang wangp at students.csse.unimelb.edu.au
Mon Apr 30 15:24:29 AEST 2007


Branches: main

Prevent multi-threaded programs from terminating as soon as the main thread
terminates, i.e. the process should not terminate until all threads started by
thread.spawn/3 terminate.

This is done by maintaining a global count of the number of threads started
by thread.spawn.  In low-level C grades the main context will suspend if it
reaches the global_success label and finds there are other contexts still
outstanding.  The last context to terminate then reschedules the main context
to resume.

Similarly, in high-level C grades the main thread waits on a condition
variable, which is signalled by the last thread to terminate.

library/thread.m:
runtime/mercury_context.c:
runtime/mercury_thread.c:
runtime/mercury_thread.h:
runtime/mercury_wrapper.c:
	As above.

	Add some extra assertions related to this.

tests/par_conj/Mmakefile:
tests/par_conj/thread_barrier.exp:
tests/par_conj/thread_barrier.m:
	Add test case.


diff -u library/thread.m library/thread.m
--- library/thread.m	26 Apr 2007 05:17:17 -0000
+++ library/thread.m	30 Apr 2007 05:07:34 -0000
@@ -73,12 +73,19 @@
 #endif
 ").
 
+%-----------------------------------------------------------------------------%
+
 :- pragma foreign_proc("C",
     spawn(Goal::(pred(di, uo) is cc_multi), IO0::di, IO::uo),
     [promise_pure, will_not_call_mercury, thread_safe],
 "
 #if !defined(MR_HIGHLEVEL_CODE)
     MR_Context  *ctxt;
+
+    MR_LOCK(&MR_thread_barrier_lock, ""thread.spawn"");
+    MR_thread_barrier_count++;
+    MR_UNLOCK(&MR_thread_barrier_lock, ""thread.spawn"");
+
     ctxt = MR_create_context(""spawn"", MR_CONTEXT_SIZE_REGULAR, NULL);
     ctxt->MR_ctxt_resume = MR_ENTRY(mercury__thread__spawn_begin_thread);
     
@@ -135,6 +142,9 @@
 yield(!IO).
 
 %-----------------------------------------------------------------------------%
+%
+% Low-level C implementation
+%
 
 :- pragma foreign_decl("C",
 "
@@ -170,6 +180,21 @@
     }
     MR_define_label(mercury__thread__spawn_end_thread);
     {
+        MR_LOCK(&MR_thread_barrier_lock, ""thread__spawn_end_thread"");
+        MR_thread_barrier_count--;
+        if (MR_thread_barrier_count == 0) {
+            /*
+            ** If this is the last spawned context to terminate and the
+            ** main context was just waiting on us in order to terminate
+            ** then reschedule the main context.
+            */
+            if (MR_thread_barrier_context) {
+                MR_schedule_context(MR_thread_barrier_context);
+                MR_thread_barrier_context = NULL;
+            }
+        }
+        MR_UNLOCK(&MR_thread_barrier_lock, ""thread__spawn_end_thread"");
+
         MR_destroy_context(MR_ENGINE(MR_eng_this_context));
         MR_ENGINE(MR_eng_this_context) = NULL;
         MR_runnext();
@@ -210,16 +235,10 @@
     #endif
 ").
 
-:- pred call_back_to_mercury(pred(io, io), io, io).
-:- mode call_back_to_mercury(pred(di, uo) is cc_multi, di, uo) is cc_multi.
-:- pragma foreign_export("C",
-    call_back_to_mercury(pred(di, uo) is cc_multi, di, uo),
-    "ML_call_back_to_mercury_cc_multi").
-
-call_back_to_mercury(Goal, !IO) :-
-    Goal(!IO).
-
 %-----------------------------------------------------------------------------%
+%
+% High-level C implementation
+%
 
 :- pragma foreign_decl("C", "
 #if defined(MR_HIGHLEVEL_CODE) && defined(MR_THREAD_SAFE)
@@ -254,6 +273,10 @@
     args->thread_local_mutables =
         MR_clone_thread_local_mutables(MR_THREAD_LOCAL_MUTABLES);
 
+    MR_LOCK(&MR_thread_barrier_lock, ""thread.spawn"");
+    MR_thread_barrier_count++;
+    MR_UNLOCK(&MR_thread_barrier_lock, ""thread.spawn"");
+
     pthread_attr_init(&attrs);
     pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
     if (pthread_create(&thread, MR_THREAD_ATTR, ML_thread_wrapper, args)) {
@@ -286,11 +309,27 @@
 
     ML_call_back_to_mercury_cc_multi(goal);
 
+    MR_LOCK(&MR_thread_barrier_lock, ""ML_thread_wrapper"");
+    MR_thread_barrier_count--;
+    if (MR_thread_barrier_count == 0) {
+        MR_SIGNAL(&MR_thread_barrier_cond);
+    }
+    MR_UNLOCK(&MR_thread_barrier_lock, ""ML_thread_wrapper"");
+
     return NULL;
   }
 #endif /* MR_HIGHLEVEL_CODE && MR_THREAD_SAFE */
 ").
 
+:- pred call_back_to_mercury(pred(io, io), io, io).
+:- mode call_back_to_mercury(pred(di, uo) is cc_multi, di, uo) is cc_multi.
+:- pragma foreign_export("C",
+    call_back_to_mercury(pred(di, uo) is cc_multi, di, uo),
+    "ML_call_back_to_mercury_cc_multi").
+
+call_back_to_mercury(Goal, !IO) :-
+    Goal(!IO).
+
 %-----------------------------------------------------------------------------%
 
 :- pragma foreign_code("C#", "
diff -u tests/par_conj/Mmakefile tests/par_conj/Mmakefile
--- tests/par_conj/Mmakefile	26 Apr 2007 05:17:17 -0000
+++ tests/par_conj/Mmakefile	30 Apr 2007 05:07:34 -0000
@@ -73,7 +73,8 @@
 	# which was already set up in this directory.
 	#
 	THREAD_PROGS = \
-		spawn_many
+		spawn_many \
+		thread_barrier
 else
 	THREAD_PROGS =
 endif
only in patch2:
unchanged:
--- runtime/mercury_context.c	3 Mar 2007 03:43:34 -0000	1.55
+++ runtime/mercury_context.c	30 Apr 2007 05:07:34 -0000
@@ -89,6 +89,12 @@
   #endif
     MR_KEY_CREATE(&MR_exception_handler_key, NULL);
 
+    /* These are actually in mercury_thread.c. */
+    pthread_mutex_init(&MR_thread_barrier_lock, MR_MUTEX_ATTR);
+  #ifdef MR_HIGHLEVEL_CODE
+    pthread_cond_init(&MR_thread_barrier_cond, MR_COND_ATTR);
+  #endif
+
 #endif
 }
 
@@ -317,6 +323,9 @@
 {
     MR_assert(c);
 
+#ifdef MR_THREAD_SAFE
+    assert(c->MR_ctxt_saved_owners == NULL);
+#endif
 #ifndef MR_HIGHLEVEL_CODE
     MR_assert(c->MR_ctxt_spark_stack == NULL);
 #endif
only in patch2:
unchanged:
--- runtime/mercury_thread.c	17 Apr 2007 05:38:09 -0000	1.32
+++ runtime/mercury_thread.c	30 Apr 2007 05:07:34 -0000
@@ -34,6 +34,17 @@
 
 MR_Unsigned         MR_num_thread_local_mutables = 0;
 
+MR_Integer          MR_thread_barrier_count;
+#ifdef MR_THREAD_SAFE
+  MercuryLock       MR_thread_barrier_lock;
+  #ifdef MR_HIGHLEVEL_CODE
+    MercuryCond     MR_thread_barrier_cond;
+  #endif
+#endif
+#ifndef MR_HIGHLEVEL_CODE
+  MR_Context        *MR_thread_barrier_context;
+#endif
+
 #ifdef MR_THREAD_SAFE
 
 static void         *MR_create_thread_2(void *goal);
only in patch2:
unchanged:
--- runtime/mercury_thread.h	17 Apr 2007 05:38:09 -0000	1.22
+++ runtime/mercury_thread.h	30 Apr 2007 05:07:34 -0000
@@ -165,6 +165,21 @@
 #endif
 
 /*
+** These are used to prevent the process terminating as soon as the original
+** Mercury thread terminates.
+*/
+extern MR_Integer MR_thread_barrier_count;
+#ifdef MR_THREAD_SAFE
+  extern MercuryLock MR_thread_barrier_lock;
+  #ifdef MR_HIGHLEVEL_CODE
+    extern MercuryCond MR_thread_barrier_cond;
+  #endif
+#endif
+#ifndef MR_HIGHLEVEL_CODE
+  extern struct MR_Context_Struct *MR_thread_barrier_context;
+#endif
+
+/*
 ** The following enum is used as the argument to init_thread.
 ** MR_use_now should be passed to init_thread to indicate that
 ** it has been called in a context in which it should initialize
only in patch2:
unchanged:
--- runtime/mercury_wrapper.c	17 Apr 2007 05:38:10 -0000	1.181
+++ runtime/mercury_wrapper.c	30 Apr 2007 05:07:34 -0000
@@ -2358,6 +2358,16 @@
         }
     }
 
+  #if defined(MR_HIGHLEVEL_CODE) && defined(MR_THREAD_SAFE)
+    assert(pthread_self() == MR_primordial_thread);
+    MR_LOCK(&MR_thread_barrier_lock, "MR_do_interpreter");
+    while (MR_thread_barrier_count > 0) {
+        while (MR_WAIT(&MR_thread_barrier_cond, &MR_thread_barrier_lock) != 0) {
+        }
+    }
+    MR_UNLOCK(&MR_thread_barrier_lock, "MR_do_interpreter");
+  #endif
+
   #ifdef  MR_MPROF_PROFILE_TIME
     if (MR_profiling)  {
         MR_prof_turn_off_time_profiling();
@@ -2373,6 +2383,7 @@
 
 MR_define_extern_entry(MR_do_interpreter);
 MR_declare_label(global_success);
+MR_declare_label(global_success_2);
 MR_declare_label(global_fail);
 MR_declare_label(all_done);
 MR_declare_label(wrapper_not_reached);
@@ -2380,6 +2391,7 @@
 MR_BEGIN_MODULE(interpreter_module)
     MR_init_entry_an(MR_do_interpreter);
     MR_init_label_an(global_success);
+    MR_init_label_an(global_success_2);
     MR_init_label_an(global_fail);
     MR_init_label_an(all_done);
     MR_init_label_an(wrapper_not_reached);
@@ -2422,6 +2434,28 @@
     MR_noprof_call(MR_program_entry_point, MR_LABEL(global_success));
 
 MR_define_label(global_success);
+    /*
+    ** Don't let the original Mercury thread continue on to global_success_2
+    ** until all other threads have terminated.
+    */
+    MR_LOCK(&MR_thread_barrier_lock, "global_success");
+    if (MR_thread_barrier_count == 0) {
+        MR_UNLOCK(&MR_thread_barrier_lock, "global_success");
+        MR_GOTO_LABEL(global_success_2);
+    } else {
+        MR_Context *this_ctxt;
+
+        this_ctxt = MR_ENGINE(MR_eng_this_context);
+        MR_save_context(this_ctxt);
+        this_ctxt->MR_ctxt_resume = MR_LABEL(global_success_2);
+        MR_thread_barrier_context = this_ctxt;
+        MR_UNLOCK(&MR_thread_barrier_lock, "global_success");
+
+        MR_ENGINE(MR_eng_this_context) = NULL;
+        MR_runnext();
+    }
+
+MR_define_label(global_success_2);
 #ifdef  MR_LOWLEVEL_DEBUG
     if (MR_finaldebug) {
         MR_save_transient_registers();
@@ -2451,6 +2485,10 @@
 #endif
 
 MR_define_label(all_done);
+    assert(MR_runqueue_head == NULL);
+#ifndef MR_HIGHLEVEL_CODE
+    assert(MR_spark_queue_head == NULL);
+#endif
 
 #ifdef  MR_MPROF_PROFILE_TIME
     if (MR_profiling) {
only in patch2:
unchanged:
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ tests/par_conj/thread_barrier.exp	30 Apr 2007 05:07:34 -0000
@@ -0,0 +1,11 @@
+loop
+loop
+loop
+loop
+loop
+loop
+loop
+loop
+loop
+loop
+fin.
only in patch2:
unchanged:
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ tests/par_conj/thread_barrier.m	30 Apr 2007 05:07:34 -0000
@@ -0,0 +1,38 @@
+% vim: ft=mercury ts=8 sw=4 et
+% Test that a multi-threaded program doesn't terminate until all threads complete.
+
+:- module thread_barrier.
+:- interface.
+:- import_module io.
+
+:- pred main(io::di, io::uo) is cc_multi.
+
+:- implementation.
+
+:- import_module int.
+:- import_module thread.
+:- import_module thread.channel.
+:- import_module unit.
+
+main(!IO) :-
+    channel.init(Channel, !IO),
+    loop(Channel, 10, !IO).
+
+:- pred loop(channel(unit)::in, int::in, io::di, io::uo) is cc_multi.
+
+loop(Channel, N, !IO) :-
+    (if N > 0 then
+        io.write_string("loop\n", !IO),
+        thread.spawn(loop(Channel, N-1), !IO),
+        % Give the current thread something to do.
+        channel.put(Channel, unit, !IO)
+    else
+        true
+    ).
+
+:- finalize fin/2.
+:- pred fin(io::di, io::uo) is det.
+
+fin(!IO) :-
+    % This should appear last.
+    io.write_string("fin.\n", !IO).
--------------------------------------------------------------------------
mercury-reviews mailing list
Post messages to:       mercury-reviews at csse.unimelb.edu.au
Administrative Queries: owner-mercury-reviews at csse.unimelb.edu.au
Subscriptions:          mercury-reviews-request at csse.unimelb.edu.au
--------------------------------------------------------------------------



More information about the reviews mailing list