[m-rev.] for review: Add interface for joinable threads.

Peter Wang novalazy at gmail.com
Thu Feb 22 14:19:51 AEDT 2024


It is often necessary to be certain that a thread has terminated before
the rest of the program can continue. In some cases, signalling that a
thread is ABOUT to terminate (using any synchronisation device)
is insufficient: even after the last piece of Mercury code has run,
the thread still has additional cleanup code to run, which might include
arbitrary code in thread-specific data destructors, etc.

Introduce a predicate to create a joinable native thread,
and a predicate to wait for that thread to terminate (joining).

Joinable threads are only implemented for C backends for now.

library/thread.m:
    Add new predicates spawn_native_joinable and join_thread.

    Add a internal type thread_handle.

    Rename the internal type thread_id type to thread_desc,
    as thread_id is too similar to thread_handle.

tests/hard_coded/Mmakefile:
tests/hard_coded/spawn_native_joinable.exp:
tests/hard_coded/spawn_native_joinable.exp2:
tests/hard_coded/spawn_native_joinable.m:
    Add test case.

NEWS.md:
    Announce changes.

(The diff is made with diff -b)

diff --git a/NEWS.md b/NEWS.md
index c0b5de5c5..310f55842 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1049,6 +1049,13 @@ Changes to the Mercury standard library
 * This new module contains predicates and functions that find
   variables in terms.
 
+### Changes to the `thread` module
+
+* The following predicate have been added:
+
+    - pred `spawn_native_joinable/5`
+    - pred `join_thread/4`
+
 ### Changes to the `tree_bitset` module
 
 * The following predicates have been added:
diff --git a/library/thread.m b/library/thread.m
index 01b19e528..19474d41c 100644
--- a/library/thread.m
+++ b/library/thread.m
@@ -3,7 +3,7 @@
 %---------------------------------------------------------------------------%
 % Copyright (C) 2000-2001, 2003-2004, 2006-2008, 2010-2011 The University
 % of Melbourne.
-% Copyright (C) 2014-2022 The Mercury Team.
+% Copyright (C) 2014-2024 The Mercury team.
 % This file is distributed under the terms specified in COPYING.LIB.
 %---------------------------------------------------------------------------%
 %
@@ -35,10 +35,14 @@
 
 %---------------------------------------------------------------------------%
 
-    % Abstract type representing a thread.
+    % Abstract type representing a detached thread.
     %
 :- type thread.
 
+    % Abstract type representing a joinable thread.
+    %
+:- type joinable_thread(T).
+
     % can_spawn succeeds if spawn/4 is supported in the current grade.
     %
 :- pred can_spawn is semidet.
@@ -111,6 +115,34 @@
 :- pred spawn_native(pred(thread, io, io)::in(pred(in, di, uo) is cc_multi),
     thread_options::in, maybe_error(thread)::out, io::di, io::uo) is cc_multi.
 
+    % spawn_native_joinable(Closure, Options, Res, IO0, IO);
+    %
+    % Create a joinable native thread (like spawn_native), then perform Closure
+    % in that thread. Another thread can call join_thread/4 to wait for the
+    % thread to terminate, and fetch the output returned by Closure.
+    % The thread will continue to take up system resources until it terminates
+    % and has been joined by a call to join_thread/4.
+    %
+    % The Java and C# backends do not yet support joinable threads.
+    %
+:- pred spawn_native_joinable(
+    pred(joinable_thread(T), T, io, io)::in(pred(in, out, di, uo) is cc_multi),
+    thread_options::in, maybe_error(joinable_thread(T))::out, io::di, io::uo)
+    is cc_multi.
+
+    % join_thread(Thread, Res, !IO):
+    %
+    % Wait for the specified thread to terminate. If the thread has already
+    % terminated, join_thread/4 will return immediately. On success, Res will
+    % be ok(Output) where Output is the value returned by the closure
+    % performed on that thread.
+    %
+    % A thread must only be joined once. If multiple threads simultaneously
+    % try to join with the same thread, the results are undefined.
+    %
+:- pred join_thread(joinable_thread(T)::in, maybe_error(T)::out,
+    io::di, io::uo) is cc_multi.
+
     % yield(IO0, IO) is logically equivalent to (IO = IO0) but
     % operationally, yields the Mercury engine to some other thread
     % if one exists.
@@ -141,6 +173,7 @@
 :- implementation.
 
 :- import_module bool.
+:- import_module mutvar.
 :- import_module require.
 
 :- pragma foreign_decl("C", "
@@ -173,13 +206,32 @@ import jmercury.runtime.Task;
                 min_stack_size  :: uint
             ).
 
