[m-rev.] diff: fix midi monitor concurrency example

Julien Fischer juliensf at csse.unimelb.edu.au
Tue Nov 9 01:57:09 AEDT 2010


Branches: main

Fix the midi monitor concurrency example in the extras distribution.
It wasn't compiling due to module name conflicts with the standard library.

Generalise the example so that it works for non-C backends that support
concurrency.

extras/concurrency/stream.m:
extras/concurrency/concurrent_stream.m:
 	Rename the module stream to concurrent_stream so that it
 	doesn't conflict with the standard library module named stream.

 	Use I/O mutvars instead of the global module from this directory.
 	This allows this program to be compiled in non-C grades that
 	support concurrency.

 	Require a compilation model that supports concurrency.

extras/concurrency/midi.m:
 	s/stream/concurrent_stream/

extras/concurrency/midimon.m:
 	Use the thread module from the standard library instead of the
 	old spawn module from this directory.

 	Return a non-zero exit status on an error.

 	Use state variables instead of DCG notation for threading the
 	I/O state through print_messages.

extras/concurrency/global.m:
 	Delete this module.

Julien.

Index: concurrent_stream.m
===================================================================
RCS file: concurrent_stream.m
diff -N concurrent_stream.m
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ concurrent_stream.m	8 Nov 2010 14:53:44 -0000
@@ -0,0 +1,133 @@
+%-----------------------------------------------------------------------------%
+% vim: ft=mercury ts=4 sw=4 et
+%-----------------------------------------------------------------------------%
+% Copyright (C) 2000, 2006 The University of Melbourne.
+% This file may only be copied under the terms of the GNU Library General
+% Public License - see the file COPYING.LIB in the Mercury distribution.
+%-----------------------------------------------------------------------------%
+%
+% File: concurrent_stream.m.
+% Main author: conway.
+% Stability: medium.
+%
+% This module implements a simple concurrent data-stream.
+%
+%-----------------------------------------------------------------------------%
+%-----------------------------------------------------------------------------%
+
+:- module concurrent_stream.
+:- interface.
+
+:- import_module io.
+
+%-----------------------------------------------------------------------------%
+
+:- type concurrent_stream(T).
+
+:- type concurrent_stream.result(T)
+    --->    end
+    ;       error(string)
+    ;       ok(T).
+
+    % new(Stream, !IO) creates a new concurrent data stream `Stream'.
+    %
+:- pred new(concurrent_stream(T)::out, io::di, io::uo) is det.
+
+    % get(Stream, Result, !IO) blocks until a message appears
+    % on the data stream `Stream'. When a message arrives, `Result' is
+    % bound to the value of the message.
+    %
+:- pred get(concurrent_stream(T)::in, concurrent_stream.result(T)::out,
+    io::di, io::uo) is det.
+
+    % put(Stream, Thing, !IO) adds `Thing' to the end of the stream
+    % `Stream', waking a call to get/4 if necessary.
+    %
+:- pred put(concurrent_stream(T)::in, T::in, io::di, io::uo) is det.
+
+    % end(Stream, !IO) puts an end-of-stream marker on the stream
+    % `Stream', waking a call to get/4 if necessary.
+    %
+:- pred end(concurrent_stream(T)::in, io::di, io::uo) is det.
+
+    % error(Stream, Msg, !IO) puts the error message `Msg' on the stream
+    % `Stream', waking a call to get/4 if necessary.
+    %
+:- pred error(concurrent_stream(T)::in, string::in, io::di, io::uo) is det.
+
+%-----------------------------------------------------------------------------%
+%-----------------------------------------------------------------------------%
+
+:- implementation.
+
+:- pragma require_feature_set([concurrency]).
+
+:- import_module thread.
+:- import_module thread.semaphore.
+
+:- import_module queue.
+:- import_module require.
+:- import_module store.
+
+%-----------------------------------------------------------------------------%
+
+:- type concurrent_stream(T)
+    --->    concurrent_stream(
+                semaphore,
+                io_mutvar(concurrent_stream0(T)),
+                semaphore
+            ).
+
+:- type concurrent_stream0(T) == queue(concurrent_stream.result(T)).
+
+new(Stream, !IO) :-
+    queue.init(Queue),
+    store.new_mutvar(Queue, QueueRef, !IO),
+    semaphore.new(Lock, !IO),
+    semaphore.signal(Lock, !IO),
+    semaphore.new(Semaphore, !IO),
+    Stream = concurrent_stream(Lock, QueueRef, Semaphore).
+
+put(Stream, Thing, !IO) :-
+    Stream = concurrent_stream(Lock, QueueRef, Semaphore),
+    wait(Lock, !IO),
+    store.get_mutvar(QueueRef, Queue0, !IO),
+    queue.put(Queue0, ok(Thing), Queue),
+    store.set_mutvar(QueueRef, Queue, !IO),
+    signal(Lock, !IO),
+    signal(Semaphore, !IO).
+
+end(Stream, !IO) :-
+    Stream = concurrent_stream(Lock, QueueRef, Semaphore),
+    semaphore.wait(Lock, !IO),
+    store.get_mutvar(QueueRef, Queue0, !IO),
+    queue.put(Queue0, end, Queue),
+    store.set_mutvar(QueueRef, Queue, !IO),
+    semaphore.signal(Lock, !IO),
+    semaphore.signal(Semaphore, !IO).
+
+error(Stream, Msg, !IO) :-
+    Stream = concurrent_stream(Lock, QueueRef, Semaphore),
+    semaphore.wait(Lock, !IO),
+    store.get_mutvar(QueueRef, Queue0, !IO),
+    queue.put(Queue0, error(Msg), Queue),
+    store.set_mutvar(QueueRef, Queue, !IO),
+    semaphore.signal(Lock, !IO),
+    semaphore.signal(Semaphore, !IO).
+
+get(Stream, Thing, !IO) :-
+    Stream = concurrent_stream(Lock, QueueRef, Semaphore),
+    semaphore.wait(Semaphore, !IO),
+    semaphore.wait(Lock, !IO),
+    store.get_mutvar(QueueRef, Queue0, !IO),
+    ( queue.get(Queue0, Thing0, Queue) ->
+        Thing = Thing0,
+        store.set_mutvar(QueueRef, Queue, !IO)
+    ;
+        error("concurrent_stream.get/4: queue and semaphore out of sync")
+    ),
+    semaphore.signal(Lock, !IO).
+
+%-----------------------------------------------------------------------------%
+:- end_module concurrent_stream.
+%-----------------------------------------------------------------------------%
Index: global.m
===================================================================
RCS file: global.m
diff -N global.m
--- global.m	20 Apr 2006 07:32:05 -0000	1.4
+++ /dev/null	1 Jan 1970 00:00:00 -0000
@@ -1,111 +0,0 @@
-%-----------------------------------------------------------------------------%
-% vim: ft=mercury ts=4 sw=4 et
-%-----------------------------------------------------------------------------%
-% Copyright (C) 2000, 2006 The University of Melbourne.
-% This file may only be copied under the terms of the GNU Library General
-% Public License - see the file COPYING.LIB in the Mercury distribution.
-%-----------------------------------------------------------------------------%
-%
-% File: global.m.
-% Main author: conway.
-% Stability: medium.
-%
-% This module provides a simple mechanism for storing values associated
-% with keys in the global io.state. It is quite like library/store.m,
-% except that it implicitly stores things in the io.state rather than in a
-% separate store.
-%
-%-----------------------------------------------------------------------------%
-%-----------------------------------------------------------------------------%
-
-:- module global.
-:- interface.
-
-:- import_module io.
-
-%-----------------------------------------------------------------------------%
-
-:- type global(T).
-
-    % new(Thing, Key, !IO) binds `Key' to an abstract key referring
-    % to the object `Thing'.
-    %
-:- pred global.new(T::in, global(T)::out, io::di, io::uo) is det.
-
-    % get(Key, Thing, !IO) binds `Thing' to the object currently
-    % associated with `Key'.
-    %
-:- pred global.get(global(T)::in, T::out, io::di, io::uo) is det.
-
-    % set(Key, Thing, !IO) changes the value associated with `Key'
-    % to be `Thing'.
-    %
-:- pred global.set(global(T)::in, T::in, io::di, io::uo) is det.
-
-%-----------------------------------------------------------------------------%
-%-----------------------------------------------------------------------------%
-
-:- implementation.
-
-:- pragma foreign_type(c,  global(T), "MR_Word").
-:- pragma foreign_type(il, global(T), "class [global__csharp_code]ME_Global").
-
-:- pragma foreign_decl("C#", "
-    public class ME_Global {
-        public object val;
-    }
-").
-
-%-----------------------------------------------------------------------------%
-
-:- pragma foreign_proc("C",
-    new(Thing::in, Glob::out, IO0::di, IO::uo),
-    [promise_pure, will_not_call_mercury],
-"
-    MR_Word tmp;
-    MR_incr_hp(tmp, 1);
-    *((MR_Word *)tmp) = Thing;
-    Glob = tmp;
-    IO = IO0;
-").
-
-:- pragma foreign_proc("C#",
-    new(Thing::in, Glob::out, _IO0::di, _IO::uo),
-    [promise_pure, will_not_call_mercury, thread_safe],
-"
-    Glob = new ME_Global();
-    Glob.val = Thing;
-").
-
-:- pragma foreign_proc("C",
-    get(Glob::in, Thing::out, IO0::di, IO::uo),
-    [promise_pure, will_not_call_mercury],
-"
-    Thing = * (MR_Word *) Glob;
-    IO = IO0;
-").
-
-:- pragma foreign_proc("C#",
-    get(Glob::in, Thing::out, _IO0::di, _IO::uo),
-    [promise_pure, will_not_call_mercury, thread_safe],
-"
-    Thing = Glob.val;
-").
-
-:- pragma foreign_proc("C",
-    set(Glob::in, Thing::in, IO0::di, IO::uo),
-    [promise_pure, will_not_call_mercury],
-"
-    * ((MR_Word *) Glob) = Thing;
-    IO = IO0;
-").
-
-:- pragma foreign_proc("C#",
-    set(Glob::in, Thing::in, _IO0::di, _IO::uo),
-    [promise_pure, will_not_call_mercury, thread_safe],
-"
-    Glob.val = Thing;
-").
-
-%-----------------------------------------------------------------------------%
-%-----------------------------------------------------------------------------%
Index: midi.m
===================================================================
RCS file: /home/mercury/mercury1/repository/mercury/extras/concurrency/midi.m,v
retrieving revision 1.2
diff -u -r1.2 midi.m
--- midi.m	20 Apr 2006 07:32:05 -0000	1.2
+++ midi.m	8 Nov 2010 14:31:05 -0000
@@ -20,7 +20,7 @@
  :- module midi.
  :- interface.

-:- import_module stream.
+:- import_module concurrent_stream.

  :- import_module io.
  :- import_module list.
@@ -80,14 +80,14 @@
      % Reads from a concurrent stream of bytes and puts its outputs
      % on to a concurrent stream of midi messages.
      %
-:- pred read_midi(stream(byte)::in, stream(message)::in, io::di, io::uo)
-    is det.
+:- pred read_midi(concurrent_stream(byte)::in, concurrent_stream(message)::in,
+    io::di, io::uo) is det.

      % Reads from a concurrent stream of messages, and puts the messages
      % on to a concurrent stream of bytes.
      %
-:- pred write_midi(stream(message)::in, stream(byte)::in, io::di, io::uo)
-    is det.
+:- pred write_midi(concurrent_stream(message)::in, concurrent_stream(byte)::in,
+    io::di, io::uo) is det.

  %-----------------------------------------------------------------------------%
  %-----------------------------------------------------------------------------%
@@ -160,8 +160,8 @@
  read_midi(Ins, Outs, !IO) :-
      byte0(none, Ins, Outs, !IO).

-:- pred byte0(status::in, stream(byte)::in, stream(message)::in,
-    io::di, io::uo) is det.
+:- pred byte0(status::in, concurrent_stream(byte)::in,
+    concurrent_stream(message)::in, io::di, io::uo) is det.

  byte0(Status, Ins, Outs, !IO) :-
      get(Ins, Res0, !IO),
@@ -177,8 +177,9 @@
          byte0a(MSN, LSN, Status, Ins, Outs, !IO)
      ).

-:- pred byte0a(hex::in, hex::in, status::in, stream(byte)::in,
-    stream(message)::in, io::di, io::uo) is det.
+:- pred byte0a(hex::in, hex::in, status::in,
+    concurrent_stream(byte)::in, concurrent_stream(message)::in,
+    io::di, io::uo) is det.

  byte0a(x0, LSN, Status, Ins, Outs, !IO) :-
      hex2byte(x0, LSN, Byte),
@@ -272,8 +273,8 @@
      put(Outs, rt(reset), !IO),
      byte0(Status, Ins, Outs, !IO).

-:- pred byte1(status::in, stream(byte)::in, stream(message)::in,
-    io::di, io::uo) is det.
+:- pred byte1(status::in, concurrent_stream(byte)::in,
+    concurrent_stream(message)::in, io::di, io::uo) is det.

  byte1(Status, Ins, Outs, !IO) :-
      get(Ins, Res0, !IO),
@@ -289,8 +290,9 @@
          byte1a(MSN, LSN, Status, Ins, Outs, !IO)
      ).

-:- pred byte1a(hex::in, hex::in, status::in, stream(byte)::in,
-    stream(message)::in, io::di, io::uo) is det.
+:- pred byte1a(hex::in, hex::in, status::in,
+    concurrent_stream(byte)::in, concurrent_stream(message)::in,
+    io::di, io::uo) is det.

  byte1a(x0, LSN, Status, Ins, Outs, !IO) :-
      hex2byte(x0, LSN, Byte),
@@ -370,8 +372,8 @@
      put(Outs, rt(reset), !IO),
      byte1(Status, Ins, Outs, !IO).

-:- pred byte1b(status::in, byte::in, stream(byte)::in, stream(message)::in,
-    io::di, io::uo) is det.
+:- pred byte1b(status::in, byte::in, concurrent_stream(byte)::in,
+    concurrent_stream(message)::in, io::di, io::uo) is det.

  byte1b(none, _Byte, Ins, Outs, !IO) :-
      byte0(none, Ins, Outs, !IO).
@@ -388,8 +390,8 @@
  byte1b(status(two(Kind), Chan), Byte1, Ins, Outs, !IO) :-
      byte2(status(two(Kind), Chan), Byte1, Ins, Outs, !IO).

-:- pred byte2(status::in, byte::in, stream(byte)::in, stream(message)::in,
-    io::di, io::uo) is det.
+:- pred byte2(status::in, byte::in, concurrent_stream(byte)::in,
+    concurrent_stream(message)::in, io::di, io::uo) is det.

  byte2(Status, Byte1, Ins, Outs, !IO) :-
      get(Ins, Res0, !IO),
@@ -405,8 +407,9 @@
          byte2a(MSN2, LSN2, Byte1, Status, Ins, Outs, !IO)
      ).

-:- pred byte2a(hex::in, hex::in, byte::in, status::in, stream(byte)::in,
-    stream(message)::in, io::di, io::uo) is det.
+:- pred byte2a(hex::in, hex::in, byte::in, status::in,
+    concurrent_stream(byte)::in, concurrent_stream(message)::in,
+    io::di, io::uo) is det.

  byte2a(x0, LSN, Byte1, Status, Ins, Outs, !IO) :-
      hex2byte(x0, LSN, Byte2),
@@ -486,8 +489,9 @@
      put(Outs, rt(reset), !IO),
      byte2(Status, Byte1, Ins, Outs, !IO).

-:- pred byte2b(status::in, byte::in, byte::in, stream(byte)::in,
-    stream(message)::in, io::di, io::uo) is det.
+:- pred byte2b(status::in, byte::in, byte::in,
+    concurrent_stream(byte)::in, concurrent_stream(message)::in,
+    io::di, io::uo) is det.

  byte2b(none, _Byte1, _Byte2, Ins, Outs, !IO) :-
      byte0(none, Ins, Outs, !IO).
@@ -539,14 +543,14 @@
      put(Outs, Msg, !IO),
      byte0(status(two(Kind), Chan), Ins, Outs, !IO).

-:- pred sysex0(status::in, stream(byte)::in, stream(message)::in,
-    io::di, io::uo) is det.
+:- pred sysex0(status::in, concurrent_stream(byte)::in,
+    concurrent_stream(message)::in, io::di, io::uo) is det.

  sysex0(Status, Ins, Outs, !IO) :-
      sysex1([], Status, Ins, Outs, !IO).

-:- pred sysex1(list(byte)::in, status::in, stream(byte)::in,
-    stream(message)::in, io::di, io::uo) is det.
+:- pred sysex1(list(byte)::in, status::in, concurrent_stream(byte)::in,
+    concurrent_stream(message)::in, io::di, io::uo) is det.

  sysex1(Bytes0, Status, Ins, Outs, !IO) :-
      get(Ins, Res0, !IO),
@@ -572,8 +576,8 @@
          )
      ).

-:- pred pos0(status::in, stream(byte)::in, stream(message)::in,
-    io::di, io::uo) is det.
+:- pred pos0(status::in, concurrent_stream(byte)::in,
+    concurrent_stream(message)::in, io::di, io::uo) is det.

  pos0(Status, Ins, Outs, !IO) :-
      get(Ins, Res0, !IO),
@@ -588,8 +592,8 @@
          pos1(Byte, Status, Ins, Outs, !IO)
      ).

-:- pred pos1(byte::in, status::in, stream(byte)::in, stream(message)::in,
-    io::di, io::uo) is det.
+:- pred pos1(byte::in, status::in, concurrent_stream(byte)::in,
+    concurrent_stream(message)::in, io::di, io::uo) is det.

  pos1(Byte1, Status, Ins, Outs, !IO) :-
      get(Ins, Res0, !IO),
@@ -606,8 +610,8 @@
          byte0(Status, Ins, Outs, !IO)
      ).

-:- pred sel0(status::in, stream(byte)::in, stream(message)::in,
-    io::di, io::uo) is det.
+:- pred sel0(status::in, concurrent_stream(byte)::in,
+    concurrent_stream(message)::in, io::di, io::uo) is det.

  sel0(Status, Ins, Outs, !IO) :-
      get(Ins, Res0, !IO),
@@ -629,8 +633,8 @@
  write_midi(Ins, Outs, !IO) :-
      write_midi(none, Ins, Outs, !IO).

-:- pred write_midi(status::in, stream(message)::in, stream(byte)::in,
-    io::di, io::uo) is det.
+:- pred write_midi(status::in, concurrent_stream(message)::in,
+    concurrent_stream(byte)::in, io::di, io::uo) is det.

  write_midi(Status, Ins, Outs, !IO) :-
      get(Ins, Res0, !IO),
@@ -645,8 +649,8 @@
          write_midi(Msg, Status, Ins, Outs, !IO)
      ).

-:- pred write_midi(message::in, status::in, stream(message)::in,
-    stream(byte)::in, io::di, io::uo) is det.
+:- pred write_midi(message::in, status::in, concurrent_stream(message)::in,
+    concurrent_stream(byte)::in, io::di, io::uo) is det.

  write_midi(off(Chan, Note, Vel), Status0, Ins, Outs, !IO) :-
      Status1 = status(two(off), Chan),
@@ -741,8 +745,9 @@
      put(Outs, 0xFF, !IO),
      write_midi(Status, Ins, Outs, !IO).

-:- pred write_one(status::in, status::in, byte::in, stream(message)::in,
-    stream(byte)::in, io::di, io::uo) is det.
+:- pred write_one(status::in, status::in, byte::in,
+    concurrent_stream(message)::in, concurrent_stream(byte)::in,
+    io::di, io::uo) is det.

  write_one(Status0, Status1, Byte1, Ins, Outs, !IO) :-
      ( Status0 = Status1 ->
@@ -763,7 +768,8 @@
      write_midi(Status, Ins, Outs, !IO).

  :- pred write_two(status::in, status::in, byte::in, byte::in,
-    stream(message)::in, stream(byte)::in, io::di, io::uo) is det.
+    concurrent_stream(message)::in,
+    concurrent_stream(byte)::in, io::di, io::uo) is det.

  write_two(Status0, Status1, Byte1, Byte2, Ins, Outs, !IO) :-
      ( Status0 = Status1 ->
@@ -849,4 +855,5 @@
  nibble2hex(0xF, xF).

  %-----------------------------------------------------------------------------%
+:- end_module midi.
  %-----------------------------------------------------------------------------%
Index: midimon.m
===================================================================
RCS file: /home/mercury/mercury1/repository/mercury/extras/concurrency/midimon.m,v
retrieving revision 1.3
diff -u -r1.3 midimon.m
--- midimon.m	20 Apr 2006 07:32:05 -0000	1.3
+++ midimon.m	8 Nov 2010 14:54:15 -0000
@@ -14,10 +14,10 @@

  :- implementation.

-:- import_module global.
+:- pragma require_feature_set([concurrency]).
+
+:- import_module concurrent_stream.
  :- import_module midi.
-:- import_module spawn.
-:- import_module stream.

  :- import_module bool.
  :- import_module char.
@@ -27,6 +27,7 @@
  :- import_module maybe.
  :- import_module require.
  :- import_module string.
+:- import_module thread.

  %----------------------------------------------------------------------------%

@@ -63,7 +64,8 @@
      ;
          MOpts = error(Msg),
          io.stderr_stream(StdErr, !IO),
-        io.format(StdErr, "%s\n", [s(Msg)], !IO)
+        io.format(StdErr, "%s\n", [s(Msg)], !IO),
+        io.set_exit_status(1, !IO)
      ).

  :- pred open_input(maybe(string)::in, bool::out, io::di, io::uo) is det.
@@ -99,7 +101,8 @@
          )
      ).

-:- pred read_input(stream(byte)::in, io::di, io::uo) is det.
+:- pred read_input(concurrent_stream(byte)::in,
+    io::di, io::uo) is det.

  read_input(Stream, !IO) :-
      io.read_byte(Res0, !IO),
@@ -116,19 +119,22 @@
          read_input(Stream, !IO)
      ).

-:- pred print_messages(stream(message)::in, io::di, io::uo) is det.
+:- pred print_messages(concurrent_stream(message)::in,
+    io::di, io::uo) is det.

-print_messages(Stream) -->
-    get(Stream, Res0),
+print_messages(Stream, !IO) :-
+    get(Stream, Res0, !IO),
      (
-        { Res0 = ok(Msg) },
-        write(Msg), write_string(".\n"),
-        print_messages(Stream)
-    ;
-        { Res0 = end }
-    ;
-        { Res0 = error(Msg) },
-        write_string(Msg), nl
+        Res0 = ok(Msg),
+        io.write(Msg, !IO),
+        io.write_string(".\n", !IO),
+        print_messages(Stream, !IO)
+    ;
+        Res0 = end 
+    ;
+        Res0 = error(Msg),
+        io.write_string(Msg, !IO),
+        io.nl(!IO)
      ).

  %-----------------------------------------------------------------------------%
@@ -168,4 +174,5 @@
      ], !IO).

  %-----------------------------------------------------------------------------%
+:- end_module midimon.
  %-----------------------------------------------------------------------------%
Index: stream.m
===================================================================
RCS file: stream.m
diff -N stream.m
--- stream.m	20 Apr 2006 07:32:06 -0000	1.2
+++ /dev/null	1 Jan 1970 00:00:00 -0000
@@ -1,129 +0,0 @@
-%-----------------------------------------------------------------------------%
-% vim: ft=mercury ts=4 sw=4 et
-%-----------------------------------------------------------------------------%
-% Copyright (C) 2000, 2006 The University of Melbourne.
-% This file may only be copied under the terms of the GNU Library General
-% Public License - see the file COPYING.LIB in the Mercury distribution.
-%-----------------------------------------------------------------------------%
-%
-% File: stream.m.
-% Main author: conway.
-% Stability: medium.
-%
-% This module implements a simple concurrent data-stream.
-%
-%-----------------------------------------------------------------------------%
-%-----------------------------------------------------------------------------%
-
-:- module stream.
-:- interface.
-
-:- import_module io.
-
-%-----------------------------------------------------------------------------%
-
-:- type stream(T).
-
-:- type stream.result(T)
-    --->    end
-    ;       error(string)
-    ;       ok(T).
-
-    % new(Stream, !IO) creates a new data stream `Stream'.
-    %
-:- pred new(stream(T)::out, io::di, io::uo) is det.
-
-    % get(Stream, Result, !IO) blocks until a message appears
-    % on the data stream `Stream'. When a message arrives, `Result' is
-    % bound to the value of the message.
-    %
-:- pred get(stream(T)::in, stream.result(T)::out, io::di, io::uo) is det.
-
-    % put(Stream, Thing, !IO) adds `Thing' to the end of the stream
-    % `Stream', waking a call to get/4 if necessary.
-    %
-:- pred put(stream(T)::in, T::in, io::di, io::uo) is det.
-
-    % end(Stream, !IO) puts an end-of-stream marker on the stream
-    % `Stream', waking a call to get/4 if necessary.
-    %
-:- pred end(stream(T)::in, io::di, io::uo) is det.
-
-    % error(Stream, !IO) puts an error message on the stream
-    % `Stream', waking a call to get/4 if necessary.
-    %
-:- pred error(stream(T)::in, string::in, io::di, io::uo) is det.
-
-%-----------------------------------------------------------------------------%
-%-----------------------------------------------------------------------------%
-
-:- implementation.
-
-:- import_module global.
-:- import_module semaphore.
-
-:- import_module queue.
-:- import_module require.
-
-%-----------------------------------------------------------------------------%
-
-:- type stream(T)
-    --->    stream(
-                semaphore,
-                global(stream0(T)),
-                semaphore
-            ).
-
-:- type stream0(T) == queue(stream.result(T)).
-
-new(Stream, !IO) :-
-    queue.init(Queue),
-    new(Queue, QueueGlob, !IO),
-    new(Lock, !IO),
-    signal(Lock, !IO),
-    new(Semaphore, !IO),
-    Stream = stream(Lock, QueueGlob, Semaphore).
-
-put(Stream, Thing, !IO) :-
-    Stream = stream(Lock, QueueGlob, Semaphore),
-    wait(Lock, !IO),
-    get(QueueGlob, Queue0, !IO),
-    queue.put(Queue0, ok(Thing), Queue),
-    set(QueueGlob, Queue, !IO),
-    signal(Lock, !IO),
-    signal(Semaphore, !IO).
-
-end(Stream, !IO) :-
-    Stream = stream(Lock, QueueGlob, Semaphore),
-    wait(Lock, !IO),
-    get(QueueGlob, Queue0, !IO),
-    queue.put(Queue0, end, Queue),
-    set(QueueGlob, Queue, !IO),
-    signal(Lock, !IO),
-    signal(Semaphore, !IO).
-
-error(Stream, Msg, !IO) :-
-    Stream = stream(Lock, QueueGlob, Semaphore),
-    wait(Lock, !IO),
-    get(QueueGlob, Queue0, !IO),
-    queue.put(Queue0, error(Msg), Queue),
-    set(QueueGlob, Queue, !IO),
-    signal(Lock, !IO),
-    signal(Semaphore, !IO).
-
-get(Stream, Thing, !IO) :-
-    Stream = stream(Lock, QueueGlob, Semaphore),
-    wait(Semaphore, !IO),
-    wait(Lock, !IO),
-    get(QueueGlob, Queue0, !IO),
-    ( queue.get(Queue0, Thing0, Queue) ->
-        Thing = Thing0,
-        set(QueueGlob, Queue, !IO)
-    ;
-        error("stream.get/4: queue and semaphore out of sync")
-    ),
-    signal(Lock, !IO).
-
-%-----------------------------------------------------------------------------%
-%-----------------------------------------------------------------------------%
-
--------------------------------------------------------------------------
mercury-reviews mailing list
Post messages to:       mercury-reviews at csse.unimelb.edu.au
Administrative Queries: owner-mercury-reviews at csse.unimelb.edu.au
Subscriptions:          mercury-reviews-request at csse.unimelb.edu.au
--------------------------------------------------------------------------



More information about the reviews mailing list