[m-rev.] for review: Add interface for joinable threads.
Peter Wang
novalazy at gmail.com
Thu Feb 22 14:19:51 AEDT 2024
It is often necessary to be certain that a thread has terminated before
the rest of the program can continue. In some cases, signalling that a
thread is ABOUT to terminate (using any synchronisation device)
is insufficient: even after the last piece of Mercury code has run,
the thread still has additional cleanup code to run, which might include
arbitrary code in thread-specific data destructors, etc.
Introduce a predicate to create a joinable native thread,
and a predicate to wait for that thread to terminate (joining).
Joinable threads are only implemented for C backends for now.
library/thread.m:
Add new predicates spawn_native_joinable and join_thread.
Add an internal type thread_handle.
Rename the internal type thread_id to thread_desc,
as thread_id is too similar to thread_handle.
tests/hard_coded/Mmakefile:
tests/hard_coded/spawn_native_joinable.exp:
tests/hard_coded/spawn_native_joinable.exp2:
tests/hard_coded/spawn_native_joinable.m:
Add test case.
NEWS.md:
Announce changes.
(The diff is made with diff -b)
diff --git a/NEWS.md b/NEWS.md
index c0b5de5c5..310f55842 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1049,6 +1049,13 @@ Changes to the Mercury standard library
* This new module contains predicates and functions that find
variables in terms.
+### Changes to the `thread` module
+
+* The following predicates have been added:
+
+ - pred `spawn_native_joinable/5`
+ - pred `join_thread/4`
+
### Changes to the `tree_bitset` module
* The following predicates have been added:
diff --git a/library/thread.m b/library/thread.m
index 01b19e528..19474d41c 100644
--- a/library/thread.m
+++ b/library/thread.m
@@ -3,7 +3,7 @@
%---------------------------------------------------------------------------%
% Copyright (C) 2000-2001, 2003-2004, 2006-2008, 2010-2011 The University
% of Melbourne.
-% Copyright (C) 2014-2022 The Mercury Team.
+% Copyright (C) 2014-2024 The Mercury team.
% This file is distributed under the terms specified in COPYING.LIB.
%---------------------------------------------------------------------------%
%
@@ -35,10 +35,14 @@
%---------------------------------------------------------------------------%
- % Abstract type representing a thread.
+ % Abstract type representing a detached thread.
%
:- type thread.
+ % Abstract type representing a joinable thread.
+ %
+:- type joinable_thread(T).
+
% can_spawn succeeds if spawn/4 is supported in the current grade.
%
:- pred can_spawn is semidet.
@@ -111,6 +115,34 @@
:- pred spawn_native(pred(thread, io, io)::in(pred(in, di, uo) is cc_multi),
thread_options::in, maybe_error(thread)::out, io::di, io::uo) is cc_multi.
+ % spawn_native_joinable(Closure, Options, Res, IO0, IO):
+ %
+ % Create a joinable native thread (like spawn_native), then perform Closure
+ % in that thread. Another thread can call join_thread/4 to wait for the
+ % thread to terminate, and fetch the output returned by Closure.
+ % The thread will continue to take up system resources until it terminates
+ % and has been joined by a call to join_thread/4.
+ %
+ % The Java and C# backends do not yet support joinable threads.
+ %
+:- pred spawn_native_joinable(
+ pred(joinable_thread(T), T, io, io)::in(pred(in, out, di, uo) is cc_multi),
+ thread_options::in, maybe_error(joinable_thread(T))::out, io::di, io::uo)
+ is cc_multi.
+
+ % join_thread(Thread, Res, !IO):
+ %
+ % Wait for the specified thread to terminate. If the thread has already
+ % terminated, join_thread/4 will return immediately. On success, Res will
+ % be ok(Output) where Output is the value returned by the closure
+ % performed on that thread.
+ %
+ % A thread must only be joined once. If multiple threads simultaneously
+ % try to join with the same thread, the results are undefined.
+ %
+:- pred join_thread(joinable_thread(T)::in, maybe_error(T)::out,
+ io::di, io::uo) is cc_multi.
+
% yield(IO0, IO) is logically equivalent to (IO = IO0) but
% operationally, yields the Mercury engine to some other thread
% if one exists.
@@ -141,6 +173,7 @@
:- implementation.
:- import_module bool.
+:- import_module mutvar.
:- import_module require.
:- pragma foreign_decl("C", "
@@ -173,13 +206,32 @@ import jmercury.runtime.Task;
min_stack_size :: uint
).
- % The thread id is not formally exposed yet but allows different thread
- % handles to compare unequal.
- %
:- type thread
- ---> thread(thread_id).
+ ---> detached_thread(thread_desc).
-:- type thread_id == string.
+:- type joinable_thread(T)
+ ---> joinable_thread(
+ jt_handle :: thread_handle,
+ jt_mutvar :: mutvar(T)
+ ).
+
+ % A descriptor for a (detached) Mercury thread.
+ % thread_desc values are not publicly exported, but they may help with
+ % debugging when 'thread' values are printed and/or compared. There is no
+ % guarantee that a thread descriptor remains unique after a thread
+ % terminates, as the memory address used to derive the descriptor
+ % may be reused.
+ %
+:- type thread_desc == string.
+
+ % A thread handle from the underlying thread API.
+ % The C# and Java grades currently use strings for this type but
+ % that will need to change to support joinable threads in those grades.
+ %
+:- type thread_handle.
+:- pragma foreign_type("C", thread_handle, "pthread_t").
+:- pragma foreign_type("C#", thread_handle, "string").
+:- pragma foreign_type("Java", thread_handle, "java.lang.String").
%---------------------------------------------------------------------------%
@@ -258,17 +310,17 @@ spawn(Goal, Res, !IO) :-
maybe_error(thread)::out, io::di, io::uo) is cc_multi.
spawn_context(Goal, Res, !IO) :-
- spawn_context_2(Goal, Success, ThreadId, !IO),
+ spawn_context_2(Goal, Success, ThreadDesc, !IO),
(
Success = yes,
- Res = ok(thread(ThreadId))
+ Res = ok(detached_thread(ThreadDesc))
;
Success = no,
Res = error("Unable to spawn threads in this grade.")
).
:- pred spawn_context_2(pred(thread, io, io)::in(pred(in, di, uo) is cc_multi),
- bool::out, string::out, io::di, io::uo) is cc_multi.
+ bool::out, thread_desc::out, io::di, io::uo) is cc_multi.
spawn_context_2(_, Res, "", !IO) :-
( Res = no
@@ -277,7 +329,7 @@ spawn_context_2(_, Res, "", !IO) :-
:- pragma foreign_proc("C",
spawn_context_2(Goal::in(pred(in, di, uo) is cc_multi), Success::out,
- ThreadId::out, _IO0::di, _IO::uo),
+ ThreadDesc::out, _IO0::di, _IO::uo),
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
may_not_duplicate],
"
@@ -294,15 +346,15 @@ spawn_context_2(_, Res, "", !IO) :-
tlm = MR_clone_thread_local_mutables(MR_THREAD_LOCAL_MUTABLES);
ctxt->MR_ctxt_thread_local_mutables = tlm;
- // Derive a thread id from the address of the thread-local mutable vector
- // for the Mercury thread. It should actually be more unique than a
+ // Derive a thread descriptor from the address of the thread-local mutable
+ // vector for the Mercury thread. It should actually be more unique than a
// context address as contexts are kept around and reused.
- ThreadId = MR_make_string(MR_ALLOC_ID, ""%p"", tlm);
+ ThreadDesc = MR_make_string(MR_ALLOC_ID, ""%p"", tlm);
- // Store Goal and ThreadId on the top of the new context's stack.
+ // Store Goal and ThreadDesc on the top of the new context's stack.
ctxt->MR_ctxt_sp += 2;
ctxt->MR_ctxt_sp[0] = Goal; // MR_stackvar(1)
- ctxt->MR_ctxt_sp[-1] = (MR_Word) ThreadId; // MR_stackvar(2)
+ ctxt->MR_ctxt_sp[-1] = (MR_Word) ThreadDesc; // MR_stackvar(2)
MR_schedule_context(ctxt);
@@ -311,21 +363,21 @@ spawn_context_2(_, Res, "", !IO) :-
#else // MR_HIGHLEVEL_CODE
{
Success = MR_FALSE;
- ThreadId = MR_make_string_const("""");
+ ThreadDesc = MR_make_string_const("""");
}
#endif // MR_HIGHLEVEL_CODE
").
:- pragma foreign_proc("Java",
spawn_context_2(Goal::in(pred(in, di, uo) is cc_multi), Success::out,
- ThreadId::out, _IO0::di, _IO::uo),
+ ThreadDesc::out, _IO0::di, _IO::uo),
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
may_not_duplicate],
"
RunGoal rg = new RunGoal((Object[]) Goal);
Task task = new Task(rg);
- ThreadId = String.valueOf(task.getId());
- rg.setId(ThreadId);
+ ThreadDesc = String.valueOf(task.getId());
+ rg.setThreadDesc(ThreadDesc);
JavaInternal.getThreadPool().submit(task);
Success = bool.YES;
").
@@ -344,39 +396,47 @@ spawn_native(Goal, Res, !IO) :-
spawn_native(Goal, Options, Res, !IO) :-
Options = thread_options(MinStackSize),
- spawn_native_2(Goal, MinStackSize, Success, ThreadId, ErrorMsg, !IO),
+ Dummy = 0, % for the typeinfo
+ spawn_native_2(Goal, Dummy, MinStackSize, Success, ThreadDesc, ErrorMsg,
+ !IO),
(
Success = yes,
- Res = ok(thread(ThreadId))
+ Res = ok(detached_thread(ThreadDesc))
;
Success = no,
Res = error(ErrorMsg)
).
:- pred spawn_native_2(pred(thread, io, io)::in(pred(in, di, uo) is cc_multi),
- uint::in, bool::out, thread_id::out, string::out,
+ T::unused, uint::in, bool::out, thread_desc::out, string::out,
io::di, io::uo) is cc_multi.
:- pragma foreign_proc("C",
- spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), MinStackSize::in,
- Success::out, ThreadId::out, ErrorMsg::out, _IO0::di, _IO::uo),
+ spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), _T::unused,
+ MinStackSize::in, Success::out, ThreadDesc::out, ErrorMsg::out,
+ _IO0::di, _IO::uo),
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
may_not_duplicate],
"
#ifdef MR_THREAD_SAFE
- Success = ML_create_exclusive_thread(Goal, MinStackSize, &ThreadId,
- &ErrorMsg, MR_ALLOC_ID);
+ pthread_t thread_handle;
+
+ // Pass 0 for joinable_thread_mutvar to create a detached thread.
+ Success = ML_create_exclusive_thread(TypeInfo_for_T, Goal,
+ MinStackSize, (MR_Word) 0, &ThreadDesc, &thread_handle, &ErrorMsg,
+ MR_ALLOC_ID);
#else
Success = MR_FALSE;
- ThreadId = MR_make_string_const("""");
+ ThreadDesc = MR_make_string_const("""");
ErrorMsg = MR_make_string_const(
""Cannot create native thread in this grade."");
#endif
").
:- pragma foreign_proc("C#",
- spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), _MinStackSize::in,
- Success::out, ThreadId::out, ErrorMsg::out, _IO0::di, _IO::uo),
+ spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), _T::unused,
+ _MinStackSize::in, Success::out, ThreadDesc::out, ErrorMsg::out,
+ _IO0::di, _IO::uo),
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
may_not_duplicate],
"
@@ -385,33 +445,34 @@ spawn_native(Goal, Options, Res, !IO) :-
MercuryThread mt = new MercuryThread(Goal, thread_locals);
System.Threading.Thread thread = new System.Threading.Thread(
new System.Threading.ThreadStart(mt.run));
- ThreadId = thread.ManagedThreadId.ToString();
- mt.setThreadId(ThreadId);
+ ThreadDesc = thread.ManagedThreadId.ToString();
+ mt.setThreadDesc(ThreadDesc);
thread.Start();
Success = mr_bool.YES;
ErrorMsg = """";
} catch (System.Threading.ThreadStartException e) {
Success = mr_bool.NO;
- ThreadId = """";
+ ThreadDesc = """";
ErrorMsg = e.Message;
} catch (System.SystemException e) {
// Seen with mono.
Success = mr_bool.NO;
- ThreadId = """";
+ ThreadDesc = """";
ErrorMsg = e.Message;
}
").
:- pragma foreign_proc("Java",
- spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), _MinStackSize::in,
- Success::out, ThreadId::out, ErrorMsg::out, _IO0::di, _IO::uo),
+ spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), _T::unused,
+ _MinStackSize::in, Success::out, ThreadDesc::out, ErrorMsg::out,
+ _IO0::di, _IO::uo),
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
may_not_duplicate],
"
RunGoal rg = new RunGoal((Object[]) Goal);
Task task = new Task(rg);
- ThreadId = String.valueOf(task.getId());
- rg.setId(ThreadId);
+ ThreadDesc = String.valueOf(task.getId());
+ rg.setThreadDesc(ThreadDesc);
try {
JavaInternal.getThreadPool().submitExclusiveThread(task);
Success = bool.YES;
@@ -424,12 +485,129 @@ spawn_native(Goal, Options, Res, !IO) :-
ErrorMsg = e.getMessage();
}
if (Success == bool.NO && ErrorMsg == null) {
- ErrorMsg = ""unable to create new native thread"";
+ ErrorMsg = ""Unable to create new native thread."";
}
").
%---------------------------------------------------------------------------%
+spawn_native_joinable(Goal, Options, Res, !IO) :-
+ Options = thread_options(MinStackSize),
+ promise_pure (
+ impure new_mutvar0(OutputMutvar),
+ spawn_native_joinable_2(Goal, MinStackSize, OutputMutvar,
+ Success, ThreadHandle, ErrorMsg, !IO)
+ ),
+ (
+ Success = yes,
+ Res = ok(joinable_thread(ThreadHandle, OutputMutvar))
+ ;
+ Success = no,
+ Res = error(ErrorMsg)
+ ).
+
+:- pred spawn_native_joinable_2(
+ pred(joinable_thread(T), T, io, io)::in(pred(in, out, di, uo) is cc_multi),
+ uint::in, mutvar(T)::in, bool::out, thread_handle::out, string::out,
+ io::di, io::uo) is cc_multi.
+
+:- pragma foreign_proc("C",
+ spawn_native_joinable_2(Goal::in(pred(in, out, di, uo) is cc_multi),
+ MinStackSize::in, OutputMutvar::in,
+ Success::out, ThreadHandle::out, ErrorMsg::out, _IO0::di, _IO::uo),
+ [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+#ifdef MR_THREAD_SAFE
+ MR_String thread_desc;
+
+ Success = ML_create_exclusive_thread(TypeInfo_for_T, Goal,
+ MinStackSize, OutputMutvar, &thread_desc, &ThreadHandle, &ErrorMsg,
+ MR_ALLOC_ID);
+#else
+ Success = MR_FALSE;
+ ThreadHandle = 0;
+ ErrorMsg = MR_make_string_const(
+ ""Cannot create joinable thread in this grade."");
+#endif
+").
+
+:- pragma foreign_proc("C#",
+ spawn_native_joinable_2(_Goal::in(pred(in, out, di, uo) is cc_multi),
+ _MinStackSize::in, _OutputMutvar::in,
+ Success::out, ThreadHandle::out, ErrorMsg::out, _IO0::di, _IO::uo),
+ [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+ Success = mr_bool.NO;
+ ThreadHandle = null;
+ ErrorMsg = ""Cannot create joinable thread in this grade."";
+").
+
+:- pragma foreign_proc("Java",
+ spawn_native_joinable_2(_Goal::in(pred(in, out, di, uo) is cc_multi),
+ _MinStackSize::in, _OutputMutvar::in,
+ Success::out, ThreadHandle::out, ErrorMsg::out, _IO0::di, _IO::uo),
+ [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+ Success = bool.NO;
+ ThreadHandle = null;
+ ErrorMsg = ""Cannot create joinable thread in this grade."";
+").
+
+%---------------------------------------------------------------------------%
+
+join_thread(Thread, Res, !IO) :-
+ Thread = joinable_thread(ThreadHandle, OutputMutvar),
+ promise_pure (
+ join_thread_2(ThreadHandle, Success, ErrorMsg, !IO),
+ (
+ Success = yes,
+ impure get_mutvar(OutputMutvar, Output),
+ Res0 = ok(Output)
+ ;
+ Success = no,
+ Res0 = error(ErrorMsg)
+ )
+ ),
+ cc_multi_equal(Res0, Res).
+
+:- pred join_thread_2(thread_handle::in, bool::out, string::out,
+ io::di, io::uo) is det.
+
+:- pragma foreign_proc("C",
+ join_thread_2(ThreadHandle::in, Success::out, ErrorMsg::out,
+ _IO0::di, _IO::uo),
+ [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+#ifdef MR_THREAD_SAFE
+ int err;
+ char errbuf[MR_STRERROR_BUF_SIZE];
+
+ err = pthread_join(ThreadHandle, NULL);
+ if (err == 0) {
+ Success = MR_YES;
+ ErrorMsg = MR_make_string_const("""");
+ } else {
+ Success = MR_NO;
+ ErrorMsg = MR_make_string(MR_ALLOC_ID, ""pthread_join failed: %s"",
+ MR_strerror(err, errbuf, sizeof(errbuf)));
+ }
+#else
+ Success = MR_NO;
+ ErrorMsg = MR_make_string_const(
+ ""Native threads are not supported in this grade."");
+#endif
+").
+
+join_thread_2(_ThreadHandle, Success, ErrorMsg, !IO) :-
+ Success = no,
+ ErrorMsg = "Joinable threads not supported in this grade.".
+
+%---------------------------------------------------------------------------%
+
:- pragma no_inline(pred(yield/2)).
:- pragma foreign_proc("C",
yield(_IO0::di, _IO::uo),
@@ -507,7 +685,7 @@ INIT mercury_sys_init_thread_modules
{
// Call the closure placed the top of the stack.
MR_r1 = MR_stackvar(1); // Goal
- MR_r2 = MR_stackvar(2); // ThreadId
+ MR_r2 = MR_stackvar(2); // ThreadDesc
MR_decr_sp(2);
MR_noprof_call(MR_ENTRY(mercury__do_call_closure_1),
MR_LABEL(mercury__thread__spawn_end_thread));
@@ -566,46 +744,53 @@ INIT mercury_sys_init_thread_modules
:- pragma foreign_decl("C", local, "
#if defined(MR_THREAD_SAFE)
- #include <pthread.h>
- static MR_bool ML_create_exclusive_thread(MR_Word goal,
- size_t min_stack_size, MR_String *thread_id,
+#include <pthread.h>
+
+static MR_bool ML_create_exclusive_thread(MR_Word typeinfo_for_T, MR_Word goal,
+ size_t min_stack_size, MR_Word joinable_thread_mutvar,
+ MR_String *thread_desc, pthread_t *thread_handle,
MR_String *error_msg, MR_AllocSiteInfoPtr alloc_id);
- static void *ML_exclusive_thread_wrapper(void *arg);
+static void *ML_exclusive_thread_wrapper(void *arg);
- typedef struct ML_ThreadWrapperArgs ML_ThreadWrapperArgs;
- struct ML_ThreadWrapperArgs {
+typedef struct ML_ThreadWrapperArgs ML_ThreadWrapperArgs;
+struct ML_ThreadWrapperArgs {
MercuryLock mutex;
MercuryCond cond;
+ MR_Word typeinfo_for_T;
MR_Word goal;
MR_ThreadLocalMuts *thread_local_mutables;
+ MR_Word joinable_thread_mutvar; // 0 for detached thread
MR_Integer thread_state;
- MR_String thread_id;
- };
+ MR_String thread_desc;
+ pthread_t thread_handle;
+};
- enum {
+enum {
ML_THREAD_NOT_READY,
ML_THREAD_READY,
ML_THREAD_START_ERROR
- };
+};
#endif // MR_THREAD_SAFE
").
:- pragma foreign_code("C", "
#if defined(MR_THREAD_SAFE)
- static MR_bool
- ML_create_exclusive_thread(MR_Word goal, size_t min_stack_size,
- MR_String *thread_id, MR_String *error_msg,
- MR_AllocSiteInfoPtr alloc_id)
- {
+static MR_bool
+ML_create_exclusive_thread(MR_Word typeinfo_for_T, MR_Word goal,
+ size_t min_stack_size, MR_Word joinable_thread_mutvar,
+ MR_String *thread_desc, pthread_t *thread_handle,
+ MR_String *error_msg, MR_AllocSiteInfoPtr alloc_id)
+{
ML_ThreadWrapperArgs args;
pthread_t thread;
pthread_attr_t attrs;
int err;
char errbuf[MR_STRERROR_BUF_SIZE];
- *thread_id = MR_make_string_const("""");
+ *thread_desc = MR_make_string_const("""");
+ *thread_handle = 0;
*error_msg = MR_make_string_const("""");
ML_incr_thread_barrier_count();
@@ -618,13 +803,18 @@ INIT mercury_sys_init_thread_modules
pthread_mutex_init(&args.mutex, MR_MUTEX_ATTR);
pthread_cond_init(&args.cond, MR_COND_ATTR);
+ args.typeinfo_for_T = typeinfo_for_T;
args.goal = goal;
args.thread_local_mutables =
MR_clone_thread_local_mutables(MR_THREAD_LOCAL_MUTABLES);
+ args.joinable_thread_mutvar = joinable_thread_mutvar;
+ // These fields will be updated by the newly created thread.
args.thread_state = ML_THREAD_NOT_READY;
- args.thread_id = NULL;
+ args.thread_desc = NULL;
+ args.thread_handle = 0;
pthread_attr_init(&attrs);
+ if (joinable_thread_mutvar == 0) {
err = pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
if (err != 0) {
*error_msg = MR_make_string(alloc_id,
@@ -632,6 +822,7 @@ INIT mercury_sys_init_thread_modules
MR_strerror(errno, errbuf, sizeof(errbuf)));
goto failed_to_create_thread;
}
+ }
if (min_stack_size > 0) {
err = pthread_attr_setstacksize(&attrs, min_stack_size);
if (err != 0) {
@@ -673,19 +864,23 @@ failed_to_create_thread:
pthread_mutex_destroy(&args.mutex);
if (args.thread_state == ML_THREAD_READY) {
- *thread_id = args.thread_id;
+ *thread_desc = args.thread_desc;
+ *thread_handle = args.thread_handle;
return MR_TRUE;
}
ML_decr_thread_barrier_count();
return MR_FALSE;
- }
+}
- static void *ML_exclusive_thread_wrapper(void *arg)
- {
+static void *ML_exclusive_thread_wrapper(void *arg)
+{
ML_ThreadWrapperArgs *args = arg;
+ MR_Word typeinfo_for_T;
MR_Word goal;
- MR_String thread_id;
+ MR_Word joinable_thread_mutvar;
+ MR_String thread_desc;
+ pthread_t thread_handle;
if (MR_init_thread(MR_use_now) == MR_FALSE) {
MR_LOCK(&args->mutex, ""ML_exclusive_thread_wrapper"");
@@ -703,44 +898,80 @@ failed_to_create_thread:
MR_assert(MR_THREAD_LOCAL_MUTABLES == NULL);
MR_SET_THREAD_LOCAL_MUTABLES(args->thread_local_mutables);
- thread_id = MR_make_string(MR_ALLOC_SITE_RUNTIME,
- ""%"" MR_INTEGER_LENGTH_MODIFIER ""x"", MR_SELF_THREAD_ID);
-
- // Take a copy of the goal before telling the parent we are ready.
+ // Take a copy of args fields.
+ typeinfo_for_T = args->typeinfo_for_T;
goal = args->goal;
+ joinable_thread_mutvar = args->joinable_thread_mutvar;
+ thread_desc = MR_make_string(MR_ALLOC_SITE_RUNTIME,
+ ""%"" MR_INTEGER_LENGTH_MODIFIER ""x"", MR_SELF_THREAD_ID);
+ thread_handle = pthread_self();
+
+ // Tell the 'parent' we are ready, passing back a thread descriptor and
+ // thread handle.
MR_LOCK(&args->mutex, ""ML_exclusive_thread_wrapper"");
args->thread_state = ML_THREAD_READY;
- args->thread_id = thread_id;
+ args->thread_desc = thread_desc;
+ args->thread_handle = thread_handle;
MR_COND_SIGNAL(&args->cond, ""ML_exclusive_thread_wrapper"");
MR_UNLOCK(&args->mutex, ""ML_exclusive_thread_wrapper"");
+ // We must not dereference args after this point.
- ML_call_back_to_mercury_cc_multi(goal, thread_id);
+ if (joinable_thread_mutvar == 0) {
+ ML_call_back_to_mercury_detached_cc_multi(goal, thread_desc);
+ } else {
+ ML_call_back_to_mercury_joinable_cc_multi(typeinfo_for_T, goal,
+ thread_handle, joinable_thread_mutvar);
+ }
MR_finalize_thread_engine();
ML_decr_thread_barrier_count();
return NULL;
- }
+}
#endif // MR_THREAD_SAFE
").
-:- pred call_back_to_mercury(
+:- pred call_back_to_mercury_detached(
pred(thread, io, io)::in(pred(in, di, uo) is cc_multi),
- thread_id::in, io::di, io::uo) is cc_multi.
+ thread_desc::in, io::di, io::uo) is cc_multi.
:- pragma foreign_export("C",
- call_back_to_mercury(in(pred(in, di, uo) is cc_multi), in, di, uo),
- "ML_call_back_to_mercury_cc_multi").
+ call_back_to_mercury_detached(in(pred(in, di, uo) is cc_multi),
+ in, di, uo), "ML_call_back_to_mercury_detached_cc_multi").
:- pragma foreign_export("C#",
- call_back_to_mercury(in(pred(in, di, uo) is cc_multi), in, di, uo),
- "ML_call_back_to_mercury_cc_multi").
+ call_back_to_mercury_detached(in(pred(in, di, uo) is cc_multi),
+ in, di, uo), "ML_call_back_to_mercury_detached_cc_multi").
:- pragma foreign_export("Java",
- call_back_to_mercury(in(pred(in, di, uo) is cc_multi), in, di, uo),
- "ML_call_back_to_mercury_cc_multi").
+ call_back_to_mercury_detached(in(pred(in, di, uo) is cc_multi),
+ in, di, uo), "ML_call_back_to_mercury_detached_cc_multi").
+:- pragma no_inline(pred(call_back_to_mercury_detached/4)).
-call_back_to_mercury(Goal, ThreadId, !IO) :-
- Goal(thread(ThreadId), !IO).
+call_back_to_mercury_detached(Goal, ThreadDesc, !IO) :-
+ Thread = detached_thread(ThreadDesc),
+ Goal(Thread, !IO).
+
+:- pred call_back_to_mercury_joinable(
+ pred(joinable_thread(T), T, io, io)::in(pred(in, out, di, uo) is cc_multi),
+ thread_handle::in, mutvar(T)::in, io::di, io::uo) is cc_multi.
+:- pragma foreign_export("C",
+ call_back_to_mercury_joinable(in(pred(in, out, di, uo) is cc_multi),
+ in, in, di, uo), "ML_call_back_to_mercury_joinable_cc_multi").
+:- pragma no_inline(pred(call_back_to_mercury_joinable/5)).
+
+call_back_to_mercury_joinable(Goal, ThreadHandle, OutputMutvar, !IO) :-
+ Thread = joinable_thread(ThreadHandle, OutputMutvar),
+ promise_pure (
+ Goal(Thread, Output, !IO),
+ % Store a reference to the output term in a mutvar that is in turn
+ % referenced from a joinable_thread term. If we simply returned the
+ % output term as the return value of the pthread_create start routine
+ % (ML_exclusive_thread_wrapper), it might be possible that the last
+ % reference to the term resides only in some GC-inaccessible memory
+ % in the pthread implementation, and therefore could be collected
+ % before join_thread retrieves the value.
+ impure set_mutvar(OutputMutvar, Output)
+ ).
%---------------------------------------------------------------------------%
@@ -794,47 +1025,47 @@ call_back_to_mercury(Goal, ThreadId, !IO) :-
:- pragma foreign_code("C#", "
private class MercuryThread {
- private object[] Goal;
+ private object[] goal;
private object[] thread_local_mutables;
- private string ThreadId;
+ private string thread_desc;
- internal MercuryThread(object[] g, object[] tlmuts)
+ internal MercuryThread(object[] goal, object[] tlmuts)
{
- Goal = g;
- thread_local_mutables = tlmuts;
+ this.goal = goal;
+ this.thread_local_mutables = tlmuts;
}
- internal void setThreadId(string id)
+ internal void setThreadDesc(string thread_desc)
{
- ThreadId = id;
+ this.thread_desc = thread_desc;
}
internal void run()
{
runtime.ThreadLocalMutables.set_array(thread_local_mutables);
- thread.ML_call_back_to_mercury_cc_multi(Goal, ThreadId);
+ thread.ML_call_back_to_mercury_detached_cc_multi(goal, thread_desc);
}
}").
:- pragma foreign_code("Java", "
public static class RunGoal implements Runnable {
private final Object[] goal;
- private String id;
+ private String thread_desc;
- private RunGoal(Object[] g)
+ private RunGoal(Object[] goal)
{
- goal = g;
- id = null;
+ this.goal = goal;
+ this.thread_desc = null;
}
- private void setId(String id)
+ private void setThreadDesc(String thread_desc)
{
- this.id = id;
+ this.thread_desc = thread_desc;
}
public void run()
{
- thread.ML_call_back_to_mercury_cc_multi(goal, id);
+ thread.ML_call_back_to_mercury_detached_cc_multi(goal, thread_desc);
}
}").
diff --git a/tests/hard_coded/Mmakefile b/tests/hard_coded/Mmakefile
index bf8ffafc0..2fb0b8162 100644
--- a/tests/hard_coded/Mmakefile
+++ b/tests/hard_coded/Mmakefile
@@ -370,6 +370,7 @@ ORDINARY_PROGS = \
solve_quadratic \
space \
spawn_native \
+ spawn_native_joinable \
special_char \
stable_sort \
static_no_tag \
diff --git a/tests/hard_coded/spawn_native_joinable.exp b/tests/hard_coded/spawn_native_joinable.exp
new file mode 100644
index 000000000..b7ee1b54a
--- /dev/null
+++ b/tests/hard_coded/spawn_native_joinable.exp
@@ -0,0 +1,6 @@
+a start
+b start
+a stop
+b stop
+a join result: ok({"a", 1234})
+b join result: ok({"b", 1234})
diff --git a/tests/hard_coded/spawn_native_joinable.exp2 b/tests/hard_coded/spawn_native_joinable.exp2
new file mode 100644
index 000000000..c5b9e2b4e
--- /dev/null
+++ b/tests/hard_coded/spawn_native_joinable.exp2
@@ -0,0 +1 @@
+spawn_native_joinable not supported
diff --git a/tests/hard_coded/spawn_native_joinable.m b/tests/hard_coded/spawn_native_joinable.m
new file mode 100644
index 000000000..33e7fdfd2
--- /dev/null
+++ b/tests/hard_coded/spawn_native_joinable.m
@@ -0,0 +1,110 @@
+%---------------------------------------------------------------------------%
+% vim: ts=4 sw=4 et ft=mercury
+%---------------------------------------------------------------------------%
+
+:- module spawn_native_joinable.
+:- interface.
+
+:- import_module io.
+
+:- pred main(io::di, io::uo) is cc_multi.
+
+%---------------------------------------------------------------------------%
+%---------------------------------------------------------------------------%
+
+:- implementation.
+
+:- import_module list.
+:- import_module maybe.
+:- import_module string.
+:- import_module thread.
+
+:- pragma foreign_decl("C", local, "
+ #include <time.h>
+").
+
+%---------------------------------------------------------------------------%
+
+main(!IO) :-
+ ( if can_spawn_native then
+ Options = init_thread_options,
+ spawn_native_joinable(thread_proc("a"), Options, SpawnResA, !IO),
+ msleep(100, !IO),
+ spawn_native_joinable(thread_proc("b"), Options, SpawnResB, !IO),
+ (
+ SpawnResA = ok(ThreadA),
+ (
+ SpawnResB = ok(ThreadB),
+ join_thread(ThreadA, JoinResA, !IO),
+ join_thread(ThreadB, JoinResB, !IO),
+
+ io.write_string("a join result: ", !IO),
+ io.print_line(JoinResA, !IO),
+
+ io.write_string("b join result: ", !IO),
+ io.print_line(JoinResB, !IO)
+ ;
+ SpawnResB = error(ErrorB),
+ io.print_line(ErrorB, !IO),
+ join_thread(ThreadA, _JoinResA, !IO)
+ )
+ ;
+ SpawnResA = error(ErrorA),
+ io.print_line(ErrorA, !IO)
+ )
+ else
+ io.write_string("spawn_native_joinable not supported\n", !IO)
+ ).
+
+:- type thread_output == {string, int}.
+
+:- pred thread_proc(string::in, joinable_thread(thread_output)::in,
+ thread_output::out, io::di, io::uo) is cc_multi.
+
+thread_proc(Id, _Thread, Output, !IO) :-
+ io.write_string(Id ++ " start\n", !IO),
+ msleep(500, !IO),
+ cc_multi_equal({Id, 1234}, Output),
+ io.write_string(Id ++ " stop\n", !IO).
+
+%---------------------------------------------------------------------------%
+
+:- pred msleep(int::in, io::di, io::uo) is det.
+
+:- pragma foreign_proc("C",
+ msleep(Msecs::in, _IO0::di, _IO::uo),
+ [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+#ifdef MR_WIN32
+ Sleep(Msecs);
+#else
+{
+ struct timespec req;
+ int err;
+
+ req.tv_sec = 0;
+ req.tv_nsec = Msecs * 1000000;
+ do {
+ err = nanosleep(&req, &req);
+ } while (err == -1 && errno == EINTR);
+}
+#endif
+").
+
+:- pragma foreign_proc("C#",
+ msleep(Msecs::in, _IO0::di, _IO::uo),
+ [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io],
+"
+ System.Threading.Thread.Sleep(Msecs);
+").
+
+:- pragma foreign_proc("Java",
+ msleep(Msecs::in, _IO0::di, _IO::uo),
+ [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io],
+"
+ try {
+ Thread.sleep(Msecs);
+ } catch (InterruptedException e) {
+ }
+").
--
2.43.0
More information about the reviews
mailing list