-    % The thread id is not formally exposed yet but allows different thread
-    % handles to compare unequal.
-    %
 :- type thread
-    --->    thread(thread_id).
+    --->    detached_thread(thread_desc).
 
-:- type thread_id == string.
+:- type joinable_thread(T)
+    --->    joinable_thread(
+                jt_handle   :: thread_handle,
+                jt_mutvar   :: mutvar(T)
+            ).
+
+    % A descriptor for a (detached) Mercury thread.
+    % thread_desc values are not publicly exported, but they may help with
+    % debugging by printing and/or comparing of 'thread' values. There is no
+    % guarantee that a thread descriptor remains unique after a thread
+    % terminates, as the memory address used to derive the descriptor
+    % may be reused.
+    %
+:- type thread_desc == string.
+
+    % A thread handle from the underlying thread API.
+    % The C# and Java grades currently use strings for this type but
+    % that will need to change to support joinable threads in those grades.
+    %
+:- type thread_handle.
+:- pragma foreign_type("C", thread_handle, "pthread_t").
+:- pragma foreign_type("C#", thread_handle, "string").
+:- pragma foreign_type("Java", thread_handle, "java.lang.String").
 
 %---------------------------------------------------------------------------%
 
@@ -258,17 +310,17 @@ spawn(Goal, Res, !IO) :-
     maybe_error(thread)::out, io::di, io::uo) is cc_multi.
 
 spawn_context(Goal, Res, !IO) :-
-    spawn_context_2(Goal, Success, ThreadId, !IO),
+    spawn_context_2(Goal, Success, ThreadDesc, !IO),
     (
         Success = yes,
-        Res = ok(thread(ThreadId))
+        Res = ok(detached_thread(ThreadDesc))
     ;
         Success = no,
         Res = error("Unable to spawn threads in this grade.")
     ).
 
 :- pred spawn_context_2(pred(thread, io, io)::in(pred(in, di, uo) is cc_multi),
-    bool::out, string::out, io::di, io::uo) is cc_multi.
+    bool::out, thread_desc::out, io::di, io::uo) is cc_multi.
 
 spawn_context_2(_, Res, "", !IO) :-
     ( Res = no
@@ -277,7 +329,7 @@ spawn_context_2(_, Res, "", !IO) :-
 
 :- pragma foreign_proc("C",
     spawn_context_2(Goal::in(pred(in, di, uo) is cc_multi), Success::out,
-        ThreadId::out, _IO0::di, _IO::uo),
+        ThreadDesc::out, _IO0::di, _IO::uo),
     [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
         may_not_duplicate],
 "
@@ -294,15 +346,15 @@ spawn_context_2(_, Res, "", !IO) :-
     tlm = MR_clone_thread_local_mutables(MR_THREAD_LOCAL_MUTABLES);
     ctxt->MR_ctxt_thread_local_mutables = tlm;
 
-    // Derive a thread id from the address of the thread-local mutable vector
-    // for the Mercury thread. It should actually be more unique than a
+    // Derive a thread descriptor from the address of the thread-local mutable
+    // vector for the Mercury thread. It should actually be more unique than a
     // context address as contexts are kept around and reused.
-    ThreadId = MR_make_string(MR_ALLOC_ID, ""%p"", tlm);
+    ThreadDesc = MR_make_string(MR_ALLOC_ID, ""%p"", tlm);
 
-    // Store Goal and ThreadId on the top of the new context's stack.
+    // Store Goal and ThreadDesc on the top of the new context's stack.
     ctxt->MR_ctxt_sp += 2;
     ctxt->MR_ctxt_sp[0] = Goal;                     // MR_stackvar(1)
-    ctxt->MR_ctxt_sp[-1] = (MR_Word) ThreadId;  // MR_stackvar(2)
+    ctxt->MR_ctxt_sp[-1] = (MR_Word) ThreadDesc;    // MR_stackvar(2)
 
     MR_schedule_context(ctxt);
 
@@ -311,21 +363,21 @@ spawn_context_2(_, Res, "", !IO) :-
 #else // MR_HIGHLEVEL_CODE
 {
     Success = MR_FALSE;
-    ThreadId = MR_make_string_const("""");
+    ThreadDesc = MR_make_string_const("""");
 }
 #endif // MR_HIGHLEVEL_CODE
 ").
 
 :- pragma foreign_proc("Java",
     spawn_context_2(Goal::in(pred(in, di, uo) is cc_multi), Success::out,
-        ThreadId::out, _IO0::di, _IO::uo),
+        ThreadDesc::out, _IO0::di, _IO::uo),
     [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
         may_not_duplicate],
 "
     RunGoal rg = new RunGoal((Object[]) Goal);
     Task task = new Task(rg);
-    ThreadId = String.valueOf(task.getId());
-    rg.setId(ThreadId);
+    ThreadDesc = String.valueOf(task.getId());
+    rg.setThreadDesc(ThreadDesc);
     JavaInternal.getThreadPool().submit(task);
     Success = bool.YES;
 ").
@@ -344,39 +396,47 @@ spawn_native(Goal, Res, !IO) :-
 
 spawn_native(Goal, Options, Res, !IO) :-
     Options = thread_options(MinStackSize),
-    spawn_native_2(Goal, MinStackSize, Success, ThreadId, ErrorMsg, !IO),
+    Dummy = 0,  % for the typeinfo
+    spawn_native_2(Goal, Dummy, MinStackSize, Success, ThreadDesc, ErrorMsg,
+        !IO),
     (
         Success = yes,
-        Res = ok(thread(ThreadId))
+        Res = ok(detached_thread(ThreadDesc))
     ;
         Success = no,
         Res = error(ErrorMsg)
     ).
 
 :- pred spawn_native_2(pred(thread, io, io)::in(pred(in, di, uo) is cc_multi),
-    uint::in, bool::out, thread_id::out, string::out,
+    T::unused, uint::in, bool::out, thread_desc::out, string::out,
     io::di, io::uo) is cc_multi.
 
 :- pragma foreign_proc("C",
-    spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), MinStackSize::in,
-        Success::out, ThreadId::out, ErrorMsg::out, _IO0::di, _IO::uo),
+    spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), _T::unused,
+        MinStackSize::in, Success::out, ThreadDesc::out, ErrorMsg::out,
+        _IO0::di, _IO::uo),
     [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
         may_not_duplicate],
 "
 #ifdef MR_THREAD_SAFE
