[m-rev.] for review: Add closeable channels to the standard library.

Peter Wang novalazy at gmail.com
Mon Feb 17 14:37:18 AEDT 2020


library/thread.closeable_channel.m:
    Add the new module. It omits two operations that thread.channel.m has:
    - untake, because it is deprecated
    - duplicate, because I'm not sure what it's useful for in practice

library/MODULES_DOC:
library/library.m:
library/thread.m:
NEWS:
    Mention the new module.

diff --git a/NEWS b/NEWS
index 412df8d12..41869d8a6 100644
--- a/NEWS
+++ b/NEWS
@@ -15,6 +15,13 @@ Changes to the Mercury standard library
   kv_lists are not standard lists, and thus cannot be manipulated
   using the functions and predicates of the `list` module.
 
+### New `thread.closeable_channel` module
+
+* This module implements closeable unbounded channels, similar to the unbounded
+  channels provided by the `thread.channel` module, but with the addition of a
+  close operation. Once a channel is closed, no more items can be added to it,
+  and reading from a closed channel will not block indefinitely.
+
 ### Changes to the `assoc_list` module
 
 * The following predicates has been added:
diff --git a/library/MODULES_DOC b/library/MODULES_DOC
index 86ca69b2e..ef9ec250b 100644
--- a/library/MODULES_DOC
+++ b/library/MODULES_DOC
@@ -88,6 +88,7 @@ term_io.m
 term_to_xml.m
 thread.barrier.m
 thread.channel.m
+thread.closeable_channel.m
 thread.future.m
 thread.m
 thread.mvar.m
diff --git a/library/library.m b/library/library.m
index 23606f2dd..de0ca2922 100644
--- a/library/library.m
+++ b/library/library.m
@@ -354,6 +354,7 @@ mercury_std_library_module("time").
 mercury_std_library_module("thread").
 mercury_std_library_module("thread.barrier").
 mercury_std_library_module("thread.channel").
+mercury_std_library_module("thread.closeable_channel").
 mercury_std_library_module("thread.future").
 mercury_std_library_module("thread.mvar").
 mercury_std_library_module("thread.semaphore").
