[m-rev.] for review: more efficient parallel mmc --make
Peter Wang
novalazy at gmail.com
Tue Jul 28 17:04:27 AEST 2009
Branches: main
Fix a performance problem with mmc --make --jobs <n>. It would fork a new
process for each target, whether or not that target was already up-to-date.
For a large program which is mostly up-to-date, this could be very costly.
The main reason it was implemented that way was to avoid interprocess communication
which might be non-portable (the only communication was via a sub-process's
exit code).
This patch makes use of IPC to allow the master process to fork off only N-1
worker processes, itself being the last worker. The workers communicate via
shared memory as to which tasks still need doing, and which are completed.
A worker process therefore may work on as many tasks as required before
terminating.
Parallel mmc --make is currently disabled in .par grades. It should be
fairly simple to restore.
configure.in:
runtime/mercury_conf.h.in:
Check sys/mman.h is available for mmap().
Check if -lrt is required for POSIX semaphores.
Mmake.common.in:
compiler/Mmakefile:
Link with -lrt if found.
compiler/make.util.m:
Reimplement process-based concurrent fold predicate as above.
Fall back to non-concurrent fold if the IPC mechanisms are
unavailable.
Delete old thread-based concurrent fold implementation.
compiler/process_util.m:
Add wait_pid to wait on a particular child process.
compiler/Mercury.options:
Set --no-ansi-c on make.util.m.
diff --git a/Mmake.common.in b/Mmake.common.in
index eb80417..aaca235 100644
--- a/Mmake.common.in
+++ b/Mmake.common.in
@@ -214,6 +214,7 @@ PERL=@PERL@
MATH_LIB=@MATH_LIB@
# More libraries to link
+SEMAPHORE_LIBRARY=@SEMAPHORE_LIBRARY@
SOCKET_LIBRARY=@SOCKET_LIBRARY@
NSL_LIBRARY=@NSL_LIBRARY@
DL_LIBRARY=@DL_LIBRARY@
diff --git a/compiler/Mercury.options b/compiler/Mercury.options
index ea6a779..cb770a1 100644
--- a/compiler/Mercury.options
+++ b/compiler/Mercury.options
@@ -20,6 +20,9 @@ MCFLAGS-hlds.quantification = --optimize-unused-args
# which are not available with `--ansi'.
MGNUCFLAGS-libs.process_util = --no-ansi
+# make.util.m uses POSIX IPC.
+MGNUCFLAGS-make.util = --no-ansi
+
# If intermodule optimization is enabled, we also need to use `--no-ansi'
# for the following modules, because they import process_util.
MGNUCFLAGS-make.module_dep_file = --no-ansi
@@ -33,6 +36,7 @@ MCFLAGS-libs.process_util = --no-ansi-c
MCFLAGS-make.module_dep_file = --no-ansi-c
MCFLAGS-make.module_target = --no-ansi-c
MCFLAGS-make.program_target = --no-ansi-c
+MCFLAGS-make.util = --no-ansi-c
MCFLAGS-erl_backend = --no-warn-unused-imports
MCFLAGS-hlds = --no-warn-unused-imports
diff --git a/compiler/Mmakefile b/compiler/Mmakefile
index e699a71..62a0d60 100644
--- a/compiler/Mmakefile
+++ b/compiler/Mmakefile
@@ -68,6 +68,7 @@ MCFLAGS += --flags COMP_FLAGS $(CONFIG_OVERRIDE)
MLOBJS := ../main.$O \
../trace/lib$(EVENTSPEC_LIB_NAME).$A \
$(MLOBJS)
+MLLIBS += $(SEMAPHORE_LIBRARY)
ALL_MLLIBS = $(MLLIBS) $(EXTRA_MLLIBS) $(GCC_BACKEND_LIBS)
MLFLAGS += --no-main --shared
C2INITARGS += $(MDBCOMP_DIR)/$(MDBCOMP_LIB_NAME).init
diff --git a/compiler/make.util.m b/compiler/make.util.m
index 0345470..bc883d0 100644
--- a/compiler/make.util.m
+++ b/compiler/make.util.m
@@ -330,6 +330,7 @@
:- import_module dir.
:- import_module exception.
:- import_module getopt_io.
+:- import_module maybe.
:- import_module set.
:- import_module thread.
:- import_module thread.channel.
@@ -392,50 +393,17 @@ foldl3_maybe_stop_at_error_2(KeepGoing, P, [T | Ts],
% Parallel (concurrent) fold
%
-:- type child_exit
- ---> child_succeeded
- ; child_failed
- ; child_exception(univ).
-
-:- inst child_succeeded_or_failed
- ---> child_succeeded
- ; child_failed.
-
- % A generic interface for the two parallel fold implementations:
- % one using processes and one using threads.
- %
-:- typeclass par_fold(PF) where [
-
- % run_in_child(Pred, Info, T, Succeeded, !PF, !IO)
- %
- % Start executing Pred in a child thread/process. Succeeded is `yes' iff
- % the child was successfully spawned.
- %
- pred run_in_child(
- foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
- Info::in, T::in, bool::out, PF::in, PF::out, io::di, io::uo) is det,
-
- % Block until a child exit code is received.
- %
- pred wait_for_child_exit(child_exit::out(child_succeeded_or_failed),
- PF::in, PF::out, io::di, io::uo) is det
-].
-
foldl2_maybe_stop_at_error_maybe_parallel(KeepGoing, MakeTarget, Targets,
Success, !Info, !IO) :-
globals.io_lookup_int_option(jobs, Jobs, !IO),
- ( Jobs > 1 ->
+ (
+ Jobs > 1,
+ process_util.can_fork,
+ have_job_ctl_ipc
+ ->
% First pass.
- % fork() is disabled on threaded grades.
- ( process_util.can_fork ->
- foldl2_maybe_stop_at_error_parallel_processes(KeepGoing, Jobs,
- MakeTarget, Targets, Success0, !.Info, !IO)
- ; thread.can_spawn ->
- foldl2_maybe_stop_at_error_parallel_threads(KeepGoing, Jobs,
- MakeTarget, Targets, Success0, !.Info, !IO)
- ;
- Success0 = yes
- ),
+ foldl2_maybe_stop_at_error_parallel_processes(KeepGoing, Jobs,
+ MakeTarget, Targets, Success0, !Info, !IO),
% Second pass (sequential).
(
Success0 = yes,
@@ -455,270 +423,336 @@ foldl2_maybe_stop_at_error_maybe_parallel(KeepGoing, MakeTarget, Targets,
!Info, !IO)
).
-:- pred do_parallel_foldl2(bool::in, int::in,
+:- pred foldl2_maybe_stop_at_error_parallel_processes(bool::in, int::in,
foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
- Info::in, list(T)::in, bool::out, PF::in, PF::out, io::di, io::uo)
- is det <= par_fold(PF).
-
-do_parallel_foldl2(KeepGoing, Jobs, MakeTarget, Info, Targets, Success,
- !PF, !IO) :-
- list.split_upto(Jobs, Targets, InitialTargets, LaterTargets),
- start_initial_child_jobs(KeepGoing, MakeTarget, Info, InitialTargets,
- 0, NumChildJobs, !PF, !IO),
- ( NumChildJobs < length(InitialTargets) ->
- Success0 = no
- ;
- Success0 = yes
- ),
- do_parallel_foldl2_parent_loop(KeepGoing, MakeTarget, Info,
- NumChildJobs, LaterTargets, Success0, Success, !PF, !IO).
+ list(T)::in, bool::out, Info::in, Info::out, io::di, io::uo) is det.
-:- pred start_initial_child_jobs(bool::in,
- foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
- Info::in, list(T)::in, int::in, int::out,
- PF::in, PF::out, io::di, io::uo) is det <= par_fold(PF).
-
-start_initial_child_jobs(_KeepGoing, _MakeTarget, _Info,
- [], !NumChildJobs, !PF, !IO).
-start_initial_child_jobs(KeepGoing, MakeTarget, Info,
- [Target | Targets], !NumChildJobs, !PF, !IO) :-
- run_in_child(MakeTarget, Info, Target, Success, !PF, !IO),
+foldl2_maybe_stop_at_error_parallel_processes(KeepGoing, Jobs, MakeTarget,
+ Targets, Success, !Info, !IO) :-
+ TotalTasks = list.length(Targets),
+ create_job_ctl(TotalTasks, MaybeJobCtl, !IO),
(
- Success = yes,
- start_initial_child_jobs(KeepGoing, MakeTarget, Info, Targets,
- !.NumChildJobs + 1, !:NumChildJobs, !PF, !IO)
- ;
- Success = no,
- KeepGoing = yes,
- start_initial_child_jobs(KeepGoing, MakeTarget, Info, Targets,
- !NumChildJobs, !PF, !IO)
+ MaybeJobCtl = yes(JobCtl),
+ list.map_foldl(
+ start_worker_process(KeepGoing, MakeTarget, Targets, JobCtl,
+ !.Info),
+ 2 .. Jobs, MaybePids, !IO),
+ worker_loop(KeepGoing, MakeTarget, Targets, JobCtl, yes, Success0,
+ !Info, !IO),
+ list.foldl2(reap_worker_process, MaybePids, Success0, Success, !IO),
+ destroy_job_ctl(JobCtl, !IO)
;
- Success = no,
- KeepGoing = no
+ MaybeJobCtl = no,
+ Success = no
).
-:- pred do_parallel_foldl2_parent_loop(bool::in,
+:- pred start_worker_process(bool::in,
foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
- Info::in, int::in, list(T)::in, bool::in, bool::out, PF::in, PF::out,
- io::di, io::uo) is det <= par_fold(PF).
+ list(T)::in, job_ctl::in, Info::in, int::in, maybe(pid)::out,
+ io::di, io::uo) is det.
-do_parallel_foldl2_parent_loop(KeepGoing, MakeTarget, Info, NumChildJobs0,
- Targets, !Success, !PF, !IO) :-
- (
- % We are done once all running children have terminated and there are
- % no more targets to make.
- NumChildJobs0 = 0,
- Targets = []
- ->
- true
- ;
- % Wait for a running child to indicate that it is finished.
- wait_for_child_exit(Exit, !PF, !IO),
+start_worker_process(KeepGoing, MakeTarget, Targets, JobCtl, Info, _ChildN,
+ MaybePid, !IO) :-
+ start_in_forked_process(
+ child_worker(KeepGoing, MakeTarget, Targets, JobCtl, Info),
+ MaybePid, !IO).
+
+:- pred child_worker(bool::in,
+ foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
+ list(T)::in, job_ctl::in, Info::in, bool::out, io::di, io::uo) is det.
+
+child_worker(KeepGoing, MakeTarget, Targets, JobCtl, Info0, Success, !IO) :-
+ worker_loop(KeepGoing, MakeTarget, Targets, JobCtl, yes, Success,
+ Info0, _Info, !IO).
+
+:- pred worker_loop(bool::in,
+ foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
+ list(T)::in, job_ctl::in, bool::in, bool::out, Info::in, Info::out,
+ io::di, io::uo) is det.
+
+worker_loop(KeepGoing, MakeTarget, Targets, JobCtl, !Success, !Info, !IO) :-
+ accept_task(JobCtl, TaskNumber, !IO),
+ ( TaskNumber >= 0 ->
+ Target = list.det_index0(Targets, TaskNumber),
+ MakeTarget(Target, TargetSuccess, !Info, !IO),
(
- Exit = child_succeeded,
- NewSuccess = yes
+ TargetSuccess = yes,
+ mark_task_done(JobCtl, TaskNumber, !IO)
;
- Exit = child_failed,
- NewSuccess = no
+ TargetSuccess = no,
+ mark_task_error(JobCtl, TaskNumber, KeepGoing, !IO),
+ !:Success = no
),
+ worker_loop(KeepGoing, MakeTarget, Targets, JobCtl,
+ !Success, !Info, !IO)
+ ;
+ % No more tasks.
+ true
+ ).
+
+:- pred reap_worker_process(maybe(pid)::in, bool::in, bool::out,
+ io::di, io::uo) is det.
+
+reap_worker_process(MaybePid, !Success, !IO) :-
+ (
+ MaybePid = yes(Pid),
+ wait_pid(Pid, Status, !IO),
(
- ( NewSuccess = yes
- ; KeepGoing = yes
- )
+ !.Success = yes,
+ Status = ok(exited(0))
->
- !:Success = !.Success `and` NewSuccess,
- (
- Targets = [],
- MoreTargets = [],
- NumChildJobs = NumChildJobs0 - 1
- ;
- Targets = [NextTarget | MoreTargets],
- run_in_child(MakeTarget, Info, NextTarget, ChildStarted,
- !PF, !IO),
- (
- ChildStarted = yes,
- NumChildJobs = NumChildJobs0
- ;
- ChildStarted = no,
- NumChildJobs = NumChildJobs0 - 1,
- !:Success = no
- )
- ),
- do_parallel_foldl2_parent_loop(KeepGoing, MakeTarget, Info,
- NumChildJobs, MoreTargets, !Success, !PF, !IO)
+ true
;
- % Wait for the other running children to terminate before
- % returning.
- !:Success = no,
- wait_for_child_exits(NumChildJobs0 - 1, !PF, !IO)
+ !:Success = no
)
- ).
-
-:- pred wait_for_child_exits(int::in,
- PF::in, PF::out, io::di, io::uo) is det <= par_fold(PF).
-
-wait_for_child_exits(Num, !PF, !IO) :-
- ( Num > 0 ->
- wait_for_child_exit(_, !PF, !IO),
- wait_for_child_exits(Num - 1, !PF, !IO)
;
- true
+ MaybePid = no
).
%-----------------------------------------------------------------------------%
%
-% Parallel fold using processes
+% Shared memory IPC for parallel workers
%
-:- type fork_par_fold
- ---> fork_par_fold(
- fpf_children :: set(pid)
- ).
+:- pragma foreign_decl("C", "
+typedef struct MC_JobCtl MC_JobCtl;
+").
-:- instance par_fold(fork_par_fold) where [
- pred(run_in_child/8) is run_in_child_process,
- pred(wait_for_child_exit/5) is wait_for_child_process_exit
-].
+:- pragma foreign_decl("C", local,
+"
+#if defined(MR_HAVE_SEMAPHORE_H) && defined(MR_HAVE_SYS_MMAN_H)
+ #include <semaphore.h>
+ #include <sys/mman.h>
-:- pred foldl2_maybe_stop_at_error_parallel_processes(bool::in, int::in,
- foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
- list(T)::in, bool::out, Info::in, io::di, io::uo) is det.
+ /* Just in case. */
+ #if !defined(MAP_ANONYMOUS) && defined(MAP_ANON)
+ #define MAP_ANONYMOUS MAP_ANON
+ #endif
-foldl2_maybe_stop_at_error_parallel_processes(KeepGoing, Jobs, MakeTarget,
- Targets, Success, Info, !IO) :-
- PF0 = fork_par_fold(set.init),
- do_parallel_foldl2(KeepGoing, Jobs, MakeTarget, Info, Targets,
- Success, PF0, _PF, !IO).
+ #ifdef MAP_ANONYMOUS
+ #define MC_HAVE_JOBCTL_IPC 1
+ #endif
+#endif
-:- pred run_in_child_process(
- foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
- Info::in, T::in, bool::out, fork_par_fold::in, fork_par_fold::out,
- io::di, io::uo) is det.
+#ifdef MC_HAVE_JOBCTL_IPC
-run_in_child_process(P, Info, T, ChildStarted, PF0, PF, !IO) :-
- start_in_forked_process(
- (pred(Success::out, !.IO::di, !:IO::uo) is det :-
- P(T, Success, Info, _Info, !IO)
- ), MaybePid, !IO),
- (
- MaybePid = yes(Pid),
- ChildStarted = yes,
- PF0 = fork_par_fold(Set0),
- set.insert(Set0, Pid, Set),
- PF = fork_par_fold(Set)
- ;
- MaybePid = no,
- ChildStarted = no,
- PF = PF0
- ).
+typedef enum MC_TaskStatus MC_TaskStatus;
-:- pred wait_for_child_process_exit(child_exit::out(child_succeeded_or_failed),
- fork_par_fold::in, fork_par_fold::out, io::di, io::uo) is det.
+enum MC_TaskStatus {
+ TASK_NEW, /* task not yet attempted */
+ TASK_ACCEPTED, /* someone is working on this task */
+ TASK_DONE, /* task successfully completed */
+ TASK_ERROR /* error occurred when working on the task */
+};
-wait_for_child_process_exit(ChildExit, PF0, PF, !IO) :-
- wait_any(DeadPid, ChildStatus, !IO),
- fork_par_fold(Pids0) = PF0,
- ( set.remove(Pids0, DeadPid, Pids) ->
- ( ChildStatus = ok(exited(0)) ->
- ChildExit = child_succeeded
- ;
- ChildExit = child_failed
- ),
- PF = fork_par_fold(Pids)
- ;
- % Not a child of ours, maybe a grand child. Ignore it.
- wait_for_child_process_exit(ChildExit, PF0, PF, !IO)
- ).
+/* This structure is placed in shared memory. */
+struct MC_JobCtl {
+ /* Static data. */
+ sem_t jc_sem;
+ MR_Integer jc_total_tasks;
-%-----------------------------------------------------------------------------%
-%
-% Parallel fold using threads
-%
+ /* Dynamic data, protected with a semaphore. */
+ MR_bool jc_abort;
+ MC_TaskStatus jc_tasks[MR_VARIABLE_SIZED];
+};
-:- type thread_par_fold
- ---> thread_par_fold(
- tpf_channel :: channel(child_exit),
- % A channel to communicate between the children
- % and the parent.
+#define MC_JOB_CTL_SIZE(N) (sizeof(MC_JobCtl) + (N) * sizeof(MC_TaskStatus))
- tpf_maybe_excp :: maybe(univ)
- % Remember the first of any exceptions thrown
- % by child threads.
- ).
+static MC_JobCtl * MC_create_job_ctl(MR_Integer total_tasks);
+static void MC_lock_job_ctl(MC_JobCtl *job_ctl);
+static void MC_unlock_job_ctl(MC_JobCtl *job_ctl);
-:- instance par_fold(thread_par_fold) where [
- pred(run_in_child/8) is run_in_child_thread,
- pred(wait_for_child_exit/5) is wait_for_child_thread_exit
-].
+#endif /* MC_HAVE_JOBCTL_IPC */
+").
-:- pred foldl2_maybe_stop_at_error_parallel_threads(bool::in, int::in,
- foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
- list(T)::in, bool::out, Info::in, io::di, io::uo) is det.
-
-foldl2_maybe_stop_at_error_parallel_threads(KeepGoing, Jobs, MakeTarget,
- Targets, Success, Info, !IO) :-
- channel.init(Channel, !IO),
- PF0 = thread_par_fold(Channel, no),
- do_parallel_foldl2(KeepGoing, Jobs, MakeTarget, Info, Targets, Success,
- PF0, PF, !IO),
- %
- % Rethrow the first of any exceptions which terminated a child thread.
+:- pragma foreign_code("C", "
+
+#ifdef MC_HAVE_JOBCTL_IPC
+
+/*
+** Allocate and initialise a job control structure for total_tasks tasks in
+** an anonymous MAP_SHARED mapping. Every task starts in state TASK_NEW and
+** the abort flag starts cleared.
+**
+** Returns NULL, after reporting the failure with perror(), if either the
+** mapping or the semaphore cannot be created.
+*/
+static MC_JobCtl *
+MC_create_job_ctl(MR_Integer total_tasks)
+{
+    size_t      size;
+    MC_JobCtl   *job_ctl;
+    MR_Integer  i;
+
+    size = MC_JOB_CTL_SIZE(total_tasks);
+
+    /* Create the shared memory segment. */
+    job_ctl = mmap(NULL, size, PROT_READ | PROT_WRITE,
+        MAP_ANONYMOUS | MAP_SHARED, -1, 0);
+    if (job_ctl == MAP_FAILED) {
+        perror(""MC_create_job_ctl: mmap"");
+        return NULL;
+    }
+
+    /*
+    ** Create the semaphore directly inside the shared segment. POSIX leaves
+    ** the effect of operating on a copy of a sem_t undefined, so it must not
+    ** be initialised in a local variable and then assigned into place.
+    */
+    if (sem_init(&job_ctl->jc_sem, 1, 1) != 0) {
+        perror(""MC_create_job_ctl: sem_init"");
+        munmap(job_ctl, size);
+        return NULL;
+    }
+
+    job_ctl->jc_total_tasks = total_tasks;
+    job_ctl->jc_abort = MR_FALSE;
+    for (i = 0; i < total_tasks; i++) {
+        job_ctl->jc_tasks[i] = TASK_NEW;
+    }
+
+    return job_ctl;
+}
+
+static void
+MC_lock_job_ctl(MC_JobCtl *job_ctl)
+{
+ sem_wait(&job_ctl->jc_sem);
+}
+
+static void
+MC_unlock_job_ctl(MC_JobCtl *job_ctl)
+{
+ sem_post(&job_ctl->jc_sem);
+}
+
+#endif /* MC_HAVE_JOBCTL_IPC */
+").
+
+:- type job_ctl.
+:- pragma foreign_type("C", job_ctl, "MC_JobCtl *").
+
+:- pred have_job_ctl_ipc is semidet.
+
+have_job_ctl_ipc :-
+ semidet_fail.
+
+:- pragma foreign_proc("C",
+ have_job_ctl_ipc,
+ [will_not_call_mercury, promise_pure, thread_safe, may_not_duplicate],
+"
+#ifdef MC_HAVE_JOBCTL_IPC
+ SUCCESS_INDICATOR = MR_TRUE;
+#else
+ SUCCESS_INDICATOR = MR_FALSE;
+#endif
+").
+
+ % The job control structure should really be unique, but that's too annoying.
%
- MaybeExcp = PF ^ tpf_maybe_excp,
- (
- MaybeExcp = yes(Excp),
- rethrow(exception(Excp) : exception_result(unit))
- ;
- MaybeExcp = no
- ).
+:- pred create_job_ctl(int::in, maybe(job_ctl)::out, io::di, io::uo) is det.
-:- pred run_in_child_thread(
- foldl2_pred_with_status(T, Info, io)::in(foldl2_pred_with_status),
- Info::in, T::in, bool::out, thread_par_fold::in, thread_par_fold::out,
- io::di, io::uo) is det.
+:- pragma foreign_proc("C",
+ create_job_ctl(TotalJobs::in, MaybeJobCtl::out, IO0::di, IO::uo),
+ [may_call_mercury, promise_pure, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+#ifdef MC_HAVE_JOBCTL_IPC
+ MC_JobCtl *job_ctl;
+
+ job_ctl = MC_create_job_ctl(TotalJobs);
+ if (job_ctl != NULL) {
+ MaybeJobCtl = MC_make_yes_job_ctl(job_ctl);
+ } else {
+ MaybeJobCtl = MC_make_no_job_ctl();
+ }
+#else
+ MaybeJobCtl = MC_make_no_job_ctl();
+#endif
+ IO = IO0;
+").
-run_in_child_thread(P, Info, T, ChildStarted, PF, PF, !IO) :-
- promise_equivalent_solutions [!:IO] (
- spawn((pred(!.IO::di, !:IO::uo) is cc_multi :-
- try_io((pred(Succ::out, !.IO::di, !:IO::uo) is det :-
- P(T, Succ, Info, _Info, !IO)
- ), Result, !IO),
- (
- Result = succeeded(yes),
- Exit = child_succeeded
- ;
- Result = succeeded(no),
- Exit = child_failed
- ;
- Result = exception(Excp),
- Exit = child_exception(Excp)
- ),
- channel.put(PF ^ tpf_channel, Exit, !IO)
- ), !IO)
- ),
- ChildStarted = yes.
+:- pred destroy_job_ctl(job_ctl::in, io::di, io::uo) is det.
-:- pred wait_for_child_thread_exit(child_exit::out(child_succeeded_or_failed),
- thread_par_fold::in, thread_par_fold::out, io::di, io::uo) is det.
+:- pragma foreign_proc("C",
+ destroy_job_ctl(JobCtl::in, IO0::di, IO::uo),
+ [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+#ifdef MC_HAVE_JOBCTL_IPC
+ sem_destroy(&JobCtl->jc_sem);
+ munmap(JobCtl, MC_JOB_CTL_SIZE(JobCtl->jc_total_tasks));
+#endif
+ IO = IO0;
+").
-wait_for_child_thread_exit(ChildExit, !PF, !IO) :-
- channel.take(!.PF ^ tpf_channel, ChildExit0, !IO),
- (
- ( ChildExit0 = child_succeeded
- ; ChildExit0 = child_failed
- ),
- ChildExit = ChildExit0
- ;
- ChildExit0 = child_exception(Excp),
- ChildExit = child_failed,
- MaybeExcp0 = !.PF ^ tpf_maybe_excp,
- (
- MaybeExcp0 = no,
- !PF ^ tpf_maybe_excp := yes(Excp)
- ;
- MaybeExcp0 = yes(_)
- )
- ).
+:- pred accept_task(job_ctl::in, int::out, io::di, io::uo) is det.
+
+:- pragma foreign_proc("C",
+ accept_task(JobCtl::in, TaskNumber::out, IO0::di, IO::uo),
+ [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+ TaskNumber = -1;
+
+#ifdef MC_HAVE_JOBCTL_IPC
+ MC_lock_job_ctl(JobCtl);
+
+ if (!JobCtl->jc_abort) {
+ MR_Integer i;
+
+ for (i = 0; i < JobCtl->jc_total_tasks; i++) {
+ if (JobCtl->jc_tasks[i] == TASK_NEW) {
+ JobCtl->jc_tasks[i] = TASK_ACCEPTED;
+ TaskNumber = i;
+ break;
+ }
+ }
+ }
+
+ MC_unlock_job_ctl(JobCtl);
+#endif
+
+ IO = IO0;
+").
+
+:- pred mark_task_done(job_ctl::in, int::in, io::di, io::uo) is det.
+
+:- pragma foreign_proc("C",
+ mark_task_done(JobCtl::in, TaskNumber::in, IO0::di, IO::uo),
+ [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+#ifdef MC_HAVE_JOBCTL_IPC
+ MC_lock_job_ctl(JobCtl);
+ JobCtl->jc_tasks[TaskNumber] = TASK_DONE;
+ MC_unlock_job_ctl(JobCtl);
+#endif
+ IO = IO0;
+").
+
+:- pred mark_task_error(job_ctl::in, int::in, bool::in, io::di, io::uo) is det.
+
+:- pragma foreign_proc("C",
+ mark_task_error(JobCtl::in, TaskNumber::in, KeepGoing::in,
+ IO0::di, IO::uo),
+ [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io,
+ may_not_duplicate],
+"
+#ifdef MC_HAVE_JOBCTL_IPC
+ MC_lock_job_ctl(JobCtl);
+
+ JobCtl->jc_tasks[TaskNumber] = TASK_ERROR;
+ if (!KeepGoing) {
+ JobCtl->jc_abort = MR_TRUE;
+ }
+
+ MC_unlock_job_ctl(JobCtl);
+#endif
+ IO = IO0;
+").
+
+:- func make_yes_job_ctl(job_ctl) = maybe(job_ctl).
+:- pragma foreign_export("C", make_yes_job_ctl(in) = out,
+ "MC_make_yes_job_ctl").
+
+make_yes_job_ctl(JobCtl) = yes(JobCtl).
+
+:- func make_no_job_ctl = maybe(job_ctl).
+:- pragma foreign_export("C", make_no_job_ctl = out,
+ "MC_make_no_job_ctl").
+
+make_no_job_ctl = no.
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
diff --git a/compiler/process_util.m b/compiler/process_util.m
index 3209b80..5cd3858 100644
--- a/compiler/process_util.m
+++ b/compiler/process_util.m
@@ -95,6 +95,14 @@
:- pred start_in_forked_process(io_pred::in(io_pred), maybe(pid)::out,
io::di, io::uo) is det.
+ % wait_pid(Pid, ExitCode, !IO)
+ %
+ % Block until the child process with process id Pid has exited.
+ % Return the exit code of the child.
+ %
+:- pred wait_pid(pid::in, io.res(io.system_result)::out, io::di, io::uo)
+ is det.
+
% wait_any(Pid, ExitCode, !IO)
%
% Block until a child process has exited. Return the process ID
@@ -423,35 +431,32 @@ call_child_process_io_pred(P, Status, !IO) :-
#endif
while (1) {
- wait_status = wait(&child_status);
- if (Pid == -1 || wait_status == Pid) {
+ wait_status = waitpid(Pid, &child_status, 0);
+ if (wait_status != -1) {
WaitedPid = wait_status;
Status = child_status;
break;
- } else if (wait_status == -1) {
- if (MR_is_eintr(errno)) {
- if (MC_signalled) {
- /*
- ** A normally fatal signal has been received,
- ** so kill the child immediately.
- ** Use SIGTERM, not MC_signal_received,
- ** because the child may be inside a call
- ** to system() which would cause SIGINT
- ** to be ignored on some systems (e.g. Linux).
- */
- if (Pid != -1) {
- kill(Pid, SIGTERM);
- }
- break;
- }
- } else {
+ } else if (MR_is_eintr(errno)) {
+ if (MC_signalled) {
/*
- ** This should never happen.
+ ** A normally fatal signal has been received, so kill the
+ ** child immediately. Use SIGTERM, not MC_signal_received,
+ ** because the child may be inside a call to system() which
+ ** would cause SIGINT to be ignored on some systems (e.g.
+ ** Linux).
*/
- MR_perror(""error in wait(): "");
- Status = 1;
+ if (Pid != -1) {
+ kill(Pid, SIGTERM);
+ }
break;
}
+ } else {
+ /*
+ ** This should never happen.
+ */
+ MR_perror(""error in wait(): "");
+ Status = 1;
+ break;
}
}
@@ -475,6 +480,10 @@ call_child_process_io_pred(P, Status, !IO) :-
#endif /* ! MC_CAN_FORK */
").
+wait_pid(Pid, Status, !IO) :-
+ do_wait(Pid, _Pid, Status0, !IO),
+ Status = io.handle_system_command_exit_status(Status0).
+
wait_any(Pid, Status, !IO) :-
do_wait(-1, Pid, Status0, !IO),
Status = io.handle_system_command_exit_status(Status0).
diff --git a/configure.in b/configure.in
index 418dcef..b18d87e 100644
--- a/configure.in
+++ b/configure.in
@@ -1096,7 +1096,7 @@ MERCURY_CHECK_FOR_HEADERS( \
asm/sigcontext.h sys/param.h sys/time.h sys/times.h \
sys/types.h sys/stat.h fcntl.h termios.h sys/ioctl.h \
sys/stropts.h windows.h dirent.h getopt.h malloc.h \
- semaphore.h pthread.h time.h spawn.h fenv.h)
+ semaphore.h pthread.h time.h spawn.h fenv.h sys/mman.h)
if test "$MR_HAVE_GETOPT_H" = 1; then
GETOPT_H_AVAILABLE=yes
@@ -1111,7 +1111,7 @@ fi
#-----------------------------------------------------------------------------#
#
-# Check whether we can set the FP roudning mode
+# Check whether we can set the FP rounding mode
#
MERCURY_CHECK_FOR_FENV_FUNC([fesetround], [$MATH_LIB])
@@ -4379,6 +4379,15 @@ MERCURY_CHECK_FOR_IEEE_FUNC(isinff)
#-----------------------------------------------------------------------------#
#
+# Check whether POSIX semaphores require -lrt
+#
+
+AC_CHECK_LIB(rt, sem_init, SEMAPHORE_LIBRARY=-lrt, SEMAPHORE_LIBRARY="")
+
+AC_SUBST(SEMAPHORE_LIBRARY)
+
+#-----------------------------------------------------------------------------#
+#
# Check whether sockets work (we need them for the external debugger)
#
diff --git a/runtime/mercury_conf.h.in b/runtime/mercury_conf.h.in
index f987a07..987e84c 100644
--- a/runtime/mercury_conf.h.in
+++ b/runtime/mercury_conf.h.in
@@ -134,6 +134,7 @@
** MR_HAVE_TIME_H we have <time.h>
** MR_HAVE_SPAWN_H we have <spawn.h>
** MR_HAVE_FENV_H we have <fenv.h>
+** MR_HAVE_SYS_MMAN_H we have <sys/mman.h>
*/
#undef MR_HAVE_SYS_SIGINFO_H
#undef MR_HAVE_SYS_SIGNAL_H
@@ -161,6 +162,7 @@
#undef MR_HAVE_TIME_H
#undef MR_HAVE_SPAWN_H
#undef MR_HAVE_FENV_H
+#undef MR_HAVE_SYS_MMAN_H
/*
** MR_HAVE_POSIX_TIMES is defined if we have the POSIX
--------------------------------------------------------------------------
mercury-reviews mailing list
Post messages to: mercury-reviews at csse.unimelb.edu.au
Administrative Queries: owner-mercury-reviews at csse.unimelb.edu.au
Subscriptions: mercury-reviews-request at csse.unimelb.edu.au
--------------------------------------------------------------------------
More information about the reviews
mailing list