[m-rev.] for review: more efficient parallel mmc --make

Peter Wang novalazy at gmail.com
Tue Jul 28 17:04:27 AEST 2009


Branches: main

Fix a performance problem with mmc --make --jobs <n>.  It would fork a new
process for each target, whether or not that target was already up-to-date.
For a large program which is mostly up-to-date, this could be very costly.
The main reason it was implemented that way was to avoid interprocess
communication, which might be non-portable (the only communication was via a
sub-process's exit code).

This patch makes use of IPC to allow the master process to fork off only N-1
worker processes, itself being the last worker.  The workers use shared
memory to communicate which tasks still need doing and which are completed.
A worker process therefore may work on as many tasks as required before
terminating.

With this patch, parallel mmc --make is disabled in .par grades, since fork()
is disabled there and the thread-based fold is deleted.  It should be
fairly simple to restore.


configure.in:
runtime/mercury_conf.h.in:
        Check sys/mman.h is available for mmap().

        Check if -lrt is required for POSIX semaphores.

Mmake.common.in:
compiler/Mmakefile:
        Link with -lrt if found.

compiler/make.util.m:
        Reimplement process-based concurrent fold predicate as above.
        Fall back to non-concurrent fold if the IPC mechanisms are
        unavailable.

        Delete old thread-based concurrent fold implementation.

compiler/process_util.m:
        Add wait_pid to wait on a particular child process.

compiler/Mercury.options:
        Set --no-ansi-c on make.util.m.


diff --git a/Mmake.common.in b/Mmake.common.in
index eb80417..aaca235 100644
--- a/Mmake.common.in
+++ b/Mmake.common.in
@@ -214,6 +214,7 @@ PERL=@PERL@
 MATH_LIB=@MATH_LIB@
 
 # More libraries to link
+SEMAPHORE_LIBRARY=@SEMAPHORE_LIBRARY@
 SOCKET_LIBRARY=@SOCKET_LIBRARY@
 NSL_LIBRARY=@NSL_LIBRARY@
 DL_LIBRARY=@DL_LIBRARY@
diff --git a/compiler/Mercury.options b/compiler/Mercury.options
index ea6a779..cb770a1 100644
--- a/compiler/Mercury.options
+++ b/compiler/Mercury.options
@@ -20,6 +20,9 @@ MCFLAGS-hlds.quantification = --optimize-unused-args
 # which are not available with `--ansi'.
 MGNUCFLAGS-libs.process_util = --no-ansi
 
+# make.util.m uses POSIX IPC.
+MGNUCFLAGS-make.util = --no-ansi
+
 # If intermodule optimization is enabled, we also need to use `--no-ansi'
 # for the following modules, because they import process_util.
 MGNUCFLAGS-make.module_dep_file = --no-ansi
@@ -33,6 +36,7 @@ MCFLAGS-libs.process_util = --no-ansi-c
 MCFLAGS-make.module_dep_file = --no-ansi-c
 MCFLAGS-make.module_target = --no-ansi-c
 MCFLAGS-make.program_target = --no-ansi-c
+MCFLAGS-make.util = --no-ansi-c
 
 MCFLAGS-erl_backend = --no-warn-unused-imports
 MCFLAGS-hlds = --no-warn-unused-imports
diff --git a/compiler/Mmakefile b/compiler/Mmakefile
index e699a71..62a0d60 100644
--- a/compiler/Mmakefile
+++ b/compiler/Mmakefile
@@ -68,6 +68,7 @@ MCFLAGS	     += --flags COMP_FLAGS $(CONFIG_OVERRIDE)
 MLOBJS       := ../main.$O \
 		../trace/lib$(EVENTSPEC_LIB_NAME).$A \
 		$(MLOBJS)
+MLLIBS       += $(SEMAPHORE_LIBRARY)
 ALL_MLLIBS    = $(MLLIBS) $(EXTRA_MLLIBS) $(GCC_BACKEND_LIBS)
 MLFLAGS      += --no-main --shared
 C2INITARGS   += $(MDBCOMP_DIR)/$(MDBCOMP_LIB_NAME).init
diff --git a/compiler/make.util.m b/compiler/make.util.m
index 0345470..bc883d0 100644
--- a/compiler/make.util.m
+++ b/compiler/make.util.m
@@ -330,6 +330,7 @@
 :- import_module dir.
 :- import_module exception.
 :- import_module getopt_io.
+:- import_module maybe.
 :- import_module set.
 :- import_module thread.
 :- import_module thread.channel.
@@ -392,50 +393,17 @@ foldl3_maybe_stop_at_error_2(KeepGoing, P, [T | Ts],
 % Parallel (concurrent) fold
 %
 
-:- type child_exit
-    --->    child_succeeded
-    ;       child_failed
-    ;       child_exception(univ).
-
-:- inst child_succeeded_or_failed
-    --->    child_succeeded
-    ;       child_failed.
-
-    % A generic interface for the two parallel fold implementations:
-    % one using processes and one using threads.
-    %
-:- typeclass par_fold(PF) where [
-
-    % run_in_child(Pred, Info, T, Succeeded, !PF, !IO)
-    %
-    % Start executing Pred in a child thread/process.  Succeeded is `yes' iff
-    % the child was successfully spawned.
-    %
-    pred run_in_child(
-        foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
-        Info::in, T::in, bool::out, PF::in, PF::out, io::di, io::uo) is det,
-
-    % Block until a child exit code is received.
-    %
-    pred wait_for_child_exit(child_exit::out(child_succeeded_or_failed),
-        PF::in, PF::out, io::di, io::uo) is det
-].
-
 foldl2_maybe_stop_at_error_maybe_parallel(KeepGoing, MakeTarget, Targets,
         Success, !Info, !IO) :-
     globals.io_lookup_int_option(jobs, Jobs, !IO),
-    ( Jobs > 1 ->
+    (
+        Jobs > 1,
+        process_util.can_fork,
+        have_job_ctl_ipc
+    ->
         % First pass.
-        % fork() is disabled on threaded grades.
-        ( process_util.can_fork ->
-            foldl2_maybe_stop_at_error_parallel_processes(KeepGoing, Jobs,
-                MakeTarget, Targets, Success0, !.Info, !IO)
-        ; thread.can_spawn ->
-            foldl2_maybe_stop_at_error_parallel_threads(KeepGoing, Jobs,
-                MakeTarget, Targets, Success0, !.Info, !IO)
-        ;
-            Success0 = yes
-        ),
+        foldl2_maybe_stop_at_error_parallel_processes(KeepGoing, Jobs,
+            MakeTarget, Targets, Success0, !Info, !IO),
         % Second pass (sequential).
         (
             Success0 = yes,
@@ -455,270 +423,336 @@ foldl2_maybe_stop_at_error_maybe_parallel(KeepGoing, MakeTarget, Targets,
             !Info, !IO)
     ).
 
-:- pred do_parallel_foldl2(bool::in, int::in,
+:- pred foldl2_maybe_stop_at_error_parallel_processes(bool::in, int::in,
     foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
-    Info::in, list(T)::in, bool::out, PF::in, PF::out, io::di, io::uo)
-    is det <= par_fold(PF).
-
-do_parallel_foldl2(KeepGoing, Jobs, MakeTarget, Info, Targets, Success,
-        !PF, !IO) :-
-    list.split_upto(Jobs, Targets, InitialTargets, LaterTargets),
-    start_initial_child_jobs(KeepGoing, MakeTarget, Info, InitialTargets,
-        0, NumChildJobs, !PF, !IO),
-    ( NumChildJobs < length(InitialTargets) ->
-        Success0 = no
-    ;
-        Success0 = yes
-    ),
-    do_parallel_foldl2_parent_loop(KeepGoing, MakeTarget, Info,
-        NumChildJobs, LaterTargets, Success0, Success, !PF, !IO).
+    list(T)::in, bool::out, Info::in, Info::out, io::di, io::uo) is det.
 
-:- pred start_initial_child_jobs(bool::in,
-    foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
-    Info::in, list(T)::in, int::in, int::out,
-    PF::in, PF::out, io::di, io::uo) is det <= par_fold(PF).
-
-start_initial_child_jobs(_KeepGoing, _MakeTarget, _Info,
-        [], !NumChildJobs, !PF, !IO).
-start_initial_child_jobs(KeepGoing, MakeTarget, Info,
-        [Target | Targets], !NumChildJobs, !PF, !IO) :-
-    run_in_child(MakeTarget, Info, Target, Success, !PF, !IO),
+foldl2_maybe_stop_at_error_parallel_processes(KeepGoing, Jobs, MakeTarget,
+        Targets, Success, !Info, !IO) :-
+    TotalTasks = list.length(Targets),
+    create_job_ctl(TotalTasks, MaybeJobCtl, !IO),
     (
-        Success = yes,
-        start_initial_child_jobs(KeepGoing, MakeTarget, Info, Targets,
-            !.NumChildJobs + 1, !:NumChildJobs, !PF, !IO)
-    ;
-        Success = no,
-        KeepGoing = yes,
-        start_initial_child_jobs(KeepGoing, MakeTarget, Info, Targets,
-            !NumChildJobs, !PF, !IO)
+        MaybeJobCtl = yes(JobCtl),
+        list.map_foldl(
+            start_worker_process(KeepGoing, MakeTarget, Targets, JobCtl,
+                !.Info),
+            2 .. Jobs, MaybePids, !IO),
+        worker_loop(KeepGoing, MakeTarget, Targets, JobCtl, yes, Success0,
+            !Info, !IO),
+        list.foldl2(reap_worker_process, MaybePids, Success0, Success, !IO),
+        destroy_job_ctl(JobCtl, !IO)
     ;
-        Success = no,
-        KeepGoing = no
+        MaybeJobCtl = no,
+        Success = no
     ).
 
-:- pred do_parallel_foldl2_parent_loop(bool::in,
+:- pred start_worker_process(bool::in,
     foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
-    Info::in, int::in, list(T)::in, bool::in, bool::out, PF::in, PF::out,
-    io::di, io::uo) is det <= par_fold(PF).
+    list(T)::in, job_ctl::in, Info::in, int::in, maybe(pid)::out,
+    io::di, io::uo) is det.
 
-do_parallel_foldl2_parent_loop(KeepGoing, MakeTarget, Info, NumChildJobs0,
-        Targets, !Success, !PF, !IO) :-
-    (
-        % We are done once all running children have terminated and there are
-        % no more targets to make.
-        NumChildJobs0 = 0,
-        Targets = []
-    ->
-        true
-    ;
-        % Wait for a running child to indicate that it is finished.
-        wait_for_child_exit(Exit, !PF, !IO),
+start_worker_process(KeepGoing, MakeTarget, Targets, JobCtl, Info, _ChildN,
+        MaybePid, !IO) :-
+    start_in_forked_process(
+        child_worker(KeepGoing, MakeTarget, Targets, JobCtl, Info),
+        MaybePid, !IO).
+
+:- pred child_worker(bool::in,
+    foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
+    list(T)::in, job_ctl::in, Info::in, bool::out, io::di, io::uo) is det.
+
+child_worker(KeepGoing, MakeTarget, Targets, JobCtl, Info0, Success, !IO) :-
+    worker_loop(KeepGoing, MakeTarget, Targets, JobCtl, yes, Success,
+        Info0, _Info, !IO).
+
+:- pred worker_loop(bool::in,
+    foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
+    list(T)::in, job_ctl::in, bool::in, bool::out, Info::in, Info::out,
+    io::di, io::uo) is det.
+
+worker_loop(KeepGoing, MakeTarget, Targets, JobCtl, !Success, !Info, !IO) :-
+    accept_task(JobCtl, TaskNumber, !IO),
+    ( TaskNumber >= 0 ->
+        Target = list.det_index0(Targets, TaskNumber),
+        MakeTarget(Target, TargetSuccess, !Info, !IO),
         (
-            Exit = child_succeeded,
-            NewSuccess = yes
+            TargetSuccess = yes,
+            mark_task_done(JobCtl, TaskNumber, !IO)
         ;
-            Exit = child_failed,
-            NewSuccess = no
+            TargetSuccess = no,
+            mark_task_error(JobCtl, TaskNumber, KeepGoing, !IO),
+            !:Success = no
         ),
+        worker_loop(KeepGoing, MakeTarget, Targets, JobCtl,
+            !Success, !Info, !IO)
+    ;
+        % No more tasks.
+        true
+    ).
+
+:- pred reap_worker_process(maybe(pid)::in, bool::in, bool::out,
+    io::di, io::uo) is det.
+
+reap_worker_process(MaybePid, !Success, !IO) :-
+    (
+        MaybePid = yes(Pid),
+        wait_pid(Pid, Status, !IO),
         (
-            ( NewSuccess = yes
-            ; KeepGoing = yes
-            )
+            !.Success = yes,
+            Status = ok(exited(0))
         ->
-            !:Success = !.Success `and` NewSuccess,
-            (
-                Targets = [],
-                MoreTargets = [],
-                NumChildJobs = NumChildJobs0 - 1
-            ;
-                Targets = [NextTarget | MoreTargets],
-                run_in_child(MakeTarget, Info, NextTarget, ChildStarted,
-                    !PF, !IO),
-                (
-                    ChildStarted = yes,
-                    NumChildJobs = NumChildJobs0
-                ;
-                    ChildStarted = no,
-                    NumChildJobs = NumChildJobs0 - 1,
-                    !:Success = no
-                )
-            ),
-            do_parallel_foldl2_parent_loop(KeepGoing, MakeTarget, Info,
-                NumChildJobs, MoreTargets, !Success, !PF, !IO)
+            true
         ;
-            % Wait for the other running children to terminate before
-            % returning.
-            !:Success = no,
-            wait_for_child_exits(NumChildJobs0 - 1, !PF, !IO)
+            !:Success = no
         )
-    ).
-
-:- pred wait_for_child_exits(int::in,
-    PF::in, PF::out, io::di, io::uo) is det <= par_fold(PF).
-
-wait_for_child_exits(Num, !PF, !IO) :-
-    ( Num > 0 ->
-        wait_for_child_exit(_, !PF, !IO),
-        wait_for_child_exits(Num - 1, !PF, !IO)
     ;
-        true
+        MaybePid = no
     ).
 
 %-----------------------------------------------------------------------------%
 %
-% Parallel fold using processes
+% Shared memory IPC for parallel workers
 %
 
-:- type fork_par_fold
-    --->    fork_par_fold(
-                fpf_children :: set(pid)
-            ).
+:- pragma foreign_decl("C", "
+typedef struct MC_JobCtl MC_JobCtl;
+").
 
-:- instance par_fold(fork_par_fold) where [
-    pred(run_in_child/8) is run_in_child_process,
-    pred(wait_for_child_exit/5) is wait_for_child_process_exit
-].
+:- pragma foreign_decl("C", local,
+"
+#if defined(MR_HAVE_SEMAPHORE_H) && defined(MR_HAVE_SYS_MMAN_H)
+  #include <semaphore.h>
+  #include <sys/mman.h>
 
-:- pred foldl2_maybe_stop_at_error_parallel_processes(bool::in, int::in,
-    foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
-    list(T)::in, bool::out, Info::in, io::di, io::uo) is det.
+  /* Just in case. */
+  #if !defined(MAP_ANONYMOUS) && defined(MAP_ANON)
+    #define MAP_ANONYMOUS MAP_ANON
+  #endif
 
-foldl2_maybe_stop_at_error_parallel_processes(KeepGoing, Jobs, MakeTarget,
-        Targets, Success, Info, !IO) :-
-    PF0 = fork_par_fold(set.init),
-    do_parallel_foldl2(KeepGoing, Jobs, MakeTarget, Info, Targets,
-        Success, PF0, _PF, !IO).
+  #ifdef MAP_ANONYMOUS
+    #define MC_HAVE_JOBCTL_IPC 1
+  #endif
+#endif
 
-:- pred run_in_child_process(
-    foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
-    Info::in, T::in, bool::out, fork_par_fold::in, fork_par_fold::out,
-    io::di, io::uo) is det.
+#ifdef MC_HAVE_JOBCTL_IPC
 
-run_in_child_process(P, Info, T, ChildStarted, PF0, PF, !IO) :-
-    start_in_forked_process(
-        (pred(Success::out, !.IO::di, !:IO::uo) is det :-
-            P(T, Success, Info, _Info, !IO)
-        ), MaybePid, !IO),
-    (
-        MaybePid = yes(Pid),
-        ChildStarted = yes,
-        PF0 = fork_par_fold(Set0),
-        set.insert(Set0, Pid, Set),
-        PF = fork_par_fold(Set)
-    ;
-        MaybePid = no,
-        ChildStarted = no,
-        PF = PF0
-    ).
+typedef enum MC_TaskStatus MC_TaskStatus;
 
-:- pred wait_for_child_process_exit(child_exit::out(child_succeeded_or_failed),
-    fork_par_fold::in, fork_par_fold::out, io::di, io::uo) is det.
+enum MC_TaskStatus {
+    TASK_NEW,       /* task not yet attempted */
+    TASK_ACCEPTED,  /* someone is working on this task */
+    TASK_DONE,      /* task successfully completed */
+    TASK_ERROR      /* error occurred when working on the task */
+};
 
-wait_for_child_process_exit(ChildExit, PF0, PF, !IO) :-
-    wait_any(DeadPid, ChildStatus, !IO),
-    fork_par_fold(Pids0) = PF0,
-    ( set.remove(Pids0, DeadPid, Pids) ->
-        ( ChildStatus = ok(exited(0)) ->
-            ChildExit = child_succeeded
-        ;
-            ChildExit = child_failed
-        ),
-        PF = fork_par_fold(Pids)
-    ;
-        % Not a child of ours, maybe a grand child.  Ignore it.
-        wait_for_child_process_exit(ChildExit, PF0, PF, !IO)
-    ).
+/* This structure is placed in shared memory. */
+struct MC_JobCtl {
+    /* Static data. */
+    sem_t           jc_sem;
+    MR_Integer      jc_total_tasks;
 
-%-----------------------------------------------------------------------------%
-%
-% Parallel fold using threads
-%
+    /* Dynamic data, protected with a semaphore. */
+    MR_bool         jc_abort;
+    MC_TaskStatus   jc_tasks[MR_VARIABLE_SIZED];
+};
 
-:- type thread_par_fold
-    --->    thread_par_fold(
-                tpf_channel     :: channel(child_exit),
-                                % A channel to communicate between the children
-                                % and the parent.
+#define MC_JOB_CTL_SIZE(N)  (sizeof(MC_JobCtl) + (N) * sizeof(MC_TaskStatus))
 
-                tpf_maybe_excp  :: maybe(univ)
-                                % Remember the first of any exceptions thrown
-                                % by child threads.
-            ).
+static MC_JobCtl *  MC_create_job_ctl(MR_Integer total_tasks);
+static void         MC_lock_job_ctl(MC_JobCtl *job_ctl);
+static void         MC_unlock_job_ctl(MC_JobCtl *job_ctl);
 
-:- instance par_fold(thread_par_fold) where [
-    pred(run_in_child/8) is run_in_child_thread,
-    pred(wait_for_child_exit/5) is wait_for_child_thread_exit
-].
+#endif /* MC_HAVE_JOBCTL_IPC */
+").
 
-:- pred foldl2_maybe_stop_at_error_parallel_threads(bool::in, int::in,
-    foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
-    list(T)::in, bool::out, Info::in, io::di, io::uo) is det.
-
-foldl2_maybe_stop_at_error_parallel_threads(KeepGoing, Jobs, MakeTarget,
-        Targets, Success, Info, !IO) :-
-    channel.init(Channel, !IO),
-    PF0 = thread_par_fold(Channel, no),
-    do_parallel_foldl2(KeepGoing, Jobs, MakeTarget, Info, Targets, Success,
-        PF0, PF, !IO),
-    %
-    % Rethrow the first of any exceptions which terminated a child thread.
+:- pragma foreign_code("C", "
+
+#ifdef MC_HAVE_JOBCTL_IPC
+
+static MC_JobCtl *
+MC_create_job_ctl(MR_Integer total_tasks)
+{
+    sem_t       sem;
+    int         shmid;
+    MC_JobCtl   *job_ctl;
+    MR_Integer  i;
+
+    shmid = -1;
+
+    /* Create semaphore. */
+    if (sem_init(&sem, 1, 1) != 0) {
+        perror(""MC_create_job_ctl: sem_init"");
+        return NULL;
+    }
+
+    /* Create the shared memory segment. */
+    job_ctl = mmap(NULL, MC_JOB_CTL_SIZE(total_tasks), PROT_READ | PROT_WRITE,
+        MAP_ANONYMOUS | MAP_SHARED, -1, 0);
+    if (job_ctl == (void *) -1) {
+        perror(""MC_create_job_ctl: mmap"");
+        sem_destroy(&sem);
+        return NULL;
+    }
+
+    job_ctl->jc_sem = sem;
+    job_ctl->jc_total_tasks = total_tasks;
+    job_ctl->jc_abort = MR_FALSE;
+    for (i = 0; i < total_tasks; i++) {
+        job_ctl->jc_tasks[i] = TASK_NEW;
+    }
+
+    return job_ctl;
+}
+
+static void
+MC_lock_job_ctl(MC_JobCtl *job_ctl)
+{
+    sem_wait(&job_ctl->jc_sem);
+}
+
+static void
+MC_unlock_job_ctl(MC_JobCtl *job_ctl)
+{
+    sem_post(&job_ctl->jc_sem);
+}
+
+#endif /* MC_HAVE_JOBCTL_IPC */
+").
+
+:- type job_ctl.
+:- pragma foreign_type("C", job_ctl, "MC_JobCtl *").
+
+:- pred have_job_ctl_ipc is semidet.
+
+have_job_ctl_ipc :-
+    semidet_fail.
+
+:- pragma foreign_proc("C",
+    have_job_ctl_ipc,
+    [will_not_call_mercury, promise_pure, thread_safe, may_not_duplicate],
+"
+#ifdef MC_HAVE_JOBCTL_IPC
+    SUCCESS_INDICATOR = MR_TRUE;
+#else
+    SUCCESS_INDICATOR = MR_FALSE;
+#endif
+").
+
+    % The job control structure should be unique, but that's too annoying.
     %
-    MaybeExcp = PF ^ tpf_maybe_excp,
-    (
-        MaybeExcp = yes(Excp),
-        rethrow(exception(Excp) : exception_result(unit))
-    ;
-        MaybeExcp = no
-    ).
+:- pred create_job_ctl(int::in, maybe(job_ctl)::out, io::di, io::uo) is det.
 
-:- pred run_in_child_thread(
-    foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
-    Info::in, T::in, bool::out, thread_par_fold::in, thread_par_fold::out,
-    io::di, io::uo) is det.
+:- pragma foreign_proc("C",
+    create_job_ctl(TotalJobs::in, MaybeJobCtl::out, IO0::di, IO::uo),
+    [may_call_mercury, promise_pure, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+#ifdef MC_HAVE_JOBCTL_IPC
+    MC_JobCtl *job_ctl;
+
+    job_ctl = MC_create_job_ctl(TotalJobs);
+    if (job_ctl != NULL) {
+        MaybeJobCtl = MC_make_yes_job_ctl(job_ctl);
+    } else {
+        MaybeJobCtl = MC_make_no_job_ctl();
+    }
+#else
+    MaybeJobCtl = MC_make_no_job_ctl();
+#endif
+    IO = IO0;
+").
 
-run_in_child_thread(P, Info, T, ChildStarted, PF, PF, !IO) :-
-    promise_equivalent_solutions [!:IO] (
-        spawn((pred(!.IO::di, !:IO::uo) is cc_multi :-
-            try_io((pred(Succ::out, !.IO::di, !:IO::uo) is det :-
-                P(T, Succ, Info, _Info, !IO)
-            ), Result, !IO),
-            (
-                Result = succeeded(yes),
-                Exit = child_succeeded
-            ;
-                Result = succeeded(no),
-                Exit = child_failed
-            ;
-                Result = exception(Excp),
-                Exit = child_exception(Excp)
-            ),
-            channel.put(PF ^ tpf_channel, Exit, !IO)
-        ), !IO)
-    ),
-    ChildStarted = yes.
+:- pred destroy_job_ctl(job_ctl::in, io::di, io::uo) is det.
 
-:- pred wait_for_child_thread_exit(child_exit::out(child_succeeded_or_failed),
-    thread_par_fold::in, thread_par_fold::out, io::di, io::uo) is det.
+:- pragma foreign_proc("C",
+    destroy_job_ctl(JobCtl::in, IO0::di, IO::uo),
+    [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+#ifdef MC_HAVE_JOBCTL_IPC
+    sem_destroy(&JobCtl->jc_sem);
+    munmap(JobCtl, MC_JOB_CTL_SIZE(JobCtl->jc_total_tasks));
+#endif
+    IO = IO0;
+").
 
-wait_for_child_thread_exit(ChildExit, !PF, !IO) :-
-    channel.take(!.PF ^ tpf_channel, ChildExit0, !IO),
-    (
-        ( ChildExit0 = child_succeeded
-        ; ChildExit0 = child_failed
-        ),
-        ChildExit = ChildExit0
-    ;
-        ChildExit0 = child_exception(Excp),
-        ChildExit = child_failed,
-        MaybeExcp0 = !.PF ^ tpf_maybe_excp,
-        (
-            MaybeExcp0 = no,
-            !PF ^ tpf_maybe_excp := yes(Excp)
-        ;
-            MaybeExcp0 = yes(_)
-        )
-    ).
+:- pred accept_task(job_ctl::in, int::out, io::di, io::uo) is det.
+
+:- pragma foreign_proc("C",
+    accept_task(JobCtl::in, TaskNumber::out, IO0::di, IO::uo),
+    [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+    TaskNumber = -1;
+
+#ifdef MC_HAVE_JOBCTL_IPC
+    MC_lock_job_ctl(JobCtl);
+
+    if (!JobCtl->jc_abort) {
+        MR_Integer  i;
+
+        for (i = 0; i < JobCtl->jc_total_tasks; i++) {
+            if (JobCtl->jc_tasks[i] == TASK_NEW) {
+                JobCtl->jc_tasks[i] = TASK_ACCEPTED;
+                TaskNumber = i;
+                break;
+            }
+        }
+    }
+
+    MC_unlock_job_ctl(JobCtl);
+#endif
+
+    IO = IO0;
+").
+
+:- pred mark_task_done(job_ctl::in, int::in, io::di, io::uo) is det.
+
+:- pragma foreign_proc("C",
+    mark_task_done(JobCtl::in, TaskNumber::in, IO0::di, IO::uo),
+    [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+#ifdef MC_HAVE_JOBCTL_IPC
+    MC_lock_job_ctl(JobCtl);
+    JobCtl->jc_tasks[TaskNumber] = TASK_DONE;
+    MC_unlock_job_ctl(JobCtl);
+#endif
+    IO = IO0;
+").
+
+:- pred mark_task_error(job_ctl::in, int::in, bool::in, io::di, io::uo) is det.
+
+:- pragma foreign_proc("C",
+    mark_task_error(JobCtl::in, TaskNumber::in, KeepGoing::in,
+        IO0::di, IO::uo),
+    [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+#ifdef MC_HAVE_JOBCTL_IPC
+    MC_lock_job_ctl(JobCtl);
+
+    JobCtl->jc_tasks[TaskNumber] = TASK_ERROR;
+    if (!KeepGoing) {
+        JobCtl->jc_abort = MR_TRUE;
+    }
+
+    MC_unlock_job_ctl(JobCtl);
+#endif
+    IO = IO0;
+").
+
+:- func make_yes_job_ctl(job_ctl) = maybe(job_ctl).
+:- pragma foreign_export("C", make_yes_job_ctl(in) = out,
+    "MC_make_yes_job_ctl").
+
+make_yes_job_ctl(JobCtl) = yes(JobCtl).
+
+:- func make_no_job_ctl = maybe(job_ctl).
+:- pragma foreign_export("C", make_no_job_ctl = out,
+    "MC_make_no_job_ctl").
+
+make_no_job_ctl = no.
 
 %-----------------------------------------------------------------------------%
 %-----------------------------------------------------------------------------%
diff --git a/compiler/process_util.m b/compiler/process_util.m
index 3209b80..5cd3858 100644
--- a/compiler/process_util.m
+++ b/compiler/process_util.m
@@ -95,6 +95,14 @@
 :- pred start_in_forked_process(io_pred::in(io_pred), maybe(pid)::out,
     io::di, io::uo) is det.
 
+    % wait_pid(Pid, ExitCode, !IO)
+    %
+    % Block until the child process with process id Pid has exited.
+    % Return the exit code of the child.
+    %
+:- pred wait_pid(pid::in, io.res(io.system_result)::out, io::di, io::uo)
+    is det.
+
     % wait_any(Pid, ExitCode, !IO)
     %
     % Block until a child process has exited. Return the process ID
@@ -423,35 +431,32 @@ call_child_process_io_pred(P, Status, !IO) :-
 #endif
 
         while (1) {
-            wait_status = wait(&child_status);
-            if (Pid == -1 || wait_status == Pid) {
+            wait_status = waitpid(Pid, &child_status, 0);
+            if (wait_status != -1) {
                 WaitedPid = wait_status;
                 Status = child_status;
                 break;
-            } else if (wait_status == -1) {
-                if (MR_is_eintr(errno)) {
-                    if (MC_signalled) {
-                        /*
-                        ** A normally fatal signal has been received,
-                        ** so kill the child immediately.
-                        ** Use SIGTERM, not MC_signal_received,
-                        ** because the child may be inside a call
-                        ** to system() which would cause SIGINT
-                        ** to be ignored on some systems (e.g. Linux).
-                        */
-                        if (Pid != -1) {
-                            kill(Pid, SIGTERM);
-                        }
-                        break;
-                    }
-                } else {
+            } else if (MR_is_eintr(errno)) {
+                if (MC_signalled) {
                     /*
-                    ** This should never happen.
+                    ** A normally fatal signal has been received, so kill the
+                    ** child immediately.  Use SIGTERM, not MC_signal_received,
+                    ** because the child may be inside a call to system() which
+                    ** would cause SIGINT to be ignored on some systems (e.g.
+                    ** Linux).
                     */
-                    MR_perror(""error in wait(): "");
-                    Status = 1;
+                    if (Pid != -1) {
+                        kill(Pid, SIGTERM);
+                    }
                     break;
                 }
+            } else {
+                /*
+                ** This should never happen.
+                */
+                MR_perror(""error in wait(): "");
+                Status = 1;
+                break;
             }
         }
 
@@ -475,6 +480,10 @@ call_child_process_io_pred(P, Status, !IO) :-
 #endif /* ! MC_CAN_FORK */
 ").
 
+wait_pid(Pid, Status, !IO) :-
+    do_wait(Pid, _Pid, Status0, !IO),
+    Status = io.handle_system_command_exit_status(Status0).
+
 wait_any(Pid, Status, !IO) :-
     do_wait(-1, Pid, Status0, !IO),
     Status = io.handle_system_command_exit_status(Status0).
diff --git a/configure.in b/configure.in
index 418dcef..b18d87e 100644
--- a/configure.in
+++ b/configure.in
@@ -1096,7 +1096,7 @@ MERCURY_CHECK_FOR_HEADERS( \
         asm/sigcontext.h sys/param.h sys/time.h sys/times.h \
         sys/types.h sys/stat.h fcntl.h termios.h sys/ioctl.h \
         sys/stropts.h windows.h dirent.h getopt.h malloc.h \
-        semaphore.h pthread.h time.h spawn.h fenv.h)
+        semaphore.h pthread.h time.h spawn.h fenv.h sys/mman.h)
 
 if test "$MR_HAVE_GETOPT_H" = 1; then
     GETOPT_H_AVAILABLE=yes
@@ -1111,7 +1111,7 @@ fi
 
 #-----------------------------------------------------------------------------#
 #
-# Check whether we can set the FP roudning mode
+# Check whether we can set the FP rounding mode
 #
 
 MERCURY_CHECK_FOR_FENV_FUNC([fesetround], [$MATH_LIB])
@@ -4379,6 +4379,15 @@ MERCURY_CHECK_FOR_IEEE_FUNC(isinff)
 
 #-----------------------------------------------------------------------------#
 #
+# Check whether POSIX semaphores require -lrt
+#
+
+AC_CHECK_LIB(rt, sem_init, SEMAPHORE_LIBRARY=-lrt, SEMAPHORE_LIBRARY="")
+
+AC_SUBST(SEMAPHORE_LIBRARY)
+
+#-----------------------------------------------------------------------------#
+#
 # Check whether sockets work (we need them for the external debugger)
 #
 
diff --git a/runtime/mercury_conf.h.in b/runtime/mercury_conf.h.in
index f987a07..987e84c 100644
--- a/runtime/mercury_conf.h.in
+++ b/runtime/mercury_conf.h.in
@@ -134,6 +134,7 @@
 **	MR_HAVE_TIME_H		we have <time.h>
 ** 	MR_HAVE_SPAWN_H		we have <spawn.h>
 **	MR_HAVE_FENV_H		we have <fenv.h>
+**	MR_HAVE_SYS_MMAN_H	we have <sys/mman.h>
 */
 #undef	MR_HAVE_SYS_SIGINFO_H
 #undef	MR_HAVE_SYS_SIGNAL_H
@@ -161,6 +162,7 @@
 #undef	MR_HAVE_TIME_H
 #undef	MR_HAVE_SPAWN_H
 #undef	MR_HAVE_FENV_H
+#undef	MR_HAVE_SYS_MMAN_H
 
 /*
 ** MR_HAVE_POSIX_TIMES is defined if we have the POSIX
--------------------------------------------------------------------------
mercury-reviews mailing list
Post messages to:       mercury-reviews at csse.unimelb.edu.au
Administrative Queries: owner-mercury-reviews at csse.unimelb.edu.au
Subscriptions:          mercury-reviews-request at csse.unimelb.edu.au
--------------------------------------------------------------------------



More information about the reviews mailing list