-    Success = ML_create_exclusive_thread(Goal, MinStackSize, &ThreadId,
-        &ErrorMsg, MR_ALLOC_ID);
+    pthread_t   thread_handle;
+
+    // Pass 0 for joinable_thread_mutvar to create a detached thread.
+    Success = ML_create_exclusive_thread(TypeInfo_for_T, Goal,
+        MinStackSize, (MR_Word) 0, &ThreadDesc, &thread_handle, &ErrorMsg,
+        MR_ALLOC_ID);
 #else
     Success = MR_FALSE;
-    ThreadId = MR_make_string_const("""");
+    ThreadDesc = MR_make_string_const("""");
     ErrorMsg = MR_make_string_const(
         ""Cannot create native thread in this grade."");
 #endif
 ").
 
 :- pragma foreign_proc("C#",
-    spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), _MinStackSize::in,
-        Success::out, ThreadId::out, ErrorMsg::out, _IO0::di, _IO::uo),
+    spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), _T::unused,
+        _MinStackSize::in, Success::out, ThreadDesc::out, ErrorMsg::out,
+        _IO0::di, _IO::uo),
     [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
         may_not_duplicate],
 "
@@ -385,33 +445,34 @@ spawn_native(Goal, Options, Res, !IO) :-
         MercuryThread mt = new MercuryThread(Goal, thread_locals);
         System.Threading.Thread thread = new System.Threading.Thread(
             new System.Threading.ThreadStart(mt.run));
