[m-rev.] for review: Add closeable channels to the standard library.
Peter Wang
novalazy at gmail.com
Mon Feb 17 14:37:18 AEDT 2020
library/thread.closeable_channel.m:
Add the new module. It omits two operations from thread.channel.m:
- untake, because it is deprecated
- duplicate, because I'm not sure what it's useful for in practice
library/MODULES_DOC:
library/library.m:
library/thread.m:
NEWS:
Mention the new module.
diff --git a/NEWS b/NEWS
index 412df8d12..41869d8a6 100644
--- a/NEWS
+++ b/NEWS
@@ -15,6 +15,13 @@ Changes to the Mercury standard library
kv_lists are not standard lists, and thus cannot be manipulated
using the functions and predicates of the `list` module.
+### New `thread.closeable_channel` module
+
+* This module implements closeable unbounded channels, similar to the unbounded
+ channels provided by the `thread.channel` module, but with the addition of a
+ close operation. Once a channel is closed, no more items can be added to it,
+ and reading from a closed channel will not block indefinitely.
+
### Changes to the `assoc_list` module
* The following predicates has been added:
diff --git a/library/MODULES_DOC b/library/MODULES_DOC
index 86ca69b2e..ef9ec250b 100644
--- a/library/MODULES_DOC
+++ b/library/MODULES_DOC
@@ -88,6 +88,7 @@ term_io.m
term_to_xml.m
thread.barrier.m
thread.channel.m
+thread.closeable_channel.m
thread.future.m
thread.m
thread.mvar.m
diff --git a/library/library.m b/library/library.m
index 23606f2dd..de0ca2922 100644
--- a/library/library.m
+++ b/library/library.m
@@ -354,6 +354,7 @@ mercury_std_library_module("time").
mercury_std_library_module("thread").
mercury_std_library_module("thread.barrier").
mercury_std_library_module("thread.channel").
+mercury_std_library_module("thread.closeable_channel").
mercury_std_library_module("thread.future").
mercury_std_library_module("thread.mvar").
mercury_std_library_module("thread.semaphore").
diff --git a/library/thread.closeable_channel.m b/library/thread.closeable_channel.m
new file mode 100644
index 000000000..32b906751
--- /dev/null
+++ b/library/thread.closeable_channel.m
@@ -0,0 +1,185 @@
+%---------------------------------------------------------------------------%
+% vim: ft=mercury ts=4 sw=4 et
+%---------------------------------------------------------------------------%
+% Copyright (C) 2020 The Mercury team.
+% This file is distributed under the terms specified in COPYING.LIB.
+%---------------------------------------------------------------------------%
+%
+% File: thread.closeable_channel.m.
+% Main author: wangp.
+% Stability: low.
+%
+% Unbounded closeable channels.
+%
+%---------------------------------------------------------------------------%
+%---------------------------------------------------------------------------%
+
+:- module thread.closeable_channel.
+:- interface.
+
+:- import_module bool.
+:- import_module io.
+
+%---------------------------------------------------------------------------%
+
+:- type closeable_channel(T).
+
+ % Initialise a channel.
+ %
+:- pred init(closeable_channel(T)::out, io::di, io::uo) is det.
+
+ % Put an item at the end of the channel.
+ % Returns `yes' if successful, or `no' if the channel is closed.
+ %
+:- pred put(closeable_channel(T)::in, T::in, bool::out, io::di, io::uo)
+ is det.
+
+ % Close a channel. Once a channel is closed, no more items can be added
+ % to it. Closing a channel that is already closed has no effect.
+ %
+:- pred close(closeable_channel(T)::in, io::di, io::uo) is det.
+
+ % Return the state of a channel at a point in time. If the return value
+ % is `yes' then the channel is closed, and will remain closed.
+ % If the return value is `no' then the channel was not yet closed,
+ % but by the time you inspect the value the channel could be closed by
+ % another thread.
+ %
+ % We (the Mercury developers) are unsure if this predicate will be useful
+ % in practice. If you do find a use for it, please let us know.
+ %
+:- pred is_closed(closeable_channel(T)::in, bool::out, io::di, io::uo) is det.
+
+:- type take_result(T)
+ ---> ok(T)
+ ; closed.
+
+ % Take an item from the start of the channel, blocking until an item is
+ % available or until the channel is closed. Returns `ok(Item)' if `Item'
+ % was taken, or `closed' if no item was taken because the channel is
+ % closed. Items put before the channel was closed can still be taken.
+ %
+:- pred take(closeable_channel(T)::in, take_result(T)::out, io::di, io::uo)
+ is det.
+
+:- type try_take_result(T)
+ ---> ok(T)
+ ; closed
+ ; empty.
+
+ % Take an item from the start of the channel, but do not block.
+ % Returns `ok(Item)' if `Item' was taken from the channel,
+ % `empty' if the channel is empty, or `closed' if no item was taken
+ % because the channel is closed.
+ %
+:- pred try_take(closeable_channel(T)::in, try_take_result(T)::out,
+ io::di, io::uo) is det.
+
+%---------------------------------------------------------------------------%
+%---------------------------------------------------------------------------%
+
+:- implementation.
+
+:- import_module maybe.
+:- import_module require.
+:- import_module thread.mvar.
+
+%---------------------------------------------------------------------------%
+
+ % A channel is a chain of mvar "holes". A filled hole holds either an
+ % item together with the next hole in the chain, or `closed'.
+ % The read end refers to the hole that the next take will read;
+ % the write end refers to the hole that the next put will try to fill.
+ %
+ % Invariants:
+ % - If the channel is open, the hole at the write end is empty.
+ % - If the channel is closed, the hole at the write end is full and
+ % contains `closed'.
+ %
+:- type closeable_channel(T)
+ ---> channel(
+ mvar(stream(T)), % Read end.
+ mvar(stream(T)) % Write end.
+ ).
+
+ % A stream is a hole: an mvar that is either empty or holds an item(T).
+:- type stream(T) == mvar(item(T)).
+
+:- type item(T)
+ ---> item(
+ T, % The current item.
+ stream(T) % The rest of the stream.
+ )
+ ; closed. % End of the stream.
+
+ % Create a new channel whose read end and write end both refer to the
+ % same, freshly created hole. Nothing is put into that hole here.
+ %
+init(channel(Read, Write), !IO) :-
+ mvar.init(Read, !IO),
+ mvar.init(Write, !IO),
+ mvar.init(Hole, !IO),
+ mvar.put(Read, Hole, !IO),
+ mvar.put(Write, Hole, !IO).
+
+ % Try to fill the hole at the write end with the new item, linked to a
+ % fresh hole that becomes the new write end on success.
+ % The write end is taken at the start and put back in both branches.
+ %
+put(channel(_Read, Write), Val, Success, !IO) :-
+ mvar.init(NewHole, !IO),
+ mvar.take(Write, OldHole, !IO),
+ % By the invariant on closeable_channel, OldHole is already full only
+ % when it contains `closed', so a failed try_put means "channel closed".
+ mvar.try_put(OldHole, item(Val, NewHole), Success, !IO),
+ (
+ Success = yes,
+ % The channel was open, and remains open.
+ mvar.put(Write, NewHole, !IO)
+ ;
+ Success = no,
+ % The channel was closed, and remains closed.
+ mvar.put(Write, OldHole, !IO)
+ ).
+
+ % Mark the channel closed by trying to fill the hole at the write end
+ % with `closed'. The result of try_put is deliberately ignored: if the
+ % hole is already full then, by the invariant, the channel is already
+ % closed and there is nothing to do. The write end keeps referring to
+ % the same hole afterwards.
+ %
+close(channel(_Read, Write), !IO) :-
+ mvar.take(Write, Hole, !IO),
+ mvar.try_put(Hole, closed, _Success, !IO),
+ mvar.put(Write, Hole, !IO).
+
+ % Inspect the hole at the write end without modifying it: a full hole
+ % containing `closed' means the channel is closed, an empty hole means
+ % it is open. The write end is taken for the duration of the check and
+ % put back unchanged at the end.
+ %
+is_closed(channel(_Read, Write), IsClosed, !IO) :-
+ mvar.take(Write, Hole, !IO),
+ mvar.try_read(Hole, HoleFull, !IO),
+ (
+ HoleFull = yes(ItemOrClosed),
+ (
+ ItemOrClosed = closed,
+ IsClosed = yes
+ ;
+ ItemOrClosed = item(_, _),
+ % This would violate the invariant that the hole at the write end
+ % is either empty or contains `closed'.
+ unexpected($pred, "open channel has full hole at write end")
+ )
+ ;
+ HoleFull = no,
+ IsClosed = no
+ ),
+ mvar.put(Write, Hole, !IO).
+
+ % Take the read end, then read the hole at the head of the stream.
+ % The head hole is read with mvar.read rather than taken, so its contents
+ % stay in place. NOTE(review): this relies on mvar.read waiting until the
+ % hole has been filled by put or close -- confirm against thread.mvar.
+ % The read end is not put back until the head hole has been read.
+ %
+take(channel(Read, _Write), Res, !IO) :-
+ mvar.take(Read, Head, !IO),
+ mvar.read(Head, ItemOrClosed, !IO),
+ (
+ ItemOrClosed = item(Val, NewHead),
+ Res = ok(Val)
+ ;
+ ItemOrClosed = closed,
+ Res = closed,
+ % Leave the read end at the `closed' hole so that every later take
+ % also sees `closed'.
+ NewHead = Head
+ ),
+ mvar.put(Read, NewHead, !IO).
+
+    % Non-blocking take.
+    %
+    % The read end is acquired with mvar.try_take rather than mvar.take.
+    % Another thread may be holding the read end, e.g. a thread that is
+    % blocked inside take/4 waiting for an item on an empty channel; with a
+    % blocking mvar.take here, try_take would then block as well, contrary
+    % to its documented contract. If the read end is not available we return
+    % `empty' immediately, so `empty' means "no item could be taken right now
+    % without blocking".
+    %
+    % If the read end is available, the head hole is inspected with
+    % mvar.try_read, which leaves its contents in place:
+    % - an item advances the read end to the rest of the stream;
+    % - `closed' or an empty hole leaves the read end where it was.
+    %
+try_take(channel(Read, _Write), Res, !IO) :-
+    mvar.try_take(Read, MaybeHead, !IO),
+    (
+        MaybeHead = yes(Head),
+        mvar.try_read(Head, TryRead, !IO),
+        (
+            TryRead = yes(ItemOrClosed),
+            (
+                ItemOrClosed = item(Val, NewHead),
+                Res = ok(Val)
+            ;
+                ItemOrClosed = closed,
+                Res = closed,
+                NewHead = Head
+            )
+        ;
+            TryRead = no,
+            Res = empty,
+            NewHead = Head
+        ),
+        % We hold the read end, so we must always put it back.
+        mvar.put(Read, NewHead, !IO)
+    ;
+        % Another thread holds the read end; do not wait for it.
+        MaybeHead = no,
+        Res = empty
+    ).
+
+%---------------------------------------------------------------------------%
+%---------------------------------------------------------------------------%
diff --git a/library/thread.m b/library/thread.m
index f1ec6157c..b03935fa5 100644
--- a/library/thread.m
+++ b/library/thread.m
@@ -28,6 +28,7 @@
:- include_module barrier.
:- include_module channel.
+:- include_module closeable_channel.
:- include_module future.
:- include_module mvar.
:- include_module semaphore.
--
2.25.0
More information about the reviews
mailing list