[m-rev.] for review: Add future datatype for concurrent and parallel programming

Peter Wang novalazy at gmail.com
Tue Oct 7 14:36:45 AEDT 2014

On Mon,  6 Oct 2014 16:30:39 +1100, Paul Bone <paul at bone.id.au> wrote:
> Branches: master.
> For review by anyone.
> ---
> Add future datatype for concurrent and parallel programming
> library/library.m:
> library/thread.future.m:
> library/thread.m:
>     Add new future standard library module.
>     Announce the new addition.
> library/thread.semaphore.m:
>     Add an impure interface to thread.semaphore.m.  Semaphores are used to
>     implement our other concurrency primatives and an impure interface can
>     often be useful to implement things such as futures, which don't require
>     IO state threading.
> benchmarks/progs/mandelbrot/mandelbrot.m:
> benchmarks/progs/mandelbrot/bench.sh:
>     Add future example to mandelbrot benchmark.

> diff --git a/library/thread.future.m b/library/thread.future.m
> new file mode 100644
> index 0000000..45d05dd
> --- /dev/null
> +++ b/library/thread.future.m
> @@ -0,0 +1,238 @@
> +%-----------------------------------------------------------------------------%
> +% vim: ft=mercury ts=4 sw=4 et
> +%-----------------------------------------------------------------------------%
> +% Copyright (C) 2014 The Mercury Team.
> +% This file may only be copied under the terms of the GNU Library General
> +% Public License - see the file COPYING.LIB in the Mercury distribution.
> +%-----------------------------------------------------------------------------%
> +%
> +% File: thread.future.m.
> +% Authors: pbone.
> +% Stability: low.
> +%
> +% This module defines a future datatype for parallel and concurrent
> +% programming.
> +%
> +% A future represents a value that may or may-not exist yet.  There are two

may not

> +% styles of futures.  A future can be set exactly once, but can be read a
> +% number of times, this allows the implementor to use a more efficient
> +% algorithm than for mvars.

Two sentences.

> +%
> +% There are two ways to use futures.  The first is to create a future,
> +% and supply it's value as separate steps.  This is the most flexible way
> +% but requires use of the IO state:


> +%
> +%  First:
> +%    future(Fut, !IO),
> +%
> +%  Then in a seperate thread:
> +%    signal(Fut, Value0, !IO),


> +%
> +%  Finally, in the original thread:
> +%    wait(Fut, Value, !IO),
> +%
> +% This is flexible because a thread can do more than provide a single future
> +% value, it can provide many future values or use any other concurrency
> +% feature such as mvars or channels, or do any IO operation.

Two sentences.

> +%
> +% The alternative is to create the future and supply a function which when
> +% evaluated will produce the value.  This is pure (and similar to a lazy
> +% value) and therefore does not require the IO state.  The spawning of the
> +% thread is done on behalf of the caller.
> +%
> +%  Just do:
> +%    Future = future(SomeFunction),
> +%    ... do something in the meantime ...
> +%    Value = wait(Future).
> +%
> +%-----------------------------------------------------------------------------%
> +%-----------------------------------------------------------------------------%
> +
> +:- module thread.future.
> +:- interface.
> +
> +    % future/1 represents a value that is evaluated in a separate thread
> +    % (using spawn/3).
> +    %
> +:- type future(T).
> +

I think the description should be more generic and not mention threads.

    future/1 represents a value that may not have been computed yet.

> +%-----------------------------------------------------------------------------%
> +
> +    % Create a new empty future.
> +    %
> +:- pred init(future(T)::uo, io::di, io::uo) is det.
> +
> +    % Provide a value for the future and signal any waiting threads.  Any
> +    % further calls to wait will return immediatly.
> +    %
> +:- pred signal(future(T)::in, T::in, io::di, io::uo) is det.


What happens if it is signalled multiple times?  Probably throw an exception.

> +
> +    % Return the future's value, potentially blocking until the future is
> +    % signaled.
> +    %
> +:- pred wait(future(T)::in, T::out, io::di, io::uo) is det.
> +
> +%-----------------------------------------------------------------------------%
> +
> +    % Create a future which has the value that the closure, when evaluated,
> +    % will produce.
> +    %
> +:- func future((func) = T) = future(T).

The implicit spawn should at least be mentioned.

What happens if the function throws an exception?  I guess the user is
responsible for catching exceptions and returning them.
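For example, a caller could wrap the closure before handing it to future/1.
Below is a minimal sketch that uses only the standard library's
exception.try/2.  catching/1 and report/3 are hypothetical helpers, not
part of the patch.  The example applies the closures directly instead of
going through a future, so it does not depend on the module under review.

```mercury
:- module catching_sketch.
:- interface.
:- import_module io.
:- pred main(io::di, io::uo) is det.

:- implementation.
:- import_module exception.
:- import_module int.

    % Hypothetical helper: catching(Func) returns a closure that evaluates
    % Func.  The closure returns succeeded(Value) if Func returns normally,
    % or exception(Univ) if Func throws.  Its type,
    % (func) = exception_result(T), is the shape that future/1 takes.
    %
:- func catching((func) = T) = ((func) = exception_result(T)).

catching(Func) = CatchingFunc :-
    CatchingFunc =
        ( (func) = Result :-
            % try/2 is cc_multi because which exception gets caught may be
            % nondeterministic; we promise that we do not care which one.
            promise_equivalent_solutions [Result] (
                try((pred(V::out) is det :- V = apply(Func)), Result)
            )
        ).

main(!IO) :-
    report(apply(catching((func) = 100 // 4)), !IO),
    % Integer division by zero throws, so this result should be exception(_).
    report(apply(catching((func) = 100 // 0)), !IO).

:- pred report(exception_result(int)::in, io::di, io::uo) is det.

report(succeeded(N), !IO) :-
    io.write_int(N, !IO),
    io.nl(!IO).
report(failed, !IO) :-
    io.write_string("failed\n", !IO).
report(exception(_), !IO) :-
    io.write_string("caught an exception\n", !IO).
```

A future built as future(catching(SomeFunction)) would then hand any
exception back to whoever calls wait/1, as an ordinary value.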

It might be worth differentiating the two types of futures by a type

> +    % Return the value of the future, potentially blocking until the value
> +    % is available.
> +    %
> +    % This is pure and does not require IO because if it terminates it
> +    % always returns the same value.
> +    %
> +:- func wait(future(T)) = T.
> +
> +%-----------------------------------------------------------------------------%
> +%-----------------------------------------------------------------------------%
> +:- implementation.
> +
> +:- import_module thread.semaphore.
> +:- import_module mutvar.
> +
> +:- type future(T)
> +    --->    future(
> +                f_ready         :: mutvar(ready),
> +                    % f_ready can be used to optimistically avoid locking.

> +
> +                f_wait          :: semaphore,
> +                f_value         :: mutvar(T)
> +            ).
> +
> +:- type ready
> +    --->    ready
> +    ;       not_ready.
> +
> +:- pragma promise_pure(init/3).
> +init(Future, !IO) :-
> +    impure init(Future).

Add a blank line after each pragma.

> +
> +:- impure pred init(future(T)::uo) is det.
> +
> +init(future(Ready, Wait, Value)) :-
> +    impure new_mutvar(not_ready, Ready),
> +    impure init(Wait),
> +    impure new_mutvar0(Value).
> +
> +%-----------------------------------------------------------------------------%
> +
> +:- pragma promise_pure(signal/4).
> +signal(Future, Value, !IO) :-
> +    impure signal(Future, Value).
> +
> +:- impure pred signal(future(T)::in, T::in) is det.
> +
> +signal(future(Ready, Wait, MValue), Value) :-
> +    impure set_mutvar(MValue, Value),
> +    impure set_mutvar(Ready, ready),
> +    % TODO: Implement signal_all.
> +    impure signal(Wait).
> +
> +%-----------------------------------------------------------------------------%
> +
> +wait(Future, Value, !IO) :-
> +    wait(Future, Value).
> +
> +    % Wait is pure because it always returns the same value for the same
> +    % future (if it terminates).
> +    %
> +:- pred wait(future(T)::in, T::out) is det.
> +:- pragma promise_pure(wait/2).
> +
> +wait(Future, Value) :-
> +    Future = future(MReady, Wait, MValue),
> +    impure get_mutvar(MReady, Ready),
> +    (
> +        Ready = ready
> +        % No wait necessary
> +    ;
> +        Ready = not_ready,
> +        % We need to wait, this will probably block.
> +        impure wait(Wait),
> +        % Signal the semaphore to release the next waiting thread.
> +        impure signal(Wait)
> +    ),
> +    impure get_mutvar(MValue, Value).

Is it not possible that you see Ready = ready but the old (uninitialised)
value of MValue?  I expect futures will usually be read exactly once, so
the optimisation seems unnecessary.
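One way to drop the f_ready shortcut is to route every read through a
synchronised take/put.  Below is a minimal sketch.  It assumes the standard
library's thread.mvar in place of the patch's semaphore and mutvar, swapped
in only so that the example is self-contained.  read_cell/4 and producer/3
are hypothetical names.

```mercury
:- module mvar_cell_sketch.
:- interface.
:- import_module io.
:- pred main(io::di, io::uo) is cc_multi.

:- implementation.
:- import_module int.
:- import_module thread.
:- import_module thread.mvar.

    % Read the cell's value, blocking until some thread has put it there,
    % then put it back so that later readers see the same value.
    %
:- pred read_cell(mvar(T)::in, T::out, io::di, io::uo) is det.

read_cell(MVar, Value, !IO) :-
    take(MVar, Value, !IO),
    put(MVar, Value, !IO).

    % Runs in the spawned thread and supplies the value exactly once.
    % spawn/3 needs a cc_multi closure, hence the looser declaration.
    %
:- pragma no_determinism_warning(producer/3).
:- pred producer(mvar(int)::in, io::di, io::uo) is cc_multi.

producer(MVar, !IO) :-
    put(MVar, 21, !IO).

main(!IO) :-
    init(MVar, !IO),
    spawn(producer(MVar), !IO),
    % Both reads block until the producer has put the value.
    read_cell(MVar, A, !IO),
    read_cell(MVar, B, !IO),
    io.write_int(A + B, !IO),
    io.nl(!IO).
```

Each reader blocks in take/4 until the producer has put the value, so no
reader can see the cell before it is written.  The cost is a take and a put
on every read, which matters little if most futures are read once.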

> +%-----------------------------------------------------------------------------%
> +%-----------------------------------------------------------------------------%
> +
> +:- pragma promise_pure(future/1).
> +future(Func) = Future :-
> +    impure init(Future),
> +    impure spawn_impure(run_future(Future, Func)).
> +
> +:- pragma no_determinism_warning(run_future/4).
> +:- pred run_future(future(T), (func) = T, io, io).
> +:- mode run_future(in, ((func) = out) is det, di, uo) is cc_multi.
> +
> +run_future(Future, Func, !IO) :-
> +    signal(Future, apply(Func), !IO).
> +
> +wait(Future) = Value :-
> +    wait(Future, Value).
> +
> +%-----------------------------------------------------------------------------%
> +
> +:- impure pred spawn_impure(pred(io, io)).
> +:-        mode spawn_impure(pred(di, uo) is cc_multi) is det.
> +
> +spawn_impure(Task) :-
> +    some [!IO] (
> +        impure make_io(!:IO),
> +        promise_equivalent_solutions [!:IO] (
> +            spawn(Task, !IO)
> +        ),
> +        impure destroy_io(!.IO)
> +    ).

Make run_future impure, and keep the made-up IO state contained to this
section of code, something like:

    spawn_impure(Task) :-
        impure make_io(IO0),
        spawn(spawn_impure_2(Task), IO0, IO),
        impure destroy_io(IO).

    spawn_impure_2(Task, !IO) :-
        impure Task.

> +
> +:- impure pred make_io(io::uo) is det.
> +
> +:- pragma foreign_proc("C",
> +    make_io(IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "/* IO */").
> +:- pragma foreign_proc("C#",
> +    make_io(IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "// IO").
> +:- pragma foreign_proc("Java",
> +    make_io(IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "// IO").
> +
> +:- impure pred destroy_io(io::di) is det.
> +
> +:- pragma foreign_proc("C",
> +    destroy_io(IO::di),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "/* IO */").
> +:- pragma foreign_proc("C#",
> +    destroy_io(IO::di),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "// IO").
> +:- pragma foreign_proc("Java",
> +    destroy_io(IO::di),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "// IO").

You can copy the code from exception.m; we might as well use consistent
names.  _IO is preferred in any case.

> +
> +:- impure pred touch_io(io::di, io::uo) is det.
> +
> +:- pragma foreign_proc("C",
> +    touch_io(_IO0::di, _IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "").
> +:- pragma foreign_proc("C#",
> +    touch_io(_IO0::di, _IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "").
> +:- pragma foreign_proc("Java",
> +    touch_io(_IO0::di, _IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "").
> +

Delete touch_io.

> diff --git a/library/thread.semaphore.m b/library/thread.semaphore.m
> index 46c53e2..f886121 100644
> --- a/library/thread.semaphore.m
> +++ b/library/thread.semaphore.m
> @@ -28,33 +28,41 @@
>  :- type semaphore.
> -    % init(Sem, !IO) creates a new semaphore `Sem' with its counter
> -    % initialized to 0.
> +    % init(Sem, Count, !IO) creates a new semaphore `Sem' with its counter
> +    % initialized to Count.
>      %
> -:- pred semaphore.init(semaphore::out, io::di, io::uo) is det.
> +:- pred init(semaphore::uo, int::in, io::di, io::uo) is det.
> +:- impure pred init(semaphore::uo, int::in) is det.

Swap the arguments?

So, we would have lazy(T), mvar(T), future(T) with three different
interfaces, and mutvars acting as unsynchronised mvars with another

