[m-dev.] for discussion: stream library v2

Peter Ross petdr at miscrit.be
Mon Sep 25 00:15:24 AEDT 2000


Please find attached my second attempt at a stream library.  This time
I have tried to define a typeclass for an impure lowlevel interface
for streams, and then defined a pure interface on top of that.

This version is an improvement on the old version for two reasons
    * it requires less implementation effort to add a new stream type.
    * you have a non-unique handle on a stream which means that you can
      still process that stream after an exception.

The only design issue is that currently each call to a stream lowlevel
interface needs both the handle and unique pair of stream state and it
would be possible to use the wrong stream pair with a handle.

stream__write_char(Tcp, 'a', Stream, _) % Is Stream associated with Tcp?

This problem could be solved by placing the handle inside the stream
unique pair.

Any comments?
And should I place this stuff into the std library?

Cheers,
Pete

PS. I also attached the socket implementation which uses this new
interface.
-------------- next part --------------
%-----------------------------------------------------------------------------%
% Copyright (C) 2000 The University of Melbourne.
% This file may only be copied under the terms of the GNU Library General
% Public License - see the file COPYING.LIB
%-----------------------------------------------------------------------------%
%
% File: lowlevel.m.
% Main author: petdr
% Stability: exceptionally low.
%
% An impure interface for describing streams.
%
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%

:- module lowlevel.

:- interface.

:- import_module char.

:- typeclass lowlevel(S) where [
		% Get an error message which describes the stream error.
	semipure func lowlevel__error_message(S) = string
].

:- typeclass lowlevel__input(S) <= lowlevel(S) where [
		% Read one character from the stream described by S.
		% Fail if we reach eof or some error condition.
	impure pred lowlevel__read_char(S::in, char::out) is semidet,

		% Have we reached the eof for S?
	semipure pred lowlevel__eof(S::in) is semidet
].

:- typeclass lowlevel__output(S) <= lowlevel(S) where [
		% Read one character from the current stream.
	impure pred lowlevel__write_char(S::in, char::in) is semidet
].

:- typeclass lowlevel__duplex(S)
		<= (lowlevel__input(S), lowlevel__output(S)) where [].

%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
-------------- next part --------------
%-----------------------------------------------------------------------------%
% Copyright (C) 2000 The University of Melbourne.
% This file may only be copied under the terms of the GNU Library General
% Public License - see the file COPYING.LIB
%-----------------------------------------------------------------------------%
%
% File: stream.m.
% Main author: petdr
% Stability: exceptionally low.
%
% This file provides interfaces for stream based I/O.
% This interface is built on top of the impure interface defined in
% lowlevel.m
%
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%

:- module stream.

:- interface.

:- import_module lowlevel.
:- import_module char, list.

:- type stream(S).

:- type stream_error ---> stream_error(string).

:- type stream__result(T)
	--->	ok(T)
	;	eof
	;	error(string)
	.

:- type stream__result
	--->	ok
	;	eof
	;	error(string)
	.

:- pred stream__init(S::in, stream(S)::uo) is det.

:- pred stream__read_char(S::in, stream__result(char)::out,
		stream(S)::di, stream(S)::uo) is det <= lowlevel__input(S).

:- pred stream__putback_char(S::in, char::in,
		stream(S)::di, stream(S)::uo) is det <= lowlevel__input(S).

:- pred stream__write_char(S::in, char::in,
		stream(S)::di, stream(S)::uo) is det <= lowlevel__output(S).

%-----------------------------------------------------------------------------%

% Predicates which require an input stream.

	% Reads a whitespace delimited word from the current input stream.
:- pred stream__read_word(S, stream__result(list(char)),
		stream(S), stream(S)) <= lowlevel__input(S).
:- mode stream__read_word(in, out, di, uo) is det.

	% Reads one line of input from the current input stream.
:- pred stream__read_line(S, stream__result(list(char)),
		stream(S), stream(S)) <= lowlevel__input(S).
:- mode stream__read_line(in, out, di, uo) is det.

	% Discards all the whitespace from the current stream.
:- pred stream__ignore_whitespace(S, stream__result,
		stream(S), stream(S)) <= lowlevel__input(S).
:- mode stream__ignore_whitespace(in, out, di, uo) is det.

%-----------------------------------------------------------------------------%

% Predicates which require an output stream.