-        ThreadId = thread.ManagedThreadId.ToString();
-        mt.setThreadId(ThreadId);
+        ThreadDesc = thread.ManagedThreadId.ToString();
+        mt.setThreadDesc(ThreadDesc);
         thread.Start();
         Success = mr_bool.YES;
         ErrorMsg = """";
     } catch (System.Threading.ThreadStartException e) {
         Success = mr_bool.NO;
-        ThreadId = """";
+        ThreadDesc = """";
         ErrorMsg = e.Message;
     } catch (System.SystemException e) {
         // Seen with mono.
         Success = mr_bool.NO;
-        ThreadId = """";
+        ThreadDesc = """";
         ErrorMsg = e.Message;
     }
 ").
 
 :- pragma foreign_proc("Java",
-    spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), _MinStackSize::in,
-        Success::out, ThreadId::out, ErrorMsg::out, _IO0::di, _IO::uo),
+    spawn_native_2(Goal::in(pred(in, di, uo) is cc_multi), _T::unused,
+        _MinStackSize::in, Success::out, ThreadDesc::out, ErrorMsg::out,
+        _IO0::di, _IO::uo),
     [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
         may_not_duplicate],
 "
     RunGoal rg = new RunGoal((Object[]) Goal);
     Task task = new Task(rg);
-    ThreadId = String.valueOf(task.getId());
-    rg.setId(ThreadId);
+    ThreadDesc = String.valueOf(task.getId());
+    rg.setThreadDesc(ThreadDesc);
     try {
         JavaInternal.getThreadPool().submitExclusiveThread(task);
         Success = bool.YES;
@@ -424,12 +485,129 @@ spawn_native(Goal, Options, Res, !IO) :-
         ErrorMsg = e.getMessage();
     }
     if (Success == bool.NO && ErrorMsg == null) {
-        ErrorMsg = ""unable to create new native thread"";
+        ErrorMsg = ""Unable to create new native thread."";
     }
 ").
 
 %---------------------------------------------------------------------------%
 
+spawn_native_joinable(Goal, Options, Res, !IO) :-
+    Options = thread_options(MinStackSize),
+    promise_pure (
+        impure new_mutvar0(OutputMutvar),
+        spawn_native_joinable_2(Goal, MinStackSize, OutputMutvar,
+            Success, ThreadHandle, ErrorMsg, !IO)
+    ),
+    (
+        Success = yes,
+        Res = ok(joinable_thread(ThreadHandle, OutputMutvar))
+    ;
+        Success = no,
+        Res = error(ErrorMsg)
+    ).
+
+:- pred spawn_native_joinable_2(
+    pred(joinable_thread(T), T, io, io)::in(pred(in, out, di, uo) is cc_multi),
+    uint::in, mutvar(T)::in, bool::out, thread_handle::out, string::out,
+    io::di, io::uo) is cc_multi.
+
+:- pragma foreign_proc("C",
+    spawn_native_joinable_2(Goal::in(pred(in, out, di, uo) is cc_multi),
+        MinStackSize::in, OutputMutvar::in,
+        Success::out, ThreadHandle::out, ErrorMsg::out, _IO0::di, _IO::uo),
+    [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+#ifdef MR_THREAD_SAFE
+    MR_String   thread_desc;
+
+    Success = ML_create_exclusive_thread(TypeInfo_for_T, Goal,
+        MinStackSize, OutputMutvar, &thread_desc, &ThreadHandle, &ErrorMsg,
+        MR_ALLOC_ID);
+#else
+    Success = MR_FALSE;
+    ThreadHandle = 0;
+    ErrorMsg = MR_make_string_const(
+        ""Cannot create joinable thread in this grade."");
+#endif
+").
+
+:- pragma foreign_proc("C#",
+    spawn_native_joinable_2(_Goal::in(pred(in, out, di, uo) is cc_multi),
+        _MinStackSize::in, _OutputMutvar::in,
+        Success::out, ThreadHandle::out, ErrorMsg::out, _IO0::di, _IO::uo),
+    [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+    Success = mr_bool.NO;
+    ThreadHandle = null;
+    ErrorMsg = ""Cannot create joinable thread in this grade."";
+").
+
+:- pragma foreign_proc("Java",
+    spawn_native_joinable_2(_Goal::in(pred(in, out, di, uo) is cc_multi),
+        _MinStackSize::in, _OutputMutvar::in,
+        Success::out, ThreadHandle::out, ErrorMsg::out, _IO0::di, _IO::uo),
+    [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+    Success = bool.NO;
+    ThreadHandle = null;
+    ErrorMsg = ""Cannot create joinable thread in this grade."";
+").
+
+%---------------------------------------------------------------------------%
+
+join_thread(Thread, Res, !IO) :-
+    Thread = joinable_thread(ThreadHandle, OutputMutvar),
+    promise_pure (
+        join_thread_2(ThreadHandle, Success, ErrorMsg, !IO),
+        (
+            Success = yes,
+            impure get_mutvar(OutputMutvar, Output),
+            Res0 = ok(Output)
+        ;
+            Success = no,
+            Res0 = error(ErrorMsg)
+        )
+    ),
+    cc_multi_equal(Res0, Res).
+
+:- pred join_thread_2(thread_handle::in, bool::out, string::out,
+    io::di, io::uo) is det.
+
+:- pragma foreign_proc("C",
+    join_thread_2(ThreadHandle::in, Success::out, ErrorMsg::out,
+        _IO0::di, _IO::uo),
+    [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+#ifdef MR_THREAD_SAFE
+    int     err;
+    char    errbuf[MR_STRERROR_BUF_SIZE];
+
+    err = pthread_join(ThreadHandle, NULL);
+    if (err == 0) {
+        Success = MR_YES;
+        ErrorMsg = MR_make_string_const("""");
+    } else {
+        Success = MR_NO;
+        ErrorMsg = MR_make_string(MR_ALLOC_ID, ""pthread_join failed: %s"",
+            MR_strerror(err, errbuf, sizeof(errbuf)));
+    }
+#else
+    Success = MR_NO;
+    ErrorMsg = MR_make_string_const(
+        ""Native threads are not supported in this grade."");
+#endif
+").
+
+join_thread_2(_ThreadHandle, Success, ErrorMsg, !IO) :-
+    Success = no,
+    ErrorMsg = "Joinable threads not supported in this grade.".
+
+%---------------------------------------------------------------------------%
+
 :- pragma no_inline(pred(yield/2)).
 :- pragma foreign_proc("C",
     yield(_IO0::di, _IO::uo),
@@ -507,7 +685,7 @@ INIT mercury_sys_init_thread_modules
     {
         // Call the closure placed the top of the stack.
         MR_r1 = MR_stackvar(1); // Goal
-        MR_r2 = MR_stackvar(2); // ThreadId
+        MR_r2 = MR_stackvar(2); // ThreadDesc
         MR_decr_sp(2);
         MR_noprof_call(MR_ENTRY(mercury__do_call_closure_1),
             MR_LABEL(mercury__thread__spawn_end_thread));
@@ -566,46 +744,53 @@ INIT mercury_sys_init_thread_modules
 
 :- pragma foreign_decl("C", local, "
 #if defined(MR_THREAD_SAFE)
-  #include  <pthread.h>
 
-  static MR_bool ML_create_exclusive_thread(MR_Word goal,
-                    size_t min_stack_size, MR_String *thread_id,
+#include  <pthread.h>
+
+static MR_bool ML_create_exclusive_thread(MR_Word typeinfo_for_T, MR_Word goal,
+                size_t min_stack_size, MR_Word joinable_thread_mutvar,
+                MR_String *thread_desc, pthread_t *thread_handle,
                 MR_String *error_msg, MR_AllocSiteInfoPtr alloc_id);
-  static void   *ML_exclusive_thread_wrapper(void *arg);
+static void   *ML_exclusive_thread_wrapper(void *arg);
 
-  typedef struct ML_ThreadWrapperArgs ML_ThreadWrapperArgs;
-  struct ML_ThreadWrapperArgs {
+typedef struct ML_ThreadWrapperArgs ML_ThreadWrapperArgs;
+struct ML_ThreadWrapperArgs {
     MercuryLock         mutex;
     MercuryCond         cond;
+    MR_Word             typeinfo_for_T;
     MR_Word             goal;
     MR_ThreadLocalMuts  *thread_local_mutables;
+    MR_Word             joinable_thread_mutvar; // 0 for detached thread
     MR_Integer          thread_state;
-        MR_String           thread_id;
-  };
+    MR_String           thread_desc;
+    pthread_t           thread_handle;
+};
 
-  enum {
+enum {
     ML_THREAD_NOT_READY,
     ML_THREAD_READY,
     ML_THREAD_START_ERROR
-  };
+};
 
 #endif // MR_THREAD_SAFE
 ").
 
 :- pragma foreign_code("C", "
 #if defined(MR_THREAD_SAFE)
-  static MR_bool
-  ML_create_exclusive_thread(MR_Word goal, size_t min_stack_size,
-        MR_String *thread_id, MR_String *error_msg,
-        MR_AllocSiteInfoPtr alloc_id)
-  {
+static MR_bool
+ML_create_exclusive_thread(MR_Word typeinfo_for_T, MR_Word goal,
+    size_t min_stack_size, MR_Word joinable_thread_mutvar,
+    MR_String *thread_desc, pthread_t *thread_handle,
+    MR_String *error_msg, MR_AllocSiteInfoPtr alloc_id)
+{
     ML_ThreadWrapperArgs    args;
     pthread_t               thread;
     pthread_attr_t          attrs;
     int                     err;
     char                    errbuf[MR_STRERROR_BUF_SIZE];
 
-    *thread_id = MR_make_string_const("""");
+    *thread_desc = MR_make_string_const("""");
+    *thread_handle = 0;
     *error_msg = MR_make_string_const("""");
 
     ML_incr_thread_barrier_count();
@@ -618,13 +803,18 @@ INIT mercury_sys_init_thread_modules
 
     pthread_mutex_init(&args.mutex, MR_MUTEX_ATTR);
     pthread_cond_init(&args.cond, MR_COND_ATTR);
+    args.typeinfo_for_T = typeinfo_for_T;
     args.goal = goal;
     args.thread_local_mutables =
         MR_clone_thread_local_mutables(MR_THREAD_LOCAL_MUTABLES);
+    args.joinable_thread_mutvar = joinable_thread_mutvar;
+    // These fields will be updated by the newly created thread.
     args.thread_state = ML_THREAD_NOT_READY;
-    args.thread_id = NULL;
+    args.thread_desc = NULL;
+    args.thread_handle = 0;
 
     pthread_attr_init(&attrs);
+    if (joinable_thread_mutvar == 0) {
         err = pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
         if (err != 0) {
             *error_msg = MR_make_string(alloc_id,
@@ -632,6 +822,7 @@ INIT mercury_sys_init_thread_modules
                 MR_strerror(errno, errbuf, sizeof(errbuf)));
             goto failed_to_create_thread;
         }
+    }
     if (min_stack_size > 0) {
         err = pthread_attr_setstacksize(&attrs, min_stack_size);
         if (err != 0) {
@@ -673,19 +864,23 @@ failed_to_create_thread:
     pthread_mutex_destroy(&args.mutex);
 
     if (args.thread_state == ML_THREAD_READY) {
-        *thread_id = args.thread_id;
+        *thread_desc = args.thread_desc;
+        *thread_handle = args.thread_handle;
         return MR_TRUE;
     }
 
     ML_decr_thread_barrier_count();
     return MR_FALSE;
-  }
+}
 
-  static void *ML_exclusive_thread_wrapper(void *arg)
-  {
+static void *ML_exclusive_thread_wrapper(void *arg)
+{
     ML_ThreadWrapperArgs    *args = arg;
+    MR_Word                 typeinfo_for_T;
     MR_Word                 goal;
-    MR_String               thread_id;
+    MR_Word                 joinable_thread_mutvar;
+    MR_String               thread_desc;
+    pthread_t               thread_handle;
 
     if (MR_init_thread(MR_use_now) == MR_FALSE) {
         MR_LOCK(&args->mutex, ""ML_exclusive_thread_wrapper"");
@@ -703,44 +898,80 @@ failed_to_create_thread:
     MR_assert(MR_THREAD_LOCAL_MUTABLES == NULL);
     MR_SET_THREAD_LOCAL_MUTABLES(args->thread_local_mutables);
 
-    thread_id = MR_make_string(MR_ALLOC_SITE_RUNTIME,
-        ""%"" MR_INTEGER_LENGTH_MODIFIER ""x"", MR_SELF_THREAD_ID);
-
-    // Take a copy of the goal before telling the parent we are ready.
+    // Take a copy of args fields.
+    typeinfo_for_T = args->typeinfo_for_T;
     goal = args->goal;
+    joinable_thread_mutvar = args->joinable_thread_mutvar;
 
+    thread_desc = MR_make_string(MR_ALLOC_SITE_RUNTIME,
+        ""%"" MR_INTEGER_LENGTH_MODIFIER ""x"", MR_SELF_THREAD_ID);
+    thread_handle = pthread_self();
+
+    // Tell the 'parent' we are ready, passing back a thread descriptor and
+    // thread handle.
     MR_LOCK(&args->mutex, ""ML_exclusive_thread_wrapper"");
     args->thread_state = ML_THREAD_READY;
-    args->thread_id = thread_id;
+    args->thread_desc = thread_desc;
+    args->thread_handle = thread_handle;
     MR_COND_SIGNAL(&args->cond, ""ML_exclusive_thread_wrapper"");
     MR_UNLOCK(&args->mutex, ""ML_exclusive_thread_wrapper"");
+    // We must not dereference args after this point.
 
-    ML_call_back_to_mercury_cc_multi(goal, thread_id);
+    if (joinable_thread_mutvar == 0) {
+        ML_call_back_to_mercury_detached_cc_multi(goal, thread_desc);
+    } else {
+        ML_call_back_to_mercury_joinable_cc_multi(typeinfo_for_T, goal,
+            thread_handle, joinable_thread_mutvar);
+    }
 
     MR_finalize_thread_engine();
 
     ML_decr_thread_barrier_count();
 
     return NULL;
-  }
+}
 #endif // MR_THREAD_SAFE
 ").
 
-:- pred call_back_to_mercury(
+:- pred call_back_to_mercury_detached(
     pred(thread, io, io)::in(pred(in, di, uo) is cc_multi),
-    thread_id::in, io::di, io::uo) is cc_multi.
+    thread_desc::in, io::di, io::uo) is cc_multi.
 :- pragma foreign_export("C",
-    call_back_to_mercury(in(pred(in, di, uo) is cc_multi), in, di, uo),
-    "ML_call_back_to_mercury_cc_multi").
+    call_back_to_mercury_detached(in(pred(in, di, uo) is cc_multi),
+    in, di, uo), "ML_call_back_to_mercury_detached_cc_multi").
 :- pragma foreign_export("C#",
-    call_back_to_mercury(in(pred(in, di, uo) is cc_multi), in, di, uo),
-    "ML_call_back_to_mercury_cc_multi").
+    call_back_to_mercury_detached(in(pred(in, di, uo) is cc_multi),
+    in, di, uo), "ML_call_back_to_mercury_detached_cc_multi").
 :- pragma foreign_export("Java",
-    call_back_to_mercury(in(pred(in, di, uo) is cc_multi), in, di, uo),
-    "ML_call_back_to_mercury_cc_multi").
+    call_back_to_mercury_detached(in(pred(in, di, uo) is cc_multi),
+    in, di, uo), "ML_call_back_to_mercury_detached_cc_multi").
+:- pragma no_inline(pred(call_back_to_mercury_detached/4)).
 
-call_back_to_mercury(Goal, ThreadId, !IO) :-
-    Goal(thread(ThreadId), !IO).
+call_back_to_mercury_detached(Goal, ThreadDesc, !IO) :-
+    Thread = detached_thread(ThreadDesc),
+    Goal(Thread, !IO).
+
+:- pred call_back_to_mercury_joinable(
+    pred(joinable_thread(T), T, io, io)::in(pred(in, out, di, uo) is cc_multi),
+    thread_handle::in, mutvar(T)::in, io::di, io::uo) is cc_multi.
+:- pragma foreign_export("C",
+    call_back_to_mercury_joinable(in(pred(in, out, di, uo) is cc_multi),
+    in, in, di, uo), "ML_call_back_to_mercury_joinable_cc_multi").
+:- pragma no_inline(pred(call_back_to_mercury_joinable/5)).
+
+call_back_to_mercury_joinable(Goal, ThreadHandle, OutputMutvar, !IO) :-
+    Thread = joinable_thread(ThreadHandle, OutputMutvar),
+    promise_pure (
+        Goal(Thread, Output, !IO),
+        % Store a reference to the output term in a mutvar that is in turn
+        % referenced from a joinable_thread term. If we simply returned the
+        % output term as the return value of the pthread_create start routine
+        % (ML_exclusive_thread_wrapper), it might be possible that the last
+        % reference to the term resides only in some GC-inaccessible memory
+        % in the pthread implementation, and therefore could be collected
+        % before join_thread retrieves the value.
+        impure set_mutvar(OutputMutvar, Output)
+    ).
 
 %---------------------------------------------------------------------------%
 
@@ -794,47 +1025,47 @@ call_back_to_mercury(Goal, ThreadId, !IO) :-
 
 :- pragma foreign_code("C#", "
 private class MercuryThread {
-    private object[] Goal;
+    private object[] goal;
     private object[] thread_local_mutables;
-    private string ThreadId;
+    private string thread_desc;
 
-    internal MercuryThread(object[] g, object[] tlmuts)
+    internal MercuryThread(object[] goal, object[] tlmuts)
     {
-        Goal = g;
-        thread_local_mutables = tlmuts;
+        this.goal = goal;
+        this.thread_local_mutables = tlmuts;
     }
 
-    internal void setThreadId(string id)
+    internal void setThreadDesc(string thread_desc)
     {
-        ThreadId = id;
+        this.thread_desc = thread_desc;
     }
 
     internal void run()
     {
         runtime.ThreadLocalMutables.set_array(thread_local_mutables);
-        thread.ML_call_back_to_mercury_cc_multi(Goal, ThreadId);
+        thread.ML_call_back_to_mercury_detached_cc_multi(goal, thread_desc);
     }
 }").
 
 :- pragma foreign_code("Java", "
 public static class RunGoal implements Runnable {
     private final Object[]  goal;
-    private String          id;
+    private String          thread_desc;
 
-    private RunGoal(Object[] g)
+    private RunGoal(Object[] goal)
     {
-        goal = g;
-        id = null;
+        this.goal = goal;
+        this.thread_desc = null;
     }
 
-    private void setId(String id)
+    private void setThreadDesc(String thread_desc)
     {
-        this.id = id;
+        this.thread_desc = thread_desc;
     }
 
     public void run()
     {
-        thread.ML_call_back_to_mercury_cc_multi(goal, id);
+        thread.ML_call_back_to_mercury_detached_cc_multi(goal, thread_desc);
     }
 }").
 
diff --git a/tests/hard_coded/Mmakefile b/tests/hard_coded/Mmakefile
index bf8ffafc0..2fb0b8162 100644
--- a/tests/hard_coded/Mmakefile
+++ b/tests/hard_coded/Mmakefile
@@ -370,6 +370,7 @@ ORDINARY_PROGS = \
 	solve_quadratic \
 	space \
 	spawn_native \
+	spawn_native_joinable \
 	special_char \
 	stable_sort \
 	static_no_tag \
diff --git a/tests/hard_coded/spawn_native_joinable.exp b/tests/hard_coded/spawn_native_joinable.exp
new file mode 100644
index 000000000..b7ee1b54a
--- /dev/null
+++ b/tests/hard_coded/spawn_native_joinable.exp
@@ -0,0 +1,6 @@
+a start
+b start
+a stop
+b stop
+a join result: ok({"a", 1234})
+b join result: ok({"b", 1234})
diff --git a/tests/hard_coded/spawn_native_joinable.exp2 b/tests/hard_coded/spawn_native_joinable.exp2
new file mode 100644
index 000000000..c5b9e2b4e
--- /dev/null
+++ b/tests/hard_coded/spawn_native_joinable.exp2
@@ -0,0 +1 @@
+spawn_native_joinable not supported
diff --git a/tests/hard_coded/spawn_native_joinable.m b/tests/hard_coded/spawn_native_joinable.m
new file mode 100644
index 000000000..33e7fdfd2
--- /dev/null
+++ b/tests/hard_coded/spawn_native_joinable.m
@@ -0,0 +1,110 @@
+%---------------------------------------------------------------------------%
+% vim: ts=4 sw=4 et ft=mercury
+%---------------------------------------------------------------------------%
+
+:- module spawn_native_joinable.
+:- interface.
+
+:- import_module io.
+
+:- pred main(io::di, io::uo) is cc_multi.
+
+%---------------------------------------------------------------------------%
+%---------------------------------------------------------------------------%
+
+:- implementation.
+
+:- import_module list.
+:- import_module maybe.
+:- import_module string.
+:- import_module thread.
+
+:- pragma foreign_decl("C", local, "
+    #include <time.h>
+").
+
+%---------------------------------------------------------------------------%
+
+main(!IO) :-
+    ( if can_spawn_native then
+        Options = init_thread_options,
+        spawn_native_joinable(thread_proc("a"), Options, SpawnResA, !IO),
+        msleep(100, !IO),
+        spawn_native_joinable(thread_proc("b"), Options, SpawnResB, !IO),
+        (
+            SpawnResA = ok(ThreadA),
+            (
+                SpawnResB = ok(ThreadB),
+                join_thread(ThreadA, JoinResA, !IO),
+                join_thread(ThreadB, JoinResB, !IO),
+
+                io.write_string("a join result: ", !IO),
+                io.print_line(JoinResA, !IO),
+
+                io.write_string("b join result: ", !IO),
+                io.print_line(JoinResB, !IO)
+            ;
+                SpawnResB = error(ErrorB),
+                io.print_line(ErrorB, !IO),
+                join_thread(ThreadA, _JoinResA, !IO)
+            )
+        ;
+            SpawnResA = error(ErrorA),
+            io.print_line(ErrorA, !IO)
+        )
+    else
+        io.write_string("spawn_native_joinable not supported\n", !IO)
+    ).
+
+:- type thread_output == {string, int}.
+
+:- pred thread_proc(string::in, joinable_thread(thread_output)::in,
+    thread_output::out, io::di, io::uo) is cc_multi.
+
+thread_proc(Id, _Thread, Output, !IO) :-
+    io.write_string(Id ++ " start\n", !IO),
+    msleep(500, !IO),
+    cc_multi_equal({Id, 1234}, Output),
+    io.write_string(Id ++ " stop\n", !IO).
+
+%---------------------------------------------------------------------------%
+
+:- pred msleep(int::in, io::di, io::uo) is det.
+
+:- pragma foreign_proc("C",
+    msleep(Msecs::in, _IO0::di, _IO::uo),
+    [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+#ifdef MR_WIN32
+    Sleep(Msecs);
+#else
+{
+    struct timespec req;
+    int err;
+
+    req.tv_sec = 0;
+    req.tv_nsec = Msecs * 1000000;
+    do {
+        err = nanosleep(&req, &req);
+    } while (err == -1 && errno == EINTR);
+}
+#endif
+").
+
+:- pragma foreign_proc("C#",
+    msleep(Msecs::in, _IO0::di, _IO::uo),
+    [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io],
+"
+    System.Threading.Thread.Sleep(Msecs);
+").
+
+:- pragma foreign_proc("Java",
+    msleep(Msecs::in, _IO0::di, _IO::uo),
+    [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io],
+"
+    try {
+        Thread.sleep(Msecs);
+    } catch (InterruptedException e) {
+    }
+").
-- 
2.43.0



More information about the reviews mailing list