[m-rev.] for review: improvements to net/tcp.m

Peter Wang wangp at students.csse.unimelb.edu.au
Wed Mar 28 15:50:36 AEST 2007


Improvements to the tcp module.

extras/net/tcp.m:
	Make tcp an instance of the `reader' typeclass with unit `string' and
	implement line reading efficiently in C.

	Do input buffering on sockets so it's not necessary to call recv() for
	each character read.

	Delete `is_eof' predicate.  This is no longer needed due to the change
	in the way characters are read.

	Add predicates tcp__ignore_sigpipe and tcp__unignore_sigpipe to ignore
	SIGPIPE signals that are sent if writing to a broken socket.  Disabling
	SIGPIPE allows write calls to return an error code instead of aborting
	the process.

	Fix an incorrect `will_not_call_mercury' annotation on
	`handle_shutdown'.

	Fix memory leaks caused by calling MR_NEW instead of MR_GC_NEW.


Index: tcp.m
===================================================================
RCS file: /home/mercury1/repository/mercury/extras/net/tcp.m,v
retrieving revision 1.1
diff -u -r1.1 tcp.m
--- tcp.m	16 Nov 2006 04:01:49 -0000	1.1
+++ tcp.m	28 Mar 2007 05:41:41 -0000
@@ -57,11 +57,23 @@
 
 :- instance input(tcp, io.state, tcp.error).
 :- instance reader(tcp, character, io.state, tcp.error).
+:- instance reader(tcp, string, io.state, tcp.error).
 
 :- instance output(tcp, io.state).
 :- instance writer(tcp, character, io.state).
 :- instance writer(tcp, string, io.state).
 
+	% Sending data to a broken pipe will cause the SIGPIPE signal to be
+	% sent to the process.  If SIGPIPE is ignored or blocked then send()
+	% fails with EPIPE.  This predicate causes SIGPIPE signals to be
+	% ignored.
+	%
+:- pred tcp__ignore_sigpipe(io::di, io::uo) is det.
+
+	% Restores the SIGPIPE signal handler before the last
+	% tcp__ignore_sigpipe() call.
+	%
+:- pred tcp__unignore_sigpipe(io::di, io::uo) is det.
 
 %-----------------------------------------------------------------------------%
 %-----------------------------------------------------------------------------%
@@ -70,6 +82,7 @@
 
 :- import_module bool.
 :- import_module char.
+:- import_module list.
 :- import_module require.
 :- import_module string.
 
@@ -125,7 +138,7 @@
 
 :- pragma foreign_proc(c,
 	handle_shutdown(TCP::in, IO0::di, IO::uo),
-	[will_not_call_mercury, thread_safe, promise_pure, tabled_for_io],
+	[may_call_mercury, thread_safe, promise_pure, tabled_for_io],
 "{
 
 	struct linger sockets_linger = { MR_TRUE, 2 };
@@ -175,10 +188,14 @@
 #define BACKLOG	16
 #define FULL	2
 
+#define TCP_BUFSIZE		1024
+
 typedef struct {
 	int	socket;
 	int	error;
-	MR_bool	eof;
+	size_t	buf_len;
+	off_t 	buf_pos;
+	char	buf[TCP_BUFSIZE];
 } ML_tcp;
 
 void ML_tcp_init(void);
@@ -234,11 +251,12 @@
 
 	ML_tcp_init();
 
-	sock = MR_NEW(ML_tcp);
+	sock = MR_GC_NEW(ML_tcp);
 
 	sock->socket = socket(PF_INET, SOCK_STREAM, 0);
 	sock->error = 0;
-	sock->eof = MR_FALSE;
+	sock->buf_len = 0;
+	sock->buf_pos = 0;
 
 	if (sock->socket == INVALID_SOCKET) {
 		sock->error = ML_error();
@@ -247,7 +265,7 @@
 		if (host == NULL) {
 			sock->error = ML_error();
 		} else {
-			addr = MR_NEW(struct sockaddr_in);
+			addr = MR_GC_NEW(struct sockaddr_in);
 			memset(addr,0,sizeof(struct sockaddr_in));
 			memcpy(&(addr->sin_addr), host->h_addr, host->h_length);
 			addr->sin_family = host->h_addrtype;
@@ -305,7 +323,7 @@
 		if (host == NULL) {
 			Errno = ML_error();
 		} else {
-			addr = MR_NEW(struct sockaddr_in);
+			addr = MR_GC_NEW(struct sockaddr_in);
 			memset(addr,0,sizeof(struct sockaddr_in));
 			memcpy(&(addr->sin_addr), host->h_addr, host->h_length);
 			addr->sin_family = host->h_addrtype;
@@ -342,12 +360,13 @@
 	struct sockaddr *addr;
 	int size = sizeof(struct sockaddr_in);
 
-	sock = MR_NEW(ML_tcp);
+	sock = MR_GC_NEW(ML_tcp);
 	addr = (struct sockaddr *) Addr;
 
 	sock->socket = accept(Socket, addr, &size);
 	sock->error = 0;
-	sock->eof = MR_FALSE;
+	sock->buf_len = 0;
+	sock->buf_pos = 0;
 
 	if (sock->socket == INVALID_SOCKET) {
 		sock->error = ML_error();
@@ -376,18 +395,27 @@
 :- instance input(tcp, io.state, tcp.error) where [].
 :- instance reader(tcp, character, io.state, tcp.error) where [
 	(get(T, Result, !IO) :-
-		tcp.read_char(T ^ handle, C, B, !IO),
-		( B = yes,
-			Result = ok(C)
-		; B = no,
-			is_eof(T ^ handle, IsEof, !IO),
-			( IsEof = yes ->
-				Result = eof
-			;
-				get_errno(T ^ handle, Errno, !IO),
-				Result = error(Errno)
-			)
-			
+		tcp__read_char(T ^ handle, Char, !IO),
+		( Char = -1 ->
+			Result = eof
+		; Char = -2 ->
+			get_errno(T ^ handle, Errno, !IO),
+			Result = error(Errno)
+		;
+			Result = ok(char.det_from_int(Char))
+		)
+	)
+].
+:- instance reader(tcp, string, io.state, tcp.error) where [
+	(get(T, Result, !IO) :-
+		tcp__read_line_as_string(T ^ handle, ErrCode, String, !IO),
+		( ErrCode = -1 ->
+			Result = eof
+		; ErrCode = -2 ->
+			get_errno(T ^ handle, Errno, !IO),
+			Result = error(Errno)
+		;
+			Result = ok(String)
 		)
 	)
 ].
@@ -425,31 +453,94 @@
 %-----------------------------------------------------------------------------%
 %-----------------------------------------------------------------------------%
 
-:- pred read_char(tcp_handle::in, char::out, bool::out, io::di, io::uo)
-	is det.
+:- pragma foreign_decl("C", "
+	/* Note: some Mercury code uses the -1 and -2 constants directly. */
+	#define TCP_EOF	    -1
+	#define TCP_ERROR   -2
+
+	int TCP_get_char(ML_tcp *sock);
+").
+
+:- pragma foreign_code("C", "
+	int TCP_get_char(ML_tcp *sock)
+	{
+		if (sock->buf_pos >= sock->buf_len) {
+			/* Refill buffer. */
+			int nchars = recv(sock->socket,
+				sock->buf, sizeof(sock->buf), 0);
+			if (nchars == SOCKET_ERROR) {
+				sock->error = ML_error();
+				return TCP_ERROR;
+			} else if (nchars == 0) {
+				return TCP_EOF;
+			} else {
+				sock->buf_pos = 1;
+				sock->buf_len = nchars;
+				return sock->buf[0];
+			}
+		} else {
+			return sock->buf[sock->buf_pos++];
+		}
+	}
+").
+
+:- pred tcp__read_char(tcp_handle::in, int::out, io::di, io::uo) is det.
 :- pragma foreign_proc(c,
-	read_char(Socket::in, Chr::out, Success::out, _IO0::di, _IO::uo),
+	tcp__read_char(Socket::in, Chr::out, _IO0::di, _IO::uo),
 	[will_not_call_mercury, thread_safe, promise_pure, tabled_for_io],
 "{
-
 	ML_tcp *sock = (ML_tcp *) Socket;
-	int nchars;
 
-	nchars = recv(sock->socket, &Chr, 1, 0);
-	if (nchars == SOCKET_ERROR) {
-		sock->error = ML_error();
-		Success = MR_FALSE;
-		Chr = 0;
-	} else if (nchars == 0) {
-		sock->eof = MR_TRUE;
-		Success = MR_FALSE;
-		Chr = 0;
-	} else {
-		Success = MR_TRUE;
-	}
+	Chr = TCP_get_char(sock);
 }").
 
+:- pred tcp__read_line_as_string(tcp_handle::in, int::out, string::out,
+	io::di, io::uo) is det.
+:- pragma foreign_proc("C",
+	tcp__read_line_as_string(TCP::in, ErrCode::out, Str::out,
+		IO0::di, IO::uo),
+	[will_not_call_mercury, promise_pure, thread_safe, tabled_for_io],
+"
+	ML_tcp *sock = (ML_tcp *) TCP;
+	size_t	BufLen = 1024;
+	off_t	BufPos = 0;
+	char	*Buf;
+	int	Chr;
+
+	Buf = MR_malloc(BufLen);
+
+	while (1) {
+		Chr = TCP_get_char(sock);
+		if (Chr < 0) {
+			ErrCode = Chr;
+			break;
+		}
 
+		if (BufPos >= BufLen) {
+			BufLen += 1024;
+			Buf = MR_realloc(Buf, BufLen);
+		}
+		Buf[BufPos++] = Chr;
+		if (Chr == '\\n') {
+			ErrCode = 0;
+			break;
+		}
+	}
+
+	if (ErrCode == 0) {
+		if (BufPos >= BufLen) {
+			BufLen += 1;
+			Buf = MR_realloc(Buf, BufLen);
+		}
+		Buf[BufPos] = '\\0';
+		MR_make_aligned_string_copy(Str, Buf);
+	} else {
+		Str = NULL;
+	}
+
+	MR_free(Buf);
+	IO = IO0;
+").
 
 :- pred tcp__write_char(tcp_handle::in, char::in, bool::out,
 	io::di, io::uo) is det.
@@ -506,17 +597,6 @@
 	MR_restore_transient_hp();
 }").
 
-
-:- pred tcp__is_eof(tcp_handle::in, bool::out, io::di, io::uo) is det.
-:- pragma foreign_proc(c,
-	tcp__is_eof(Socket::in, Success::out, _IO0::di, _IO::uo),
-	[will_not_call_mercury, thread_safe, promise_pure, tabled_for_io],
-"{
-	ML_tcp *sock = (ML_tcp *) Socket;
-
-	Success = sock->eof;
-}").
-
 %-----------------------------------------------------------------------------%
 %-----------------------------------------------------------------------------%
 
@@ -576,6 +656,35 @@
 
 throw_tcp_excption(S) :-
 	error(S).
+
+%-----------------------------------------------------------------------------%
+%-----------------------------------------------------------------------------%
+
+:- pragma foreign_decl("C", "
+    #include <signal.h>
+
+    extern void *TCP__prev_sigpipe_handler;
+").
+
+:- pragma foreign_code("C", "
+    void *TCP__prev_sigpipe_handler = SIG_DFL;
+").
+
+:- pragma foreign_proc("C",
+    tcp__ignore_sigpipe(IO0::di, IO::uo),
+    [will_not_call_mercury, promise_pure, thread_safe],
+"
+    TCP__prev_sigpipe_handler = signal(SIGPIPE, SIG_IGN);
+    IO = IO0;
+").
+
+:- pragma foreign_proc("C",
+    tcp__unignore_sigpipe(IO0::di, IO::uo),
+    [will_not_call_mercury, promise_pure, thread_safe],
+"
+    signal(SIGPIPE, TCP__prev_sigpipe_handler);
+    IO = IO0;
+").
 
 %-----------------------------------------------------------------------------%
 %-----------------------------------------------------------------------------%
--------------------------------------------------------------------------
mercury-reviews mailing list
Post messages to:       mercury-reviews at csse.unimelb.edu.au
Administrative Queries: owner-mercury-reviews at csse.unimelb.edu.au
Subscriptions:          mercury-reviews-request at csse.unimelb.edu.au
--------------------------------------------------------------------------



More information about the reviews mailing list