[m-dev.] for discussion: stream library v2

Fergus Henderson fjh at cs.mu.OZ.AU
Mon Sep 25 10:49:03 AEDT 2000


On 24-Sep-2000, Peter Ross <petdr at miscrit.be> wrote:
> Please find attached my second attempt at a stream library.  This time
> I have tried to define a typeclass for an impure lowlevel interface
> for streams, and then defined a pure interface on top of that.

I think it's a good idea to build streams in two layers, with a
low-level interface for defining streams and a high-level interface
for using streams built on top of that, but it would be nicer if the
low-level interface didn't need to be impure.

> PS. I also attached the socket implementation which uses this new
> interface.

It would also be a good idea to have an implementation of ordinary file
I/O using this new interface.

> Any comments?
> And should I place this stuff into the std library?

Not yet, IMHO -- see the comments below.

> :- module stream.
> 
> :- interface.
> 
> :- import_module lowlevel.
> :- import_module char, list.
> 
> :- type stream(S).

Some comments about what the stream(S) type represents
would be helpful.

> :- type stream_error ---> stream_error(string).
> 
> :- type stream__result(T)
> 	--->	ok(T)
> 	;	eof
> 	;	error(string)
> 	.
> 
> :- type stream__result
> 	--->	ok
> 	;	eof
> 	;	error(string)
> 	.
> 
> :- pred stream__init(S::in, stream(S)::uo) is det.
> 
> :- pred stream__read_char(S::in, stream__result(char)::out,
> 		stream(S)::di, stream(S)::uo) is det <= lowlevel__input(S).
> 
> :- pred stream__putback_char(S::in, char::in,
> 		stream(S)::di, stream(S)::uo) is det <= lowlevel__input(S).
> 
> :- pred stream__write_char(S::in, char::in,
> 		stream(S)::di, stream(S)::uo) is det <= lowlevel__output(S).

These types and predicates should also be documented.

> :- type stream(S) ---> stream(list(char)).

You should document what the list(char) represents,
and why the `S' parameter is not used.

> :- pragma promise_pure(stream__read_char/4).
> stream__read_char(Stream, Result, stream(PutbackChars), StreamOut) :-
> 	(
> 		PutbackChars = [],
> 		NewPutbackChars = PutbackChars,
> 		( impure lowlevel__read_char(Stream, Chr) ->
> 			Result = ok(Chr)
> 		;
> 			( semipure lowlevel__eof(Stream) ->
> 				Result = eof
> 			;
> 				semipure Err = lowlevel__error_message(Stream),
> 				Result = error(Err)
> 			)
> 		)
> 	;
> 		PutbackChars = [Chr | NewPutbackChars],
> 		Result = ok(Chr)
> 	),
> 	unsafe_promise_unique(stream(NewPutbackChars), StreamOut).

Here you're assuming that if the stream is at eof, then the
previous operation succeeded.  This assumption is not documented
in the lowlevel interface, and I don't think it is likely to hold in
general.  Making this assumption may cause some errors to be ignored.
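
One way to avoid relying on that assumption is to test the error
indicator before the eof indicator.  A minimal sketch in C, using stdio
as a stand-in for the lowlevel interface (read_status and read_one_char
are hypothetical names, not part of the proposed library):

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical result type, mirroring ok / eof / error in the stream module. */
typedef enum { READ_OK, READ_EOF, READ_ERROR } read_status;

/*
 * Read one character.  On failure, test the error indicator first, so
 * that a failed read is never reported as a plain eof just because the
 * stream also happens to be at eof.
 */
static read_status read_one_char(FILE *fp, char *out)
{
    int c;

    assert(fp != NULL && out != NULL);
    c = fgetc(fp);
    if (c != EOF) {
        *out = (char) c;
        return READ_OK;
    }
    if (ferror(fp)) {
        return READ_ERROR;
    }
    return READ_EOF;
}
```

The same ordering would apply in stream__read_char: ask the lowlevel
interface whether an error occurred before asking whether it is at eof.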

Also, why do you need `unsafe_promise_unique' here?

> stream__putback_char(_Stream, Chr, stream(PutbackChars), StreamOut) :-
> 	unsafe_promise_unique(stream([Chr | PutbackChars]), StreamOut).

Rather than calling unsafe_promise_unique here, it would be nicer to call
copy/2 on Chr. 

Currently copy/2 is not as efficient as it could be, so for efficiency
you might want to define

	:- func copy_char(char::in) = (char::uo) is det.
	copy_char(C0) = C :-
		unsafe_promise_unique(C0, C).

and use that instead.  But this is still an improvement, because calling
unsafe_promise_unique on a char is nicer than calling it on a stream,
since it's easier for a reader to understand why it is safe.

> :- pragma promise_pure(stream__write_char/4).
> stream__write_char(Stream, Chr, StreamIn, StreamOut) :-
> 		% XXX A better design choice may be to throw the
> 		% stream_error exception from inside
> 		% lowlevel__write_char
> 	( impure lowlevel__write_char(Stream, Chr) ->
> 		true
> 	;
> 		semipure Err = lowlevel__error_message(Stream),
> 		throw(stream_error(Err))
> 	),
> 	StreamOut = StreamIn.

I agree with the XXX comment here -- it would be better to throw
the exception from inside lowlevel__write_char.

> %---------------------------------------------------------------------------%
> % Copyright (C) 2000 The University of Melbourne.
> % This file may only be copied under the terms of the GNU Library General
> % Public License - see the file COPYING.LIB
> %-----------------------------------------------------------------------------%
> %
> % Module:	tcp
> % Main Author:	peter.ross at miscrit.be (based on code written by pma at miscrit.be)
> % Stability:	low
> %
> % An implementation of TCP streams.
> %
> %-----------------------------------------------------------------------------%
> %-----------------------------------------------------------------------------%
> 
> :- module tcp.
> :- interface.
> :- import_module lowlevel.
> 
> :- type tcp.
> :- type bound_tcp.
> 
> :- type host == string.		% A hostname 	ie "localhost"
> :- type service == string.	% A service 	ie "www"
> :- type protocol == string.	% A protocol 	ie "tcp"
> :- type port == int.		% A portnumber	ie 80 - the webserver
> 
> :- pred tcp__connect(host::in, port::in, tcp::out) is det.
> 
> 	% You bind to a port to start listening at that port.
> :- pred tcp__bind(host::in, port::in, bound_tcp::out) is det.
> :- pred tcp__accept(bound_tcp::in, tcp::out) is det.

Those predicates are all impure, and so should be declared as such.

> :- pred tcp__shutdown(tcp::in) is det.
> 
> :- func tcp__service_port(service) = port.

Likewise.

> %-----------------------------------------------------------------------------%
> 
> tcp__connect(Host, Port, Connection) :-
> 	(
> 		sockets__init,
> 		Socket = sockets__socket(2, 1, 0),
> 		SocketAddr = sockets__port_address(Host, Port),
> 		sockets__connect(Socket, SocketAddr, 16)

What are the magic numbers?

> 	->
> 		Connection = tcp(Socket)
> 	;
> 		error(string__format(
> 				"Failed to connect to %s:%d because %s.\n",
> 				[s(Host), i(Port), s(sockets__error_message)]))
> 	).

error/1 should be used for program bugs, not for I/O errors.

The error message would probably be clearer if you said
"host %s port %d: %s" rather than "%s:%d because %s".
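
On the magic numbers in tcp__connect: presumably 2, 1 and 16 are
AF_INET, SOCK_STREAM and the size of a struct sockaddr_in, and the 10
in tcp__bind is the listen() backlog.  That is a guess from the usual
values on common platforms, not something the posted code states.  A
sketch of naming them on the C side (the TCP_* names and
tcp_address_length are hypothetical):

```c
#include <assert.h>
#include <stddef.h>
#include <netinet/in.h>
#include <sys/socket.h>

/* Hypothetical names; the values come from the system headers. */
enum {
    TCP_DOMAIN   = AF_INET,      /* presumably the literal 2 */
    TCP_TYPE     = SOCK_STREAM,  /* presumably the literal 1 */
    TCP_PROTOCOL = 0,            /* let the system choose the protocol */
    TCP_BACKLOG  = 10            /* queue length passed to listen() */
};

/* Presumably the literal 16 passed as the address length. */
static size_t tcp_address_length(void)
{
    size_t len = sizeof(struct sockaddr_in);

    /* sockaddr_storage is large enough to hold any socket address. */
    assert(len <= sizeof(struct sockaddr_storage));
    return len;
}
```

Taking the values from the headers also avoids depending on a
particular platform's numbering.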

> %-----------------------------------------------------------------------------%
> 
> tcp__bind(Host, Port, Connection) :-
> 	(
> 		sockets__init,
> 		Socket = sockets__socket(2, 1, 0),
> 		SocketAddr = sockets__port_address(Host, Port),
> 		sockets__bind(Socket, SocketAddr, 16),
> 		sockets__listen(Socket, 10)
> 	->
> 		Connection = bound_tcp(Socket, SocketAddr)
> 	;
> 		error(string__format(
> 			"Failed to connect to %s on port %d because %s.\n",
> 			[s(Host), i(Port), s(sockets__error_message)]))
> 	).

Likewise.

> %-----------------------------------------------------------------------------%
> 
> tcp__accept(bound_tcp(SocketNo, SocketAddr), Connection) :-
> 	(
> 		NewSocket = sockets__accept(SocketNo, SocketAddr)
> 	->
> 		Connection = tcp(NewSocket)
> 	;
> 		error(string__format("tcp__accept failed because %s.\n",
> 				[s(sockets__error_message)]))
> 	).

Likewise.

> 	% XXX this is not thread safe!
> :- func error_message(tcp) = string.
> error_message(tcp(_Socket)) = sockets__error_message.
> 
> :- pred eof(tcp::in) is semidet.
> eof(tcp(_Socket)) :- semidet_fail.
> 
> %-----------------------------------------------------------------------------%
> %-----------------------------------------------------------------------------%

> :- module sockets.
> :- interface.
> 
> :- type sockets__res(Type)
> 	--->	ok(Type)
> 	;	error(string).
> 
> 	% This predicate needs to be called at least once to initialse a
> 	% socket under Win32.
> :- pred sockets__init is semidet.
> 
> :- func sockets__gethostbyname(string) = string.
> :- func sockets__getservbyname(string, string) = int.
> 
> :- func sockets__socket(int, int, int) = int is semidet.
> :- func sockets__port_address(string, int) = c_pointer is semidet.
> :- func sockets__service_address(string, string) = c_pointer is semidet.
> :- pred sockets__connect(int::in, c_pointer::in, int::in) is semidet.
> :- pred sockets__bind(int::in, c_pointer::in, int::in) is semidet.
> :- pred sockets__listen(int::in, int::in) is semidet.
> :- func sockets__accept(int, c_pointer) = int is semidet.
> :- pred sockets__close(int::in) is det.
> 
> 	% Why did the socket operation fail?
> :- func sockets__error_message = string.

These procedures are all impure.

> :- pragma c_code("
> 	/* Save the errno into this variable if a function fails */
> static int socket_errno;
> ").

Using a static variable in pragma c_code can be bad news if
intermodule optimization is enabled.  Better to make it extern
(and declare it using `pragma c_header_code' as well as
defining it using `pragma c_code').

But using static variables at all here is not good design.
The design should be modified so that you don't need to save
errno into a static/extern variable.

> :- pragma c_code(sockets__init,
> 		 [will_not_call_mercury, not_thread_safe], "{
> #ifdef MR_WIN32
> 	static int initialiased = FALSE;

s/initialiased/initialised/
s/int/bool/

> 	% XXX thread safe?
> :- pragma c_code(gethostbyname(Name::in) = (Host::out),
> 		 [will_not_call_mercury], "{
> 	struct hostent	*host;
> 	host = gethostbyname(Name);
> 	Host = (MR_String) host->h_name;
> }").

No, that is not thread safe.

Also you need to make a copy of the string that you're returning
(e.g. using MR_make_aligned_string_copy()).
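
To illustrate why the copy matters, here is a small sketch in plain C.
lookup_name is a hypothetical stand-in for a routine such as
gethostbyname() that returns a pointer into storage it owns, and
copy_string uses malloc only to keep the example self-contained; in the
Mercury code the copy would be made with the runtime's own macro.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical stand-in for a library routine that returns a pointer
 * into a static buffer, which the next call overwrites.
 */
static const char *lookup_name(const char *name)
{
    static char buffer[256];

    strncpy(buffer, name, sizeof buffer - 1);
    buffer[sizeof buffer - 1] = '\0';
    return buffer;
}

/* Return a freshly allocated copy of src, or NULL if allocation fails. */
static char *copy_string(const char *src)
{
    size_t len;
    char *dst;

    assert(src != NULL);
    len = strlen(src) + 1;
    dst = malloc(len);
    if (dst != NULL) {
        memcpy(dst, src, len);
    }
    return dst;
}
```

A caller that keeps only the raw pointer from the first lookup would see
its contents change after the second lookup; the copy does not.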

> 	% XXX thread safe?
> :- pragma c_code(socket(Domain::in, Type::in, Protocol::in) = (Socket::out),
> 		 [will_not_call_mercury], "{
> 	Socket = socket(Domain, Type, Protocol);
> 	if (Socket == INVALID_SOCKET) {
> 		socket_errno = error();
> 		SUCCESS_INDICATOR = FALSE;
> 	} else {
> 		SUCCESS_INDICATOR = TRUE;
> 	}
> }").

This is not thread safe, since it modifies socket_errno.  It would be
a nicer design, and more efficient for multithreaded programs, if any
error indication were returned as a parameter rather than via a global
variable; in that case, it would be thread safe.

Declaring this as not_thread_safe (as is the default)
doesn't actually help to make it thread safe, because
there is a window between when this function returns and
when you call sockets__error_message during which another
thread may clobber socket_errno.
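
A minimal sketch of returning the error indication as a parameter, in
plain C on a POSIX system.  open_socket and socket_error_message are
hypothetical names; in the Mercury version the error code would become
an extra output argument of the pragma c_code procedure.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>

/*
 * Hypothetical wrapper: returns 1 and stores the descriptor in *fd_out
 * on success; returns 0 and stores the errno value in *err_out on
 * failure.  No static or extern variable is written, so concurrent
 * callers cannot clobber each other's error code.
 */
static int open_socket(int domain, int type, int protocol,
                       int *fd_out, int *err_out)
{
    int fd;

    assert(fd_out != NULL && err_out != NULL);
    fd = socket(domain, type, protocol);
    if (fd < 0) {
        *err_out = errno;   /* captured before any other call can change it */
        return 0;
    }
    *fd_out = fd;
    *err_out = 0;
    return 1;
}

/*
 * Turn a saved error code into a message.  strerror is used for
 * brevity; it is not required to be thread safe itself.
 */
static const char *socket_error_message(int err)
{
    return strerror(err);
}
```

Because the caller holds the error code, there is no window in which
another thread can overwrite it before the message is looked up.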

> 	% XXX thread safe?
> :- pragma c_code(connect(Fd::in, Addr::in, AddrLen::in), 
> 		 [will_not_call_mercury], "{
> 	struct sockaddr *addr = (struct sockaddr *) Addr;
> 	if (connect(Fd, addr, AddrLen) == INVALID_SOCKET) {
> 		socket_errno = error();
> 		SUCCESS_INDICATOR = FALSE;
> 	} else {
> 		SUCCESS_INDICATOR = TRUE;
> 	}
> }").

The same issue arises here, only it is even worse.  Since this is not
declared thread-safe, the Mercury compiler will generate code around
this which acquires the global lock, to ensure that it is not called
by multiple threads simultaneously.  But that is bad news, because
this procedure can block while it has the global lock!  This may cause
other threads to be blocked indefinitely while this thread is blocked
waiting for a connection.

-- 
Fergus Henderson <fjh at cs.mu.oz.au>  |  "I have always known that the pursuit
WWW: <http://www.cs.mu.oz.au/~fjh>  |  of excellence is a lethal habit"
PGP: finger fjh at 128.250.37.3        |     -- the last words of T. S. Garp.