diff --git a/library/thread.closeable_channel.m b/library/thread.closeable_channel.m
new file mode 100644
index 000000000..32b906751
--- /dev/null
+++ b/library/thread.closeable_channel.m
@@ -0,0 +1,185 @@
+%---------------------------------------------------------------------------%
+% vim: ft=mercury ts=4 sw=4 et
+%---------------------------------------------------------------------------%
+% Copyright (C) 2020 The Mercury team.
+% This file is distributed under the terms specified in COPYING.LIB.
+%---------------------------------------------------------------------------%
+%
+% File: thread.closeable_channel.m.
+% Main author: wangp.
+% Stability: low.
+%
+% Unbounded closeable channels.
+%
+%---------------------------------------------------------------------------%
+%---------------------------------------------------------------------------%
+
+:- module thread.closeable_channel.
+:- interface.
+
+:- import_module bool.
+:- import_module io.
+
+%---------------------------------------------------------------------------%
+
+:- type closeable_channel(T).
+
+    % Initialise a channel.
+    %
+:- pred init(closeable_channel(T)::out, io::di, io::uo) is det.
+
+    % Put an item at the end of the channel.
+    % Returns `yes' if successful, or `no' if the channel is closed.
+    %
+:- pred put(closeable_channel(T)::in, T::in, bool::out, io::di, io::uo)
+    is det.
+
+    % Close a channel. Once a channel is closed, no more items can be added
+    % to it. Closing a channel that is already closed has no effect.
+    %
+:- pred close(closeable_channel(T)::in, io::di, io::uo) is det.
+
+    % Return the state of a channel at a point in time. If the return value
+    % is `yes' then the channel is closed, and will remain closed.
+    % If the return value is `no' then the channel was not yet closed,
+    % but by the time you inspect the value the channel could be closed by
+    % another thread.
+    %
+    % We (the Mercury developers) are unsure if this predicate will be useful
+    % in practice. If you do find a use for it, please let us know.
+    %
+:- pred is_closed(closeable_channel(T)::in, bool::out, io::di, io::uo) is det.
+
+:- type take_result(T)
+    --->    ok(T)       % An item was taken.
+    ;       closed.     % The channel is closed and has no items left.
+
+    % Take an item from the start of the channel, blocking until an item is
+    % available or until the channel is closed. Returns `ok(Item)' if `Item'
+    % was taken, or `closed' if the channel is closed and has no items left.
+    %
+:- pred take(closeable_channel(T)::in, take_result(T)::out, io::di, io::uo)
+    is det.
+
+:- type try_take_result(T)
+    --->    ok(T)       % An item was taken.
+    ;       closed      % The channel is closed and has no items left.
+    ;       empty.      % No item is available right now.
+
+    % Take an item from the start of the channel, but do not block.
+    % Returns `ok(Item)' if `Item' was taken, `empty' if no item is available
+    % right now, or `closed' if the channel is closed and has no items left.
+    %
+:- pred try_take(closeable_channel(T)::in, try_take_result(T)::out,
+    io::di, io::uo) is det.
+
+%---------------------------------------------------------------------------%
+%---------------------------------------------------------------------------%
+
+:- implementation.
+
+:- import_module maybe.
+:- import_module require.
+:- import_module thread.mvar.
+
+%---------------------------------------------------------------------------%
+
+    % Invariants:
+    % - If the channel is open, the hole at the write end is empty.
+    % - If the channel is closed, the hole at the write end is full and
+    %   contains `closed'.
+    %
+:- type closeable_channel(T)
+    --->    channel(
+                mvar(stream(T)),    % Read end: holds the head of the stream.
+                mvar(stream(T))     % Write end: holds the hole at the tail.
+            ).
+
+:- type stream(T) == mvar(item(T)).     % An empty mvar is an unfilled hole.
+
+:- type item(T)
+    --->    item(
+                T,          % The current item.
+                stream(T)   % The rest of the stream.
+            )
+    ;       closed.         % End of the stream.
+
+init(channel(ReadEnd, WriteEnd), !IO) :-
+    mvar.init(EmptyHole, !IO),      % Shared by both ends to begin with.
+    mvar.init(ReadEnd, !IO),
+    mvar.put(ReadEnd, EmptyHole, !IO),
+    mvar.init(WriteEnd, !IO),
+    mvar.put(WriteEnd, EmptyHole, !IO).
+
+put(channel(_ReadEnd, WriteEnd), Val, Success, !IO) :-
+    % Allocate the next hole up front; it is simply dropped if the put fails.
+    mvar.init(FreshHole, !IO),
+    mvar.take(WriteEnd, CurHole, !IO),
+    mvar.try_put(CurHole, item(Val, FreshHole), Success, !IO),
+    (
+        Success = yes,
+        NextHole = FreshHole    % The channel was open and stays open.
+    ;
+        Success = no,
+        NextHole = CurHole      % The channel was closed and stays closed.
+    ),
+    mvar.put(WriteEnd, NextHole, !IO).
+
+close(channel(_ReadEnd, WriteEnd), !IO) :-
+    mvar.take(WriteEnd, LastHole, !IO),
+    mvar.try_put(LastHole, closed, _WasOpen, !IO),  % No-op if already closed.
+    mvar.put(WriteEnd, LastHole, !IO).
+
+is_closed(channel(_ReadEnd, WriteEnd), IsClosed, !IO) :-
+    mvar.take(WriteEnd, EndHole, !IO),
+    mvar.try_read(EndHole, MaybeContents, !IO),
+    (
+        MaybeContents = no,
+        IsClosed = no           % The write-end hole is still empty.
+    ;
+        MaybeContents = yes(Contents),
+        (
+            Contents = item(_, _),
+            unexpected($pred, "open channel has full hole at write end")
+        ;
+            Contents = closed,
+            IsClosed = yes
+        )
+    ),
+    mvar.put(WriteEnd, EndHole, !IO).
+
+take(channel(ReadEnd, _WriteEnd), Res, !IO) :-
+    mvar.take(ReadEnd, Front, !IO),
+    mvar.read(Front, Contents, !IO),    % Blocks until the hole is filled.
+    (
+        Contents = closed,
+        Res = closed,
+        Rest = Front                    % Leave `closed' for later readers.
+    ;
+        Contents = item(Val, Rest),
+        Res = ok(Val)
+    ),
+    mvar.put(ReadEnd, Rest, !IO).
+
+try_take(channel(Read, _Write), Res, !IO) :-
+    % Use mvar.try_take on the read end: a blocking mvar.take would wait
+    % for as long as another thread sits in take/4 on an empty channel.
+    mvar.try_take(Read, TryTakeRead, !IO),
+    (
+        TryTakeRead = yes(Head),
+        mvar.try_read(Head, TryReadHead, !IO),
+        ( if TryReadHead = yes(item(Val, Tail)) then
+            Res = ok(Val),
+            mvar.put(Read, Tail, !IO)
+        else
+            Res = ( if TryReadHead = yes(closed) then closed else empty ),
+            mvar.put(Read, Head, !IO)
+        )
+    ;
+        TryTakeRead = no,
+        % The read end is busy (e.g. a blocked take/4), so report `empty'.
+        Res = empty
+    ).
+
+%---------------------------------------------------------------------------%
+%---------------------------------------------------------------------------%
diff --git a/library/thread.m b/library/thread.m
index f1ec6157c..b03935fa5 100644
--- a/library/thread.m
+++ b/library/thread.m
@@ -28,6 +28,7 @@
 
 :- include_module barrier.
 :- include_module channel.
+:- include_module closeable_channel.
 :- include_module future.
 :- include_module mvar.
 :- include_module semaphore.
-- 
2.25.0



More information about the reviews mailing list