:- pred stream__write_string(S, string,
		stream(S), stream(S)) <= lowlevel__output(S).
:- mode stream__write_string(in, in, di, uo) is det.

%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%

:- implementation.

:- import_module exception, string.

:- type stream(S) ---> stream(list(char)).

stream__init(_S, stream([])).

%-----------------------------------------------------------------------------%

:- pragma promise_pure(stream__read_char/4).
stream__read_char(Stream, Result, stream(PutbackChars), StreamOut) :-
	(
		PutbackChars = [],
		NewPutbackChars = PutbackChars,
		( impure lowlevel__read_char(Stream, Chr) ->
			Result = ok(Chr)
		;
			( semipure lowlevel__eof(Stream) ->
				Result = eof
			;
				semipure Err = lowlevel__error_message(Stream),
				Result = error(Err)
			)
		)
	;
		PutbackChars = [Chr | NewPutbackChars],
		Result = ok(Chr)
	),
	unsafe_promise_unique(stream(NewPutbackChars), StreamOut).

%-----------------------------------------------------------------------------%

stream__putback_char(_Stream, Chr, stream(PutbackChars), StreamOut) :-
	unsafe_promise_unique(stream([Chr | PutbackChars]), StreamOut).

%-----------------------------------------------------------------------------%

:- pragma promise_pure(stream__write_char/4).
stream__write_char(Stream, Chr, StreamIn, StreamOut) :-
		% XXX A better design choice may be to throw the
		% stream_error exception from inside
		% lowlevel__write_char
	( impure lowlevel__write_char(Stream, Chr) ->
		true
	;
		semipure Err = lowlevel__error_message(Stream),
		throw(stream_error(Err))
	),
	StreamOut = StreamIn.

%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%

stream__read_word(Stream, Result) -->
	stream__ignore_whitespace(Stream, WSResult),
	(
		{ WSResult = error(Error) },
		{ Result = error(Error) }
	;
		{ WSResult = eof },
		{ Result = eof }
	;
		{ WSResult = ok },
		stream__read_word_2(Stream, Result)
	).

:- pred read_word_2(S, stream__result(list(char)),
		stream(S), stream(S)) <= lowlevel__input(S).
:- mode read_word_2(in, out, di, uo) is det.

read_word_2(Stream, Result) -->
	stream__read_char(Stream, CharResult),
	(
		{ CharResult = error(Error) },
		{ Result = error(Error) }
	;
		{ CharResult = eof },
		{ Result = eof }
	;
		{ CharResult = ok(Char) },
		( { char__is_whitespace(Char) } ->
			stream__putback_char(Stream, Char),
			{ Result = ok([]) }
		;
			read_word_2(Stream, Result0),
			(
				{ Result0 = ok(Chars) },
				{ Result = ok([Char | Chars]) }
			;
				{ Result0 = error(_) },
				{ Result = Result0 }
			;
				{ Result0 = eof },
				{ Result = ok([Char]) }
			)
		)	
	).

read_line(Stream, Result) -->
	stream__read_char(Stream, CharResult),
	(
		{ CharResult = error(Error) },
		{ Result = error(Error) }
	;
		{ CharResult = eof },
		{ Result = eof }
	;
		{ CharResult = ok(Char) },
		( { Char = '\n' } ->
			{ Result = ok([Char]) }
		;
			read_line(Stream, Result0),
			(
				{ Result0 = ok(Chars) },
				{ Result = ok([Char | Chars]) }
			;
				{ Result0 = error(_) },
				{ Result = Result0 }
			;
				{ Result0 = eof },
				{ Result = ok([Char]) }
			)
		)	
	).

stream__ignore_whitespace(Stream, Result) -->
	stream__read_char(Stream, CharResult),
	(
		{ CharResult = error(Error) },
		{ Result = error(Error) }
	;
		{ CharResult = eof },
		{ Result = eof }
	;
		{ CharResult = ok(Char) },
		( { char__is_whitespace(Char) } ->
			stream__ignore_whitespace(Stream, Result)
		;
			stream__putback_char(Stream, Char),
			{ Result = ok }
		)	
	).

%-----------------------------------------------------------------------------%

stream__write_string(Stream, String) -->
	{ string__to_char_list(String, CharList) },
	list__foldl(stream__write_char(Stream), CharList).

