[m-dev.] for discussion: stream library v2
Peter Ross
petdr at miscrit.be
Mon Sep 25 00:15:24 AEDT 2000
Please find attached my second attempt at a stream library. This time
I have tried to define a typeclass for an impure lowlevel interface
for streams, and then defined a pure interface on top of that.
This version is an improvement on the old version for two reasons:
* it requires less implementation effort to add a new stream type;
* you have a non-unique handle on a stream, which means that you can
still process that stream after an exception.
The only design issue is that currently each call to the stream
interface needs both the handle and the unique (di/uo) pair of stream
state arguments, so it would be possible to pass a stream state that
does not belong to the handle.
stream__write_char(Tcp, 'a', Stream, _) % Is Stream associated with Tcp?
This problem could be solved by placing the handle inside the stream
unique pair.
Any comments?
And should I place this stuff into the std library?
Cheers,
Pete
PS. I also attached the socket implementation which uses this new
interface.
-------------- next part --------------
%-----------------------------------------------------------------------------%
% Copyright (C) 2000 The University of Melbourne.
% This file may only be copied under the terms of the GNU Library General
% Public License - see the file COPYING.LIB
%-----------------------------------------------------------------------------%
%
% File: lowlevel.m.
% Main author: petdr
% Stability: exceptionally low.
%
% An impure interface for describing streams.
%
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
:- module lowlevel.
:- interface.
:- import_module char.
% Base typeclass for a lowlevel stream handle of type S.
% The input and output typeclasses below both require it.
:- typeclass lowlevel(S) where [
% Get an error message which describes the stream error.
% Declared semipure, so every call must be marked `semipure'.
semipure func lowlevel__error_message(S) = string
].
% Handles from which characters can be read.
:- typeclass lowlevel__input(S) <= lowlevel(S) where [
% Read one character from the stream described by S.
% Fail if we reach eof or some error condition.
% After a failure, lowlevel__eof tells the two cases apart.
impure pred lowlevel__read_char(S::in, char::out) is semidet,
% Have we reached the eof for S?
% Succeeds at eof, fails otherwise.
semipure pred lowlevel__eof(S::in) is semidet
].
% Handles to which characters can be written.
:- typeclass lowlevel__output(S) <= lowlevel(S) where [
% Write one character to the stream described by S.
% Fail if the character could not be written; the reason is then
% available from lowlevel__error_message.
impure pred lowlevel__write_char(S::in, char::in) is semidet
].
% Handles which support both reading and writing.
% The class adds no methods of its own; it only combines the
% input and output constraints.
:- typeclass lowlevel__duplex(S)
<= (lowlevel__input(S), lowlevel__output(S)) where [].
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
-------------- next part --------------
%-----------------------------------------------------------------------------%
% Copyright (C) 2000 The University of Melbourne.
% This file may only be copied under the terms of the GNU Library General
% Public License - see the file COPYING.LIB
%-----------------------------------------------------------------------------%
%
% File: stream.m.
% Main author: petdr
% Stability: exceptionally low.
%
% This file provides interfaces for stream based I/O.
% This interface is built on top of the impure interface defined in
% lowlevel.m
%
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
:- module stream.
:- interface.
:- import_module lowlevel.
:- import_module char, list.
% The unique state which is threaded through every stream operation
% on a handle of type S. The type is abstract in the interface.
:- type stream(S).
% The exception value thrown by stream__write_char when the lowlevel
% write fails; the string is the lowlevel error message.
:- type stream_error ---> stream_error(string).
% The result of an operation which produces a value of type T.
:- type stream__result(T)
---> ok(T)
; eof
; error(string)
.
% The result of an operation which produces no value.
:- type stream__result
---> ok
; eof
; error(string)
.
% Create the initial stream state for the handle S.
:- pred stream__init(S::in, stream(S)::uo) is det.
% Read one character from S. Characters which were put back with
% stream__putback_char are returned before the handle is read.
:- pred stream__read_char(S::in, stream__result(char)::out,
stream(S)::di, stream(S)::uo) is det <= lowlevel__input(S).
% Push a character back so that the next stream__read_char returns it.
:- pred stream__putback_char(S::in, char::in,
stream(S)::di, stream(S)::uo) is det <= lowlevel__input(S).
% Write one character to S.
% Throws stream_error(Message) if the lowlevel write fails.
:- pred stream__write_char(S::in, char::in,
stream(S)::di, stream(S)::uo) is det <= lowlevel__output(S).
%-----------------------------------------------------------------------------%
% Predicates which require an input stream.
% Reads a whitespace delimited word from the stream S.
% Leading whitespace is skipped, and the whitespace character which
% ends the word is put back on the stream.
:- pred stream__read_word(S, stream__result(list(char)),
stream(S), stream(S)) <= lowlevel__input(S).
:- mode stream__read_word(in, out, di, uo) is det.
% Reads one line of input from the stream S.
% The terminating newline, if there is one, is included in the result.
% eof is returned only when no character could be read at all.
:- pred stream__read_line(S, stream__result(list(char)),
stream(S), stream(S)) <= lowlevel__input(S).
:- mode stream__read_line(in, out, di, uo) is det.
% Discards leading whitespace from the stream S.
% The first non-whitespace character is put back on the stream.
:- pred stream__ignore_whitespace(S, stream__result,
stream(S), stream(S)) <= lowlevel__input(S).
:- mode stream__ignore_whitespace(in, out, di, uo) is det.
%-----------------------------------------------------------------------------%
% Predicates which require an output stream.
% Writes every character of the string to the stream S, in order.
% A failed write surfaces as the stream_error exception thrown by
% stream__write_char.
:- pred stream__write_string(S, string,
stream(S), stream(S)) <= lowlevel__output(S).
:- mode stream__write_string(in, in, di, uo) is det.
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
:- implementation.
:- import_module exception, string.
% The stream state is the list of characters which have been put back,
% most recently put back character first.
:- type stream(S) ---> stream(list(char)).
% A new stream has no put back characters. The handle is not inspected.
stream__init(_S, stream([])).
%-----------------------------------------------------------------------------%
:- pragma promise_pure(stream__read_char/4).
stream__read_char(Stream, Result, stream(PutbackChars), StreamOut) :-
(
PutbackChars = [],
NewPutbackChars = PutbackChars,
( impure lowlevel__read_char(Stream, Chr) ->
Result = ok(Chr)
;
( semipure lowlevel__eof(Stream) ->
Result = eof
;
semipure Err = lowlevel__error_message(Stream),
Result = error(Err)
)
)
;
PutbackChars = [Chr | NewPutbackChars],
Result = ok(Chr)
),
unsafe_promise_unique(stream(NewPutbackChars), StreamOut).
%-----------------------------------------------------------------------------%
stream__putback_char(_Stream, Chr, stream(PutbackChars), StreamOut) :-
unsafe_promise_unique(stream([Chr | PutbackChars]), StreamOut).
%-----------------------------------------------------------------------------%
:- pragma promise_pure(stream__write_char/4).

    % XXX A better design choice may be to throw the stream_error
    % exception from inside lowlevel__write_char.
stream__write_char(Handle, Chr, Stream0, Stream) :-
    ( impure lowlevel__write_char(Handle, Chr) ->
        % The stream state holds only put back input characters,
        % so a write leaves it unchanged.
        Stream = Stream0
    ;
        semipure Message = lowlevel__error_message(Handle),
        throw(stream_error(Message))
    ).
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
stream__read_word(Stream, Result) -->
stream__ignore_whitespace(Stream, WSResult),
(
{ WSResult = error(Error) },
{ Result = error(Error) }
;
{ WSResult = eof },
{ Result = eof }
;
{ WSResult = ok },
stream__read_word_2(Stream, Result)
).
:- pred read_word_2(S, stream__result(list(char)),
stream(S), stream(S)) <= lowlevel__input(S).
:- mode read_word_2(in, out, di, uo) is det.
read_word_2(Stream, Result) -->
stream__read_char(Stream, CharResult),
(
{ CharResult = error(Error) },
{ Result = error(Error) }
;
{ CharResult = eof },
{ Result = eof }
;
{ CharResult = ok(Char) },
( { char__is_whitespace(Char) } ->
stream__putback_char(Stream, Char),
{ Result = ok([]) }
;
read_word_2(Stream, Result0),
(
{ Result0 = ok(Chars) },
{ Result = ok([Char | Chars]) }
;
{ Result0 = error(_) },
{ Result = Result0 }
;
{ Result0 = eof },
{ Result = ok([Char]) }
)
)
).
read_line(Stream, Result) -->
stream__read_char(Stream, CharResult),
(
{ CharResult = error(Error) },
{ Result = error(Error) }
;
{ CharResult = eof },
{ Result = eof }
;
{ CharResult = ok(Char) },
( { Char = '\n' } ->
{ Result = ok([Char]) }
;
read_line(Stream, Result0),
(
{ Result0 = ok(Chars) },
{ Result = ok([Char | Chars]) }
;
{ Result0 = error(_) },
{ Result = Result0 }
;
{ Result0 = eof },
{ Result = ok([Char]) }
)
)
).
stream__ignore_whitespace(Stream, Result) -->
stream__read_char(Stream, CharResult),
(
{ CharResult = error(Error) },
{ Result = error(Error) }
;
{ CharResult = eof },
{ Result = eof }
;
{ CharResult = ok(Char) },
( { char__is_whitespace(Char) } ->
stream__ignore_whitespace(Stream, Result)
;
stream__putback_char(Stream, Char),
{ Result = ok }
)
).
%-----------------------------------------------------------------------------%
stream__write_string(Stream, String) -->
{ string__to_char_list(String, CharList) },
list__foldl(stream__write_char(Stream), CharList).
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
-------------- next part --------------
%---------------------------------------------------------------------------%
% Copyright (C) 2000 The University of Melbourne.
% This file may only be copied under the terms of the GNU Library General
% Public License - see the file COPYING.LIB
%-----------------------------------------------------------------------------%
%
% Module: tcp
% Main Author: peter.ross at miscrit.be (based on code written by pma at miscrit.be)
% Stability: low
%
% An implementation of TCP streams.
%
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
:- module tcp.
:- interface.
:- import_module lowlevel.
% A connected TCP stream.
:- type tcp.
% A socket which has been bound to a port and is listening on it.
:- type bound_tcp.
:- type host == string. % A hostname, e.g. "localhost"
:- type service == string. % A service name, e.g. "www"
:- type protocol == string. % A protocol name, e.g. "tcp"
:- type port == int. % A port number, e.g. 80 for a web server
% Open a connection to the given host and port.
% Aborts by calling error/1 if the connection cannot be made.
:- pred tcp__connect(host::in, port::in, tcp::out) is det.
% You bind to a port to start listening at that port.
% Aborts by calling error/1 on failure.
:- pred tcp__bind(host::in, port::in, bound_tcp::out) is det.
% Accept one connection on a bound socket.
% Aborts by calling error/1 on failure.
:- pred tcp__accept(bound_tcp::in, tcp::out) is det.
% A tcp connection can be used as a lowlevel input, output and
% duplex stream handle.
:- instance lowlevel(tcp).
:- instance lowlevel__input(tcp).
:- instance lowlevel__output(tcp).
:- instance lowlevel__duplex(tcp).
% Shut the connection down, via sockets__close.
:- pred tcp__shutdown(tcp::in) is det.
% Look up the port number of a named service for the "tcp" protocol.
:- func tcp__service_port(service) = port.
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
:- implementation.
:- import_module sockets.
:- import_module char, list, string, std_util, require.
% A connection is represented by its socket descriptor.
:- type tcp
---> tcp(
int % socket fd
).
% A listening socket, kept together with the address it was bound to.
:- type bound_tcp
---> bound_tcp(
int, % socket fd
c_pointer % struct sockaddr
).
% Map the lowlevel stream methods onto the socket primitives which are
% defined later in this module.
:- instance lowlevel(tcp) where [
func(lowlevel__error_message/1) is tcp__error_message
].
:- instance lowlevel__input(tcp) where [
pred(lowlevel__read_char/2) is read_char_from_socket,
pred(lowlevel__eof/1) is tcp__eof
].
:- instance lowlevel__output(tcp) where [
pred(lowlevel__write_char/2) is write_char_to_socket
].
% The duplex class has no methods, so there is nothing to define.
:- instance lowlevel__duplex(tcp) where [].
% C declarations needed by the c_code pragmas in this module.
% NOTE(review): recv() and send() are used below but are not declared by
% <unistd.h>; presumably their declarations arrive through the header of
% the imported sockets module -- confirm.
:- pragma c_header_code("
#ifndef MR_WIN32
#include <unistd.h>
#endif
").
%-----------------------------------------------------------------------------%
tcp__connect(Host, Port, Connection) :-
(
sockets__init,
Socket = sockets__socket(2, 1, 0),
SocketAddr = sockets__port_address(Host, Port),
sockets__connect(Socket, SocketAddr, 16)
->
Connection = tcp(Socket)
;
error(string__format(
"Failed to connect to %s:%d because %s.\n",
[s(Host), i(Port), s(sockets__error_message)]))
).
%-----------------------------------------------------------------------------%
tcp__bind(Host, Port, Connection) :-
(
sockets__init,
Socket = sockets__socket(2, 1, 0),
SocketAddr = sockets__port_address(Host, Port),
sockets__bind(Socket, SocketAddr, 16),
sockets__listen(Socket, 10)
->
Connection = bound_tcp(Socket, SocketAddr)
;
error(string__format(
"Failed to connect to %s on port %d because %s.\n",
[s(Host), i(Port), s(sockets__error_message)]))
).
%-----------------------------------------------------------------------------%
tcp__accept(bound_tcp(SocketNo, SocketAddr), Connection) :-
(
NewSocket = sockets__accept(SocketNo, SocketAddr)
->
Connection = tcp(NewSocket)
;
error(string__format("tcp__accept failed because %s.\n",
[s(sockets__error_message)]))
).
%-----------------------------------------------------------------------------%
% Shut the connection down by handing its socket descriptor to
% sockets__close.
tcp__shutdown(tcp(SocketNo)) :-
sockets__close(SocketNo).
%-----------------------------------------------------------------------------%
:- pred read_char_from_socket(tcp::in, char::out) is semidet.
:- pragma c_code(read_char_from_socket(Socket::in, Chr::out),
[will_not_call_mercury, thread_safe], "{
if (recv(Socket, &Chr, 1, 0) == -1) {
SUCCESS_INDICATOR = FALSE;
} else {
SUCCESS_INDICATOR = TRUE;
}
}").
%-----------------------------------------------------------------------------%
:- pred tcp__write_char_to_socket(tcp::in, char::in) is semidet.
:- pragma c_code(tcp__write_char_to_socket(Socket::in, Chr::in),
[will_not_call_mercury, thread_safe], "{
if (send(Socket, &Chr, 1, 0) == -1) {
SUCCESS_INDICATOR = FALSE;
} else {
SUCCESS_INDICATOR = TRUE;
}
}").
%-----------------------------------------------------------------------------%
% Look the service name up for the "tcp" protocol.
tcp__service_port(Service) = sockets__getservbyname(Service, "tcp").
%-----------------------------------------------------------------------------%
% XXX this is not thread safe!
% The message comes from the error state recorded by the sockets
% module; the socket itself is not consulted.
:- func error_message(tcp) = string.
error_message(tcp(_Socket)) = sockets__error_message.
% This always fails, i.e. a tcp stream never reports eof. As a result
% stream__read_char reports every failed read as an error, never as eof.
:- pred eof(tcp::in) is semidet.
eof(tcp(_Socket)) :- semidet_fail.
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
-------------- next part --------------
%---------------------------------------------------------------------------%
% Copyright (C) 2000 The University of Melbourne.
% This file may only be copied under the terms of the GNU Library General
% Public License - see the file COPYING.LIB
%-----------------------------------------------------------------------------%
%
% Module: sockets
% Main Author: peter.ross at miscrit.be (based on code written by pma at miscrit.be)
% Stability: low
%
% Provide a low-level interface to sockets.
% The more declarative interface is provided by the module tcp.
%
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
:- module sockets.
:- interface.
% A success value or an error message.
% NOTE(review): no declaration in the visible source uses this type.
:- type sockets__res(Type)
---> ok(Type)
; error(string).
% This predicate needs to be called at least once to initialise
% sockets under Win32.
:- pred sockets__init is semidet.
% Return the official name of the given host.
:- func sockets__gethostbyname(string) = string.
% getservbyname(Service, Protocol) returns the port number of the
% service, in host byte order.
:- func sockets__getservbyname(string, string) = int.
% socket(Domain, Type, Protocol) creates a socket and returns its
% descriptor. Fails if the socket cannot be created.
:- func sockets__socket(int, int, int) = int is semidet.
% port_address(Host, Port) builds a struct sockaddr_in for the host
% and port. Fails if the host cannot be resolved.
:- func sockets__port_address(string, int) = c_pointer is semidet.
% service_address(Service, Host) builds a struct sockaddr_in for the
% named tcp service on the host. Fails if either lookup fails.
:- func sockets__service_address(string, string) = c_pointer is semidet.
% connect(Fd, Address, AddressLength)
:- pred sockets__connect(int::in, c_pointer::in, int::in) is semidet.
% bind(Fd, Address, AddressLength)
:- pred sockets__bind(int::in, c_pointer::in, int::in) is semidet.
% listen(Fd, BackLog)
:- pred sockets__listen(int::in, int::in) is semidet.
% accept(Fd, Address) returns the descriptor of the accepted connection.
:- func sockets__accept(int, c_pointer) = int is semidet.
:- pred sockets__close(int::in) is det.
% Why did the socket operation fail?
% Describes the error recorded by the most recent failing call in this
% module.
:- func sockets__error_message = string.
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
:- implementation.
:- pragma c_header_code("
#ifdef MR_WIN32
#include <winsock.h>
#define error() WSAGetLastError
#else
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#define error() errno
#define INVALID_SOCKET -1
#endif
").
% Module-wide C state: the error code of the most recent failing socket
% call. It is written by the predicates below when they fail and read by
% sockets__error_message. It is a single static variable, so it is
% shared by all callers.
:- pragma c_code("
/* Save the errno into this variable if a function fails */
static int socket_errno;
").
:- pragma c_code(sockets__init,
[will_not_call_mercury, not_thread_safe], "{
#ifdef MR_WIN32
static int initialiased = FALSE;
WORD wVersionRequested;
WSADATA wsaData;
int err;
if (!initialiased) {
wVersionRequested = MAKEWORD( 2, 2 );
err = WSAStartup(wVersionRequested, &wsaData);
if ( err != 0 ) {
MR_fatal_error(""Unable to find a ""
""usable winsock.dll\\n"");
}
if ( LOBYTE( wsaData.wVersion ) != 2 ||
HIBYTE( wsaData.wVersion ) != 2 ) {
WSACleanup();
MR_fatal_error(""Unable to find a ""
""usable winsock.dll\\n"");
}
initialiased = TRUE;
}
SUCCESS_INDICATOR = TRUE;
#endif
}").
% XXX thread safe?
:- pragma c_code(gethostbyname(Name::in) = (Host::out),
[will_not_call_mercury], "{
struct hostent *host;
host = gethostbyname(Name);
Host = (MR_String) host->h_name;
}").
% XXX thread safe?
:- pragma c_code(getservbyname(Name::in, Protocol::in) = (Port::out),
[will_not_call_mercury], "{
struct servent *service;
service = getservbyname(Name, Protocol);
Port = (MR_Integer) ntohs(service->s_port);
}").
% XXX thread safe?
:- pragma c_code(socket(Domain::in, Type::in, Protocol::in) = (Socket::out),
[will_not_call_mercury], "{
Socket = socket(Domain, Type, Protocol);
if (Socket == INVALID_SOCKET) {
socket_errno = error();
SUCCESS_INDICATOR = FALSE;
} else {
SUCCESS_INDICATOR = TRUE;
}
}").
% XXX thread safe?
:- pragma c_code(port_address(Host::in, Port::in) = (SA::out),
[will_not_call_mercury], "{
struct hostent *host;
struct sockaddr_in *addr;
host = gethostbyname(Host);
if (host == NULL) {
socket_errno = error();
SUCCESS_INDICATOR = FALSE;
} else {
addr = MR_GC_NEW(struct sockaddr_in);
memcpy(&(addr->sin_addr), host->h_addr, host->h_length);
addr->sin_family = host->h_addrtype;
addr->sin_port = htons(Port);
SA = (MR_Word) addr;
SUCCESS_INDICATOR = TRUE;
}
}").
% XXX thread safe?
% Resolve Host and look up Service for the "tcp" protocol (the protocol
% name is fixed in the C code), then build a struct sockaddr_in
% describing that service on that host.
% Fails, saving the error code, if either lookup returns NULL.
% The port from the service entry is stored as is, with no byte order
% conversion.
:- pragma c_code(service_address(Service::in, Host::in) = (SA::out),
[will_not_call_mercury], "{
struct hostent *host;
struct servent *service;
struct sockaddr_in *addr;
host = gethostbyname(Host);
if (host == NULL) {
socket_errno = error();
SUCCESS_INDICATOR = FALSE;
} else {
service = getservbyname(Service,""tcp"");
if (service == NULL) {
socket_errno = error();
SUCCESS_INDICATOR = FALSE;
} else {
addr = MR_GC_NEW(struct sockaddr_in);
memcpy(&(addr->sin_addr), host->h_addr, host->h_length);
addr->sin_family = host->h_addrtype;
addr->sin_port = service->s_port;
SA = (MR_Word) addr;
SUCCESS_INDICATOR = TRUE;
}
}
}").
% XXX thread safe?
:- pragma c_code(connect(Fd::in, Addr::in, AddrLen::in),
[will_not_call_mercury], "{
struct sockaddr *addr = (struct sockaddr *) Addr;
if (connect(Fd, addr, AddrLen) == INVALID_SOCKET) {
socket_errno = error();
SUCCESS_INDICATOR = FALSE;
} else {
SUCCESS_INDICATOR = TRUE;
}
}").
% XXX thread safe?
:- pragma c_code(bind(Fd::in, Addr::in, AddrLen::in),
[will_not_call_mercury], "{
struct sockaddr *addr = (struct sockaddr *) Addr;
if (bind(Fd, addr, AddrLen) == INVALID_SOCKET) {
socket_errno = error();
SUCCESS_INDICATOR = FALSE;
} else {
SUCCESS_INDICATOR = TRUE;
}
}").
% XXX thread safe?
:- pragma c_code(listen(Fd::in, BackLog::in),
[will_not_call_mercury], "{
if (listen(Fd, BackLog) == INVALID_SOCKET) {
socket_errno = error();
SUCCESS_INDICATOR = FALSE;
} else {
SUCCESS_INDICATOR = TRUE;
}
}").
% This code can block, so we make it thread_safe
% so as to avoid other code blocking on the global mutex.
:- pragma c_code(accept(Fd::in, Addr::in) = (NewSocket::out),
[thread_safe, will_not_call_mercury], "{
struct sockaddr *addr = (struct sockaddr *) Addr;
NewSocket = accept(Fd, addr, NULL);
if (NewSocket == INVALID_SOCKET) {
socket_errno = error();
SUCCESS_INDICATOR = FALSE;
} else {
SUCCESS_INDICATOR = TRUE;
}
}").
% XXX thread safe?
:- pragma c_code(sockets__close(Fd::in), [will_not_call_mercury], "{
struct linger sockets_linger = { TRUE, 2 };
setsockopt(Fd, SOL_SOCKET, SO_LINGER,
&sockets_linger, sizeof(sockets_linger));
shutdown(Fd, 2);
}").
% XXX thread safe?
% Return a fresh copy of the strerror() text for the error code saved
% by the most recent failing call in this module.
% NOTE(review): on Win32 the saved code comes from the error() macro
% (WSAGetLastError); whether strerror() can describe such codes should
% be confirmed.
:- pragma c_code(error_message = (Err::out),
[will_not_call_mercury], "{
MR_make_aligned_string_copy(Err, strerror(socket_errno));
}").
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
More information about the developers
mailing list