[m-rev.] for review: don't terminate process until all threads done
Peter Wang
wangp at students.csse.unimelb.edu.au
Mon Apr 30 15:24:29 AEST 2007
Branches: main
Prevent multi-threaded programs from terminating as soon as the main thread
terminates, i.e. the process should not terminate until all threads started by
thread.spawn/3 terminate.
This is done by maintaining a global count of the number of threads started
by thread.spawn. In low-level C grades the main context will suspend if it
reaches the global_success label and finds there are other contexts still
outstanding. The last context to terminate then reschedules the main context
to resume.
Similarly, in high-level C grades the main thread waits on a condition
variable, which is signalled by the last thread to terminate.
library/thread.m:
runtime/mercury_context.c:
runtime/mercury_thread.c:
runtime/mercury_thread.h:
runtime/mercury_wrapper.c:
As above.
Add some extra assertions related to this.
tests/par_conj/Mmakefile:
tests/par_conj/thread_barrier.exp:
tests/par_conj/thread_barrier.m:
Add test case.
diff -u library/thread.m library/thread.m
--- library/thread.m 26 Apr 2007 05:17:17 -0000
+++ library/thread.m 30 Apr 2007 05:07:34 -0000
@@ -73,12 +73,19 @@
#endif
").
+%-----------------------------------------------------------------------------%
+
:- pragma foreign_proc("C",
spawn(Goal::(pred(di, uo) is cc_multi), IO0::di, IO::uo),
[promise_pure, will_not_call_mercury, thread_safe],
"
#if !defined(MR_HIGHLEVEL_CODE)
MR_Context *ctxt;
+
+ MR_LOCK(&MR_thread_barrier_lock, ""thread.spawn"");
+ MR_thread_barrier_count++;
+ MR_UNLOCK(&MR_thread_barrier_lock, ""thread.spawn"");
+
ctxt = MR_create_context(""spawn"", MR_CONTEXT_SIZE_REGULAR, NULL);
ctxt->MR_ctxt_resume = MR_ENTRY(mercury__thread__spawn_begin_thread);
@@ -135,6 +142,9 @@
yield(!IO).
%-----------------------------------------------------------------------------%
+%
+% Low-level C implementation
+%
:- pragma foreign_decl("C",
"
@@ -170,6 +180,21 @@
}
MR_define_label(mercury__thread__spawn_end_thread);
{
+ MR_LOCK(&MR_thread_barrier_lock, ""thread__spawn_end_thread"");
+ MR_thread_barrier_count--;
+ if (MR_thread_barrier_count == 0) {
+ /*
+ ** If this is the last spawned context to terminate and the
+ ** main context was just waiting on us in order to terminate
+ ** then reschedule the main context.
+ */
+ if (MR_thread_barrier_context) {
+ MR_schedule_context(MR_thread_barrier_context);
+ MR_thread_barrier_context = NULL;
+ }
+ }
+ MR_UNLOCK(&MR_thread_barrier_lock, ""thread__spawn_end_thread"");
+
MR_destroy_context(MR_ENGINE(MR_eng_this_context));
MR_ENGINE(MR_eng_this_context) = NULL;
MR_runnext();
@@ -210,16 +235,10 @@
#endif
").
-:- pred call_back_to_mercury(pred(io, io), io, io).
-:- mode call_back_to_mercury(pred(di, uo) is cc_multi, di, uo) is cc_multi.
-:- pragma foreign_export("C",
- call_back_to_mercury(pred(di, uo) is cc_multi, di, uo),
- "ML_call_back_to_mercury_cc_multi").
-
-call_back_to_mercury(Goal, !IO) :-
- Goal(!IO).
-
%-----------------------------------------------------------------------------%
+%
+% High-level C implementation
+%
:- pragma foreign_decl("C", "
#if defined(MR_HIGHLEVEL_CODE) && defined(MR_THREAD_SAFE)
@@ -254,6 +273,10 @@
args->thread_local_mutables =
MR_clone_thread_local_mutables(MR_THREAD_LOCAL_MUTABLES);
+ MR_LOCK(&MR_thread_barrier_lock, ""thread.spawn"");
+ MR_thread_barrier_count++;
+ MR_UNLOCK(&MR_thread_barrier_lock, ""thread.spawn"");
+
pthread_attr_init(&attrs);
pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
if (pthread_create(&thread, MR_THREAD_ATTR, ML_thread_wrapper, args)) {
@@ -286,11 +309,27 @@
ML_call_back_to_mercury_cc_multi(goal);
+ MR_LOCK(&MR_thread_barrier_lock, ""ML_thread_wrapper"");
+ MR_thread_barrier_count--;
+ if (MR_thread_barrier_count == 0) {
+ MR_SIGNAL(&MR_thread_barrier_cond);
+ }
+ MR_UNLOCK(&MR_thread_barrier_lock, ""ML_thread_wrapper"");
+
return NULL;
}
#endif /* MR_HIGHLEVEL_CODE && MR_THREAD_SAFE */
").
+:- pred call_back_to_mercury(pred(io, io), io, io).
+:- mode call_back_to_mercury(pred(di, uo) is cc_multi, di, uo) is cc_multi.
+:- pragma foreign_export("C",
+ call_back_to_mercury(pred(di, uo) is cc_multi, di, uo),
+ "ML_call_back_to_mercury_cc_multi").
+
+call_back_to_mercury(Goal, !IO) :-
+ Goal(!IO).
+
%-----------------------------------------------------------------------------%
:- pragma foreign_code("C#", "
diff -u tests/par_conj/Mmakefile tests/par_conj/Mmakefile
--- tests/par_conj/Mmakefile 26 Apr 2007 05:17:17 -0000
+++ tests/par_conj/Mmakefile 30 Apr 2007 05:07:34 -0000
@@ -73,7 +73,8 @@
# which was already set up in this directory.
#
THREAD_PROGS = \
- spawn_many
+ spawn_many \
+ thread_barrier
else
THREAD_PROGS =
endif
only in patch2:
unchanged:
--- runtime/mercury_context.c 3 Mar 2007 03:43:34 -0000 1.55
+++ runtime/mercury_context.c 30 Apr 2007 05:07:34 -0000
@@ -89,6 +89,12 @@
#endif
MR_KEY_CREATE(&MR_exception_handler_key, NULL);
+ /* These are actually in mercury_thread.c. */
+ pthread_mutex_init(&MR_thread_barrier_lock, MR_MUTEX_ATTR);
+ #ifdef MR_HIGHLEVEL_CODE
+ pthread_cond_init(&MR_thread_barrier_cond, MR_COND_ATTR);
+ #endif
+
#endif
}
@@ -317,6 +323,9 @@
{
MR_assert(c);
+#ifdef MR_THREAD_SAFE
+ assert(c->MR_ctxt_saved_owners == NULL);
+#endif
#ifndef MR_HIGHLEVEL_CODE
MR_assert(c->MR_ctxt_spark_stack == NULL);
#endif
only in patch2:
unchanged:
--- runtime/mercury_thread.c 17 Apr 2007 05:38:09 -0000 1.32
+++ runtime/mercury_thread.c 30 Apr 2007 05:07:34 -0000
@@ -34,6 +34,17 @@
MR_Unsigned MR_num_thread_local_mutables = 0;
+MR_Integer MR_thread_barrier_count;
+#ifdef MR_THREAD_SAFE
+ MercuryLock MR_thread_barrier_lock;
+ #ifdef MR_HIGHLEVEL_CODE
+ MercuryCond MR_thread_barrier_cond;
+ #endif
+#endif
+#ifndef MR_HIGHLEVEL_CODE
+ MR_Context *MR_thread_barrier_context;
+#endif
+
#ifdef MR_THREAD_SAFE
static void *MR_create_thread_2(void *goal);
only in patch2:
unchanged:
--- runtime/mercury_thread.h 17 Apr 2007 05:38:09 -0000 1.22
+++ runtime/mercury_thread.h 30 Apr 2007 05:07:34 -0000
@@ -165,6 +165,21 @@
#endif
/*
+** These are used to prevent the process terminating as soon as the original
+** Mercury thread terminates.
+*/
+extern MR_Integer MR_thread_barrier_count;
+#ifdef MR_THREAD_SAFE
+ extern MercuryLock MR_thread_barrier_lock;
+ #ifdef MR_HIGHLEVEL_CODE
+ extern MercuryCond MR_thread_barrier_cond;
+ #endif
+#endif
+#ifndef MR_HIGHLEVEL_CODE
+ extern struct MR_Context_Struct *MR_thread_barrier_context;
+#endif
+
+/*
** The following enum is used as the argument to init_thread.
** MR_use_now should be passed to init_thread to indicate that
** it has been called in a context in which it should initialize
only in patch2:
unchanged:
--- runtime/mercury_wrapper.c 17 Apr 2007 05:38:10 -0000 1.181
+++ runtime/mercury_wrapper.c 30 Apr 2007 05:07:34 -0000
@@ -2358,6 +2358,16 @@
}
}
+ #if defined(MR_HIGHLEVEL_CODE) && defined(MR_THREAD_SAFE)
+ assert(pthread_self() == MR_primordial_thread);
+ MR_LOCK(&MR_thread_barrier_lock, "MR_do_interpreter");
+ while (MR_thread_barrier_count > 0) {
+ while (MR_WAIT(&MR_thread_barrier_cond, &MR_thread_barrier_lock) != 0) {
+ }
+ }
+ MR_UNLOCK(&MR_thread_barrier_lock, "MR_do_interpreter");
+ #endif
+
#ifdef MR_MPROF_PROFILE_TIME
if (MR_profiling) {
MR_prof_turn_off_time_profiling();
@@ -2373,6 +2383,7 @@
MR_define_extern_entry(MR_do_interpreter);
MR_declare_label(global_success);
+MR_declare_label(global_success_2);
MR_declare_label(global_fail);
MR_declare_label(all_done);
MR_declare_label(wrapper_not_reached);
@@ -2380,6 +2391,7 @@
MR_BEGIN_MODULE(interpreter_module)
MR_init_entry_an(MR_do_interpreter);
MR_init_label_an(global_success);
+ MR_init_label_an(global_success_2);
MR_init_label_an(global_fail);
MR_init_label_an(all_done);
MR_init_label_an(wrapper_not_reached);
@@ -2422,6 +2434,28 @@
MR_noprof_call(MR_program_entry_point, MR_LABEL(global_success));
MR_define_label(global_success);
+ /*
+ ** Don't let the original Mercury thread continue on to global_success_2
+ ** until all other threads have terminated.
+ */
+ MR_LOCK(&MR_thread_barrier_lock, "global_success");
+ if (MR_thread_barrier_count == 0) {
+ MR_UNLOCK(&MR_thread_barrier_lock, "global_success");
+ MR_GOTO_LABEL(global_success_2);
+ } else {
+ MR_Context *this_ctxt;
+
+ this_ctxt = MR_ENGINE(MR_eng_this_context);
+ MR_save_context(this_ctxt);
+ this_ctxt->MR_ctxt_resume = MR_LABEL(global_success_2);
+ MR_thread_barrier_context = this_ctxt;
+ MR_UNLOCK(&MR_thread_barrier_lock, "global_success");
+
+ MR_ENGINE(MR_eng_this_context) = NULL;
+ MR_runnext();
+ }
+
+MR_define_label(global_success_2);
#ifdef MR_LOWLEVEL_DEBUG
if (MR_finaldebug) {
MR_save_transient_registers();
@@ -2451,6 +2485,10 @@
#endif
MR_define_label(all_done);
+ assert(MR_runqueue_head == NULL);
+#ifndef MR_HIGHLEVEL_CODE
+ assert(MR_spark_queue_head == NULL);
+#endif
#ifdef MR_MPROF_PROFILE_TIME
if (MR_profiling) {
only in patch2:
unchanged:
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ tests/par_conj/thread_barrier.exp 30 Apr 2007 05:07:34 -0000
@@ -0,0 +1,11 @@
+loop
+loop
+loop
+loop
+loop
+loop
+loop
+loop
+loop
+loop
+fin.
only in patch2:
unchanged:
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ tests/par_conj/thread_barrier.m 30 Apr 2007 05:07:34 -0000
@@ -0,0 +1,38 @@
+% vim: ft=mercury ts=8 sw=4 et
+% Test that a multithread program doesn't terminate until all threads complete.
+
+:- module thread_barrier.
+:- interface.
+:- import_module io.
+
+:- pred main(io::di, io::uo) is cc_multi.
+
+:- implementation.
+
+:- import_module int.
+:- import_module thread.
+:- import_module thread.channel.
+:- import_module unit.
+
+main(!IO) :-
+ channel.init(Channel, !IO),
+ loop(Channel, 10, !IO).
+
+:- pred loop(channel(unit)::in, int::in, io::di, io::uo) is cc_multi.
+
+loop(Channel, N, !IO) :-
+ (if N > 0 then
+ io.write_string("loop\n", !IO),
+ thread.spawn(loop(Channel, N-1), !IO),
+ % Give the current thread something to do.
+ channel.put(Channel, unit, !IO)
+ else
+ true
+ ).
+
+:- finalize fin/2.
+:- pred fin(io::di, io::uo) is det.
+
+fin(!IO) :-
+ % This should appear last.
+ io.write_string("fin.\n", !IO).
--------------------------------------------------------------------------
mercury-reviews mailing list
Post messages to: mercury-reviews at csse.unimelb.edu.au
Administrative Queries: owner-mercury-reviews at csse.unimelb.edu.au
Subscriptions: mercury-reviews-request at csse.unimelb.edu.au
--------------------------------------------------------------------------
More information about the reviews
mailing list