[m-rev.] for review: Add future datatype for concurrent and parallel programming

Peter Wang novalazy at gmail.com
Tue Oct 7 14:36:45 AEDT 2014


On Mon,  6 Oct 2014 16:30:39 +1100, Paul Bone <paul at bone.id.au> wrote:
> Branches: master.
> 
> For review by anyone.
> 
> ---
> 
> Add future datatype for concurrent and parallel programming
> 
> library/library.m:
> library/thread.future.m:
> library/thread.m:
>     Add new future standard library module.
> 
> NEWS:
>     Announce the new addition.
> 
> library/thread.semaphore.m:
>     Add an impure interface to thread.semaphore.m.  Semaphores are used to
>     implement our other concurrency primitives and an impure interface can
>     often be useful to implement things such as futures, which don't require
>     IO state threading.
> 
> benchmarks/progs/mandelbrot/mandelbrot.m:
> benchmarks/progs/mandelbrot/bench.sh:
>     Add future example to mandelbrot benchmark.

> diff --git a/library/thread.future.m b/library/thread.future.m
> new file mode 100644
> index 0000000..45d05dd
> --- /dev/null
> +++ b/library/thread.future.m
> @@ -0,0 +1,238 @@
> +%-----------------------------------------------------------------------------%
> +% vim: ft=mercury ts=4 sw=4 et
> +%-----------------------------------------------------------------------------%
> +% Copyright (C) 2014 The Mercury Team.
> +% This file may only be copied under the terms of the GNU Library General
> +% Public License - see the file COPYING.LIB in the Mercury distribution.
> +%-----------------------------------------------------------------------------%
> +%
> +% File: thread.future.m.
> +% Authors: pbone.
> +% Stability: low.
> +%
> +% This module defines a future datatype for parallel and concurrent
> +% programming.
> +%
> +% A future represents a value that may or may-not exist yet.  There are two

may not

> +% styles of futures.  A future can be set exactly once, but can be read a
> +% number of times, this allows the implementor to use a more efficient
> +% algorithm than for mvars.

Two sentences.

> +%
> +% There are two ways to use futures.  The first is to create a future,
> +% and supply it's value as separate steps.  This is the most flexible way
> +% but requires use of the IO state:

its

> +%
> +%  First:
> +%    future(Fut, !IO),
> +%
> +%  Then in a seperate thread:
> +%    signal(Fut, Value0, !IO),

separate

> +%
> +%  Finally, in the original thread:
> +%    wait(Fut, Value, !IO),
> +%
> +% This is flexible because a thread can do more than provide a single future
> +% value, it can provide many future values or use any other concurrency
> +% feature such as mvars or channels, or do any IO operation.

Two sentences.

> +%
> +% The alternative is to create the future and supply a function which when
> +% evaluated will produce the value.  This is pure (and similar to a lazy
> +% value) and therefore does not require the IO state.  The spawning of the
> +% thread is done on behalf of the caller.
> +%
> +%  Just do:
> +%    Future = future(SomeFunction),
> +%    ... do something in the meantime ...
> +%    Value = wait(Future).
> +%
> +%-----------------------------------------------------------------------------%
> +%-----------------------------------------------------------------------------%
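
For what it's worth, the first (IO) style maps closely onto the usual
condition-variable pattern.  A rough C/pthreads sketch, not part of the
patch (future_int, future_init, future_signal, future_wait and producer
are hypothetical names).  Broadcasting wakes every waiter at once, which
is what a signal_all would do, and a second signal is rejected rather
than silently overwriting the value:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Write-once, read-many future holding an int.  Both fields are only
 * touched with the mutex held, so a reader that sees ready == true is
 * also guaranteed to see the value stored before it. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    bool            ready;
    int             value;
} future_int;

/* Roughly init/3: create an empty future. */
void future_init(future_int *f)
{
    int rc = pthread_mutex_init(&f->lock, NULL);
    assert(rc == 0);
    rc = pthread_cond_init(&f->cond, NULL);
    assert(rc == 0);
    f->ready = false;
    f->value = 0;
}

/* Roughly signal/4: store the value and wake every waiting thread.
 * Returns false, leaving the future unchanged, if it was already
 * signalled (a Mercury version might throw an exception instead). */
bool future_signal(future_int *f, int value)
{
    bool first;

    pthread_mutex_lock(&f->lock);
    first = !f->ready;
    if (first) {
        f->value = value;
        f->ready = true;
        pthread_cond_broadcast(&f->cond);   /* wake all, not just one */
    }
    pthread_mutex_unlock(&f->lock);
    return first;
}

/* Roughly wait/4: block until signalled; later calls return at once. */
int future_wait(future_int *f)
{
    int value;

    pthread_mutex_lock(&f->lock);
    while (!f->ready) {               /* re-check: wakeups may be spurious */
        pthread_cond_wait(&f->cond, &f->lock);
    }
    value = f->value;
    pthread_mutex_unlock(&f->lock);
    return value;
}

/* The "separate thread" from the module comment: it just signals 42. */
void *producer(void *arg)
{
    future_signal((future_int *)arg, 42);
    return NULL;
}
```

A consumer calls future_init, starts producer with pthread_create, and
can then call future_wait as often as it likes.
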
> +
> +:- module thread.future.
> +:- interface.
> +
> +    % future/1 represents a value that is evaluated in a separate thread
> +    % (using spawn/3).
> +    %
> +:- type future(T).
> +

I think the description should be more generic and not mention threads.
Perhaps

    future/1 represents a value that may not have been computed yet.

> +%-----------------------------------------------------------------------------%
> +
> +    % Create a new empty future.
> +    %
> +:- pred init(future(T)::uo, io::di, io::uo) is det.
> +
> +    % Provide a value for the future and signal any waiting threads.  Any
> +    % further calls to wait will return immediatly.
> +    %
> +:- pred signal(future(T)::in, T::in, io::di, io::uo) is det.

immediately

What happens if it is signalled multiple times?  It should probably
throw an exception.

> +
> +    % Return the future's value, potentially blocking until the future is
> +    % signaled.
> +    %
> +:- pred wait(future(T)::in, T::out, io::di, io::uo) is det.
> +
> +%-----------------------------------------------------------------------------%
> +
> +    % Create a future which has the value that the closure, when evaluated,
> +    % will produce.
> +    %
> +:- func future((func) = T) = future(T).

The implicit spawn should at least be mentioned.

What happens if the function throws an exception?  I guess the user is
responsible for catching exceptions and returning them.

It might be worth differentiating the two types of futures by a type
parameter.
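
The implicit spawn amounts to: the constructor starts a thread that
evaluates the closure and then signals the future.  A rough C/pthreads
analogue, with hypothetical names; a function pointer plus an argument
stands in for the closure, and since C has no exceptions it does not
answer the exception question:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    bool            ready;
    long            value;
    long          (*func)(long);   /* the "closure" to evaluate */
    long            arg;
    pthread_t       thread;        /* the implicitly spawned thread */
} spawned_future;

/* Body of the spawned thread: evaluate, then publish and wake readers. */
static void *run_future(void *p)
{
    spawned_future *f = p;
    long v = f->func(f->arg);      /* evaluate outside the lock */

    pthread_mutex_lock(&f->lock);
    f->value = v;
    f->ready = true;
    pthread_cond_broadcast(&f->cond);
    pthread_mutex_unlock(&f->lock);
    return NULL;
}

/* Analogue of future(Func): allocate, then spawn the evaluating thread.
 * Returns NULL if allocation or thread creation fails. */
spawned_future *future_spawn(long (*func)(long), long arg)
{
    spawned_future *f = malloc(sizeof *f);

    if (f == NULL)
        return NULL;
    pthread_mutex_init(&f->lock, NULL);
    pthread_cond_init(&f->cond, NULL);
    f->ready = false;
    f->value = 0;
    f->func = func;
    f->arg = arg;
    if (pthread_create(&f->thread, NULL, run_future, f) != 0) {
        pthread_cond_destroy(&f->cond);
        pthread_mutex_destroy(&f->lock);
        free(f);
        return NULL;
    }
    return f;
}

/* Analogue of wait/1: same answer every time, once it terminates. */
long future_get(spawned_future *f)
{
    long v;

    pthread_mutex_lock(&f->lock);
    while (!f->ready)
        pthread_cond_wait(&f->cond, &f->lock);
    v = f->value;
    pthread_mutex_unlock(&f->lock);
    return v;
}

/* Join the worker and release resources; call once, after all readers. */
void future_free(spawned_future *f)
{
    pthread_join(f->thread, NULL);
    pthread_cond_destroy(&f->cond);
    pthread_mutex_destroy(&f->lock);
    free(f);
}

/* Example closure body: naive Fibonacci, slow enough to overlap work. */
long fib(long n)
{
    return n < 2 ? n : fib(n - 1) + fib(n - 2);
}
```

The caller never sees a thread handle, which is why it is easy to forget
that a thread was started at all.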

> +    % Return the value of the future, potentially blocking until the value
> +    % is available.
> +    %
> +    % This is pure and does not require IO because if it terminates it
> +    % always returns the same value.
> +    %
> +:- func wait(future(T)) = T.
> +
> +%-----------------------------------------------------------------------------%
> +%-----------------------------------------------------------------------------%
> +:- implementation.
> +
> +:- import_module thread.semaphore.
> +:- import_module mutvar.
> +
> +:- type future(T)
> +    --->    future(
> +                f_ready         :: mutvar(ready),
> +                    % f_ready can be used to optimistically avoid locking.

> +
> +                f_wait          :: semaphore,
> +                f_value         :: mutvar(T)
> +            ).
> +
> +:- type ready
> +    --->    ready
> +    ;       not_ready.
> +
> +:- pragma promise_pure(init/3).
> +init(Future, !IO) :-
> +    impure init(Future).

Add newlines after the pragmas.

> +
> +:- impure pred init(future(T)::uo) is det.
> +
> +init(future(Ready, Wait, Value)) :-
> +    impure new_mutvar(not_ready, Ready),
> +    impure init(Wait),
> +    impure new_mutvar0(Value).
> +
> +%-----------------------------------------------------------------------------%
> +
> +:- pragma promise_pure(signal/4).
> +signal(Future, Value, !IO) :-
> +    impure signal(Future, Value).
> +
> +:- impure pred signal(future(T)::in, T::in) is det.
> +
> +signal(future(Ready, Wait, MValue), Value) :-
> +    impure set_mutvar(MValue, Value),
> +    impure set_mutvar(Ready, ready),
> +    % TODO: Implement signal_all.
> +    impure signal(Wait).
> +
> +%-----------------------------------------------------------------------------%
> +
> +wait(Future, Value, !IO) :-
> +    wait(Future, Value).
> +
> +    % Wait is pure because it always returns the same value for the same
> +    % future (if it terminates).
> +    %
> +:- pred wait(future(T)::in, T::out) is det.
> +:- pragma promise_pure(wait/2).
> +
> +wait(Future, Value) :-
> +    Future = future(MReady, Wait, MValue),
> +    impure get_mutvar(MReady, Ready),
> +    (
> +        Ready = ready
> +        % No wait necessary
> +    ;
> +        Ready = not_ready,
> +        % We need to wait, this will probably block.
> +        impure wait(Wait),
> +        % Signal the semaphore to release the next waiting thread.
> +        impure signal(Wait)
> +    ),
> +    impure get_mutvar(MValue, Value).

Is it not possible that you see Ready = ready but the old (uninitialised)
value of MValue?  I expect futures will usually be read exactly once, so
the optimisation seems unnecessary.
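
For reference, the usual fix in C11 terms is to publish the value with a
release store to the flag and test the flag with an acquire load;
whether mutvars give any such ordering is exactly the question.  A rough
sketch of the same algorithm with that change (hypothetical names; POSIX
unnamed semaphores, which macOS does not support; signal is assumed to
be called exactly once):

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    atomic_bool ready;   /* optimistic flag, like f_ready */
    sem_t       sem;     /* starts at 0, like f_wait */
    int         value;   /* like f_value */
} sem_future;

/* Returns 0 on success, -1 (with errno set) if sem_init fails. */
int sem_future_init(sem_future *f)
{
    atomic_init(&f->ready, false);
    f->value = 0;
    return sem_init(&f->sem, 0, 0);   /* 0: shared between threads only */
}

/* Must be called exactly once. */
void sem_future_signal(sem_future *f, int value)
{
    f->value = value;
    /* Release store: the write to value above happens-before any
     * acquire load that reads ready == true. */
    atomic_store_explicit(&f->ready, true, memory_order_release);
    sem_post(&f->sem);
}

int sem_future_wait(sem_future *f)
{
    if (!atomic_load_explicit(&f->ready, memory_order_acquire)) {
        int rc;

        /* Slow path: block, then post again so the next waiter is
         * released too (the baton-passing used in place of signal_all). */
        do {
            rc = sem_wait(&f->sem);
        } while (rc != 0 && errno == EINTR);
        assert(rc == 0);
        sem_post(&f->sem);
    }
    /* Fast path: ordered by acquire/release.  Slow path: POSIX lists
     * sem_post and sem_wait among the functions that synchronize memory. */
    return f->value;
}
```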

> +%-----------------------------------------------------------------------------%
> +%-----------------------------------------------------------------------------%
> +
> +:- pragma promise_pure(future/1).
> +future(Func) = Future :-
> +    impure init(Future),
> +    impure spawn_impure(run_future(Future, Func)).
> +
> +:- pragma no_determinism_warning(run_future/4).
> +:- pred run_future(future(T), (func) = T, io, io).
> +:- mode run_future(in, ((func) = out) is det, di, uo) is cc_multi.
> +
> +run_future(Future, Func, !IO) :-
> +    signal(Future, apply(Func), !IO).
> +
> +wait(Future) = Value :-
> +    wait(Future, Value).
> +
> +%-----------------------------------------------------------------------------%
> +
> +:- impure pred spawn_impure(pred(io, io)).
> +:-        mode spawn_impure(pred(di, uo) is cc_multi) is det.
> +
> +spawn_impure(Task) :-
> +    some [!IO] (
> +        impure make_io(!:IO),
> +        promise_equivalent_solutions [!:IO] (
> +            spawn(Task, !IO)
> +        ),
> +        impure destroy_io(!.IO)
> +    ).

Make run_future impure, and keep the made-up IO state confined to this
section of code, something like:

    spawn_impure(Task) :-
	impure make_io(IO0),
	spawn(spawn_impure_2(Task), IO0, IO),
	impure destroy_io(IO).

    spawn_impure_2(Task, !IO) :-
	impure Task.

> +
> +:- impure pred make_io(io::uo) is det.
> +
> +:- pragma foreign_proc("C",
> +    make_io(IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "/* IO */").
> +:- pragma foreign_proc("C#",
> +    make_io(IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "// IO").
> +:- pragma foreign_proc("Java",
> +    make_io(IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "// IO").
> +
> +:- impure pred destroy_io(io::di) is det.
> +
> +:- pragma foreign_proc("C",
> +    destroy_io(IO::di),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "/* IO */").
> +:- pragma foreign_proc("C#",
> +    destroy_io(IO::di),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "// IO").
> +:- pragma foreign_proc("Java",
> +    destroy_io(IO::di),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "// IO").

You can copy the code from exception.m; we might as well use consistent
names.  _IO is preferred in any case.

> +
> +:- impure pred touch_io(io::di, io::uo) is det.
> +
> +:- pragma foreign_proc("C",
> +    touch_io(_IO0::di, _IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "").
> +:- pragma foreign_proc("C#",
> +    touch_io(_IO0::di, _IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "").
> +:- pragma foreign_proc("Java",
> +    touch_io(_IO0::di, _IO::uo),
> +    [will_not_call_mercury, thread_safe, tabled_for_io],
> +    "").
> +

Delete touch_io.

> diff --git a/library/thread.semaphore.m b/library/thread.semaphore.m
> index 46c53e2..f886121 100644
> --- a/library/thread.semaphore.m
> +++ b/library/thread.semaphore.m
> @@ -28,33 +28,41 @@
>  
>  :- type semaphore.
>  
> -    % init(Sem, !IO) creates a new semaphore `Sem' with its counter
> -    % initialized to 0.
> +    % init(Sem, Count, !IO) creates a new semaphore `Sem' with its counter
> +    % initialized to Count.
>      %
> -:- pred semaphore.init(semaphore::out, io::di, io::uo) is det.
> +:- pred init(semaphore::uo, int::in, io::di, io::uo) is det.
> +:- impure pred init(semaphore::uo, int::in) is det.

Swap the arguments?
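
For comparison, what the counter's initial value means, as a minimal
counting semaphore in C (hypothetical names, mutex plus condition
variable).  Starting at 0 gives the blocking behaviour the future needs;
starting at 1 makes it behave like a mutex.  POSIX sem_init(sem, pshared,
value) also puts the semaphore first and the count last.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int             count;
} csem;

/* Like the proposed init(Sem, Count): the counter starts at count. */
void csem_init(csem *s, int count)
{
    assert(count >= 0);
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->cond, NULL);
    s->count = count;
}

/* Block until the counter is positive, then decrement it. */
void csem_wait(csem *s)
{
    pthread_mutex_lock(&s->lock);
    while (s->count == 0)
        pthread_cond_wait(&s->cond, &s->lock);
    s->count--;
    pthread_mutex_unlock(&s->lock);
}

/* Non-blocking variant: returns false instead of blocking. */
bool csem_trywait(csem *s)
{
    bool ok;

    pthread_mutex_lock(&s->lock);
    ok = s->count > 0;
    if (ok)
        s->count--;
    pthread_mutex_unlock(&s->lock);
    return ok;
}

/* Increment the counter and wake one blocked waiter, if any. */
void csem_signal(csem *s)
{
    pthread_mutex_lock(&s->lock);
    s->count++;
    pthread_cond_signal(&s->cond);
    pthread_mutex_unlock(&s->lock);
}
```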


So, we would have lazy(T), mvar(T), future(T) with three different
interfaces, and mutvars acting as unsynchronised mvars with another
interface.

Peter