%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
-------------- next part --------------
%---------------------------------------------------------------------------%
% Copyright (C) 2000 The University of Melbourne.
% This file may only be copied under the terms of the GNU Library General
% Public License - see the file COPYING.LIB
%-----------------------------------------------------------------------------%
%
% Module:	tcp
% Main Author:	peter.ross at miscrit.be (based on code written by pma at miscrit.be)
% Stability:	low
%
% An implementation of TCP streams.
%
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%

:- module tcp.
:- interface.
:- import_module lowlevel.

:- type tcp.
:- type bound_tcp.

:- type host == string.		% A hostname 	ie "localhost"
:- type service == string.	% A service 	ie "www"
:- type protocol == string.	% A protocol 	ie "tcp"
:- type port == int.		% A portnumber	ie 80 - the webserver

:- pred tcp__connect(host::in, port::in, tcp::out) is det.

	% You bind to a port to start listening at that port.
:- pred tcp__bind(host::in, port::in, bound_tcp::out) is det.
:- pred tcp__accept(bound_tcp::in, tcp::out) is det.

:- instance lowlevel(tcp).
:- instance lowlevel__input(tcp).
:- instance lowlevel__output(tcp).
:- instance lowlevel__duplex(tcp).

:- pred tcp__shutdown(tcp::in) is det.

:- func tcp__service_port(service) = port.

%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%

:- implementation.

:- import_module sockets.
:- import_module char, list, string, std_util, require.

:- type tcp
	--->	tcp(
			int		% socket fd
		).

:- type bound_tcp
	--->	bound_tcp(
			int,		% socket fd
			c_pointer	% struct sockaddr
		).

:- instance lowlevel(tcp) where [
	func(lowlevel__error_message/1) is tcp__error_message
].

:- instance lowlevel__input(tcp) where [
	pred(lowlevel__read_char/2) is read_char_from_socket,
	pred(lowlevel__eof/1) is tcp__eof
].

:- instance lowlevel__output(tcp) where [
	pred(lowlevel__write_char/2) is write_char_to_socket
].

:- instance lowlevel__duplex(tcp) where [].

:- pragma c_header_code("
#ifndef MR_WIN32
  #include <unistd.h>
#endif
").

%-----------------------------------------------------------------------------%

tcp__connect(Host, Port, Connection) :-
	(
		sockets__init,
		Socket = sockets__socket(2, 1, 0),
		SocketAddr = sockets__port_address(Host, Port),
		sockets__connect(Socket, SocketAddr, 16)
	->
		Connection = tcp(Socket)
	;
		error(string__format(
				"Failed to connect to %s:%d because %s.\n",
				[s(Host), i(Port), s(sockets__error_message)]))
	).

%-----------------------------------------------------------------------------%

tcp__bind(Host, Port, Connection) :-
	(
		sockets__init,
		Socket = sockets__socket(2, 1, 0),
		SocketAddr = sockets__port_address(Host, Port),
		sockets__bind(Socket, SocketAddr, 16),
		sockets__listen(Socket, 10)
	->
		Connection = bound_tcp(Socket, SocketAddr)
	;
		error(string__format(
			"Failed to connect to %s on port %d because %s.\n",
			[s(Host), i(Port), s(sockets__error_message)]))
	).

%-----------------------------------------------------------------------------%

tcp__accept(bound_tcp(SocketNo, SocketAddr), Connection) :-
	(
		NewSocket = sockets__accept(SocketNo, SocketAddr)
	->
		Connection = tcp(NewSocket)
	;
		error(string__format("tcp__accept failed because %s.\n",
				[s(sockets__error_message)]))
	).

%-----------------------------------------------------------------------------%

tcp__shutdown(tcp(SocketNo)) :-
	sockets__close(SocketNo).
	
%-----------------------------------------------------------------------------%
	
:- pred read_char_from_socket(tcp::in, char::out) is semidet.
:- pragma c_code(read_char_from_socket(Socket::in, Chr::out),
		[will_not_call_mercury, thread_safe], "{
	if (recv(Socket, &Chr, 1, 0) == -1) {
		SUCCESS_INDICATOR = FALSE;
	} else {
		SUCCESS_INDICATOR = TRUE;
	}
}").

%-----------------------------------------------------------------------------%

:- pred tcp__write_char_to_socket(tcp::in, char::in) is semidet.
:- pragma c_code(tcp__write_char_to_socket(Socket::in, Chr::in),
		[will_not_call_mercury, thread_safe], "{
	if (send(Socket, &Chr, 1, 0) == -1) {
		SUCCESS_INDICATOR = FALSE;
	} else {
		SUCCESS_INDICATOR = TRUE;
	}
}").

%-----------------------------------------------------------------------------%

tcp__service_port(Service)  = sockets__getservbyname(Service, "tcp").

%-----------------------------------------------------------------------------%

	% XXX this is not thread safe!
:- func error_message(tcp) = string.
error_message(tcp(_Socket)) = sockets__error_message.

:- pred eof(tcp::in) is semidet.
eof(tcp(_Socket)) :- semidet_fail.

%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
-------------- next part --------------
%---------------------------------------------------------------------------%
% Copyright (C) 2000 The University of Melbourne.
% This file may only be copied under the terms of the GNU Library General
% Public License - see the file COPYING.LIB
%-----------------------------------------------------------------------------%
%
% Module:	sockets
% Main Author:	peter.ross at miscrit.be (based on code written by pma at miscrit.be)
% Stability:	low
%
% Provide a low-level interface to sockets.
% The more declarative interface is provided by the module tcp.
%
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%

:- module sockets.
:- interface.

:- type sockets__res(Type)
	--->	ok(Type)
	;	error(string).

	% This predicate needs to be called at least once to initialse a
	% socket under Win32.
:- pred sockets__init is semidet.

:- func sockets__gethostbyname(string) = string.
:- func sockets__getservbyname(string, string) = int.

:- func sockets__socket(int, int, int) = int is semidet.
:- func sockets__port_address(string, int) = c_pointer is semidet.
:- func sockets__service_address(string, string) = c_pointer is semidet.
:- pred sockets__connect(int::in, c_pointer::in, int::in) is semidet.
:- pred sockets__bind(int::in, c_pointer::in, int::in) is semidet.
:- pred sockets__listen(int::in, int::in) is semidet.
:- func sockets__accept(int, c_pointer) = int is semidet.
:- pred sockets__close(int::in) is det.

	% Why did the socket operation fail?
:- func sockets__error_message = string.

%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%

:- implementation.

:- pragma c_header_code("
#ifdef MR_WIN32
  #include <winsock.h>

  #define  error()		WSAGetLastError
#else
  #include <errno.h>
  #include <netdb.h>

  #include <netinet/in.h>

  #include <sys/types.h>
  #include <sys/socket.h>

  #define  error()		errno

  #define  INVALID_SOCKET	-1
#endif
").

:- pragma c_code("
	/* Save the errno into this variable if a function fails */
static int socket_errno;
").

:- pragma c_code(sockets__init,
		 [will_not_call_mercury, not_thread_safe], "{
#ifdef MR_WIN32
	static int initialiased = FALSE;

	WORD 	wVersionRequested;
	WSADATA wsaData;
	int	err; 

	if (!initialiased) {
		wVersionRequested = MAKEWORD( 2, 2 ); 
		err = WSAStartup(wVersionRequested, &wsaData);

		if ( err != 0 ) {
			MR_fatal_error(""Unable to find a ""
					""usable winsock.dll\\n"");
		}

		if ( LOBYTE( wsaData.wVersion ) != 2 ||
				HIBYTE( wsaData.wVersion ) != 2 ) {
			WSACleanup();
			MR_fatal_error(""Unable to find a ""
					""usable winsock.dll\\n"");
		}
		initialiased = TRUE;
	}
	SUCCESS_INDICATOR = TRUE;
#endif
}").

	% XXX thread safe?
:- pragma c_code(gethostbyname(Name::in) = (Host::out),
		 [will_not_call_mercury], "{
	struct hostent	*host;
	host = gethostbyname(Name);
	Host = (MR_String) host->h_name;
}").

	% XXX thread safe?
:- pragma c_code(getservbyname(Name::in, Protocol::in) = (Port::out),
		 [will_not_call_mercury], "{
	struct servent *service;
	service = getservbyname(Name, Protocol);
	Port = (MR_Integer) ntohs(service->s_port);
}").

	% XXX thread safe?
:- pragma c_code(socket(Domain::in, Type::in, Protocol::in) = (Socket::out),
		 [will_not_call_mercury], "{
	Socket = socket(Domain, Type, Protocol);
	if (Socket == INVALID_SOCKET) {
		socket_errno = error();
		SUCCESS_INDICATOR = FALSE;
	} else {
		SUCCESS_INDICATOR = TRUE;
	}
}").

	% XXX thread safe?
:- pragma c_code(port_address(Host::in, Port::in) = (SA::out),
		 [will_not_call_mercury], "{
	struct hostent *host;
	struct sockaddr_in *addr;

	host = gethostbyname(Host);
	if (host == NULL) {
		socket_errno = error();
		SUCCESS_INDICATOR = FALSE;
	} else {
		addr = MR_GC_NEW(struct sockaddr_in);

		memcpy(&(addr->sin_addr), host->h_addr, host->h_length);
		addr->sin_family = host->h_addrtype;
		addr->sin_port = htons(Port);

		SA = (MR_Word) addr;
		SUCCESS_INDICATOR = TRUE;
	}
}").

	% XXX thread safe?
:- pragma c_code(service_address(Service::in, Host::in) = (SA::out),
		 [will_not_call_mercury], "{
	struct hostent *host;
	struct servent *service;
	struct sockaddr_in *addr;

	host = gethostbyname(Host);
	if (host == NULL) {
		socket_errno = error();
		SUCCESS_INDICATOR = FALSE;
	} else {
		service = getservbyname(Service,""tcp"");

		if (service == NULL) {
			socket_errno = error();
			SUCCESS_INDICATOR = FALSE;
		} else {
			addr = MR_GC_NEW(struct sockaddr_in);

			memcpy(&(addr->sin_addr), host->h_addr, host->h_length);
			addr->sin_family = host->h_addrtype;
			addr->sin_port = service->s_port;

			SA = (MR_Word) addr;
			SUCCESS_INDICATOR = TRUE;
		}
	}
}").

	% XXX thread safe?
:- pragma c_code(connect(Fd::in, Addr::in, AddrLen::in), 
		 [will_not_call_mercury], "{
	struct sockaddr *addr = (struct sockaddr *) Addr;
	if (connect(Fd, addr, AddrLen) == INVALID_SOCKET) {
		socket_errno = error();
		SUCCESS_INDICATOR = FALSE;
	} else {
		SUCCESS_INDICATOR = TRUE;
	}
}").

	% XXX thread safe?
:- pragma c_code(bind(Fd::in, Addr::in, AddrLen::in), 
		 [will_not_call_mercury], "{
	struct sockaddr *addr = (struct sockaddr *) Addr;
	if (bind(Fd, addr, AddrLen) == INVALID_SOCKET) {
		socket_errno = error();
		SUCCESS_INDICATOR = FALSE;
	} else {
		SUCCESS_INDICATOR = TRUE;
	}
}").

	% XXX thread safe?
:- pragma c_code(listen(Fd::in, BackLog::in), 
		 [will_not_call_mercury], "{
	if (listen(Fd, BackLog) == INVALID_SOCKET) {
		socket_errno = error();
		SUCCESS_INDICATOR = FALSE;
	} else {
		SUCCESS_INDICATOR = TRUE;
	}
}").

	% This code can block, so we make it thread_safe
	% so as to avoid other code blocking on the global mutex.
:- pragma c_code(accept(Fd::in, Addr::in) = (NewSocket::out), 
		 [thread_safe, will_not_call_mercury], "{
	struct sockaddr *addr = (struct sockaddr *) Addr;
	NewSocket = accept(Fd, addr, NULL);
	if (NewSocket == INVALID_SOCKET) {
		socket_errno = error();
		SUCCESS_INDICATOR = FALSE;
	} else {
		SUCCESS_INDICATOR = TRUE;
	}
}").

	% XXX thread safe?
:- pragma c_code(sockets__close(Fd::in), [will_not_call_mercury], "{
	struct linger sockets_linger = { TRUE, 2 };
	setsockopt(Fd, SOL_SOCKET, SO_LINGER,
			&sockets_linger, sizeof(sockets_linger));
	shutdown(Fd, 2);
}").

	% XXX thread safe?
:- pragma c_code(error_message = (Err::out),
		 [will_not_call_mercury], "{
	MR_make_aligned_string_copy(Err, strerror(socket_errno));
}").


%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%


More information about the developers mailing list