[m-rev.] for review: improvements to net/tcp.m
Peter Wang
wangp at students.csse.unimelb.edu.au
Wed Mar 28 15:50:36 AEST 2007
Improvements to the tcp module.
extras/net/tcp.m:
Make tcp an instance of the `reader' typeclass with unit `string' and
implement line reading efficiently in C.
Do input buffering on sockets so it's not necessary to call recv() for
each character read.
Delete `is_eof' predicate. This is no longer needed due to the change
in the way characters are read.
Add predicates tcp__ignore_sigpipe and tcp__unignore_sigpipe to ignore
SIGPIPE signals that are sent if writing to a broken socket. Disabling
SIGPIPE allows write calls to return an error code instead of aborting
the process.
Fix an incorrect `will_not_call_mercury' annotation on
`handle_shutdown'.
Fix memory leaks caused by calling MR_NEW instead of MR_GC_NEW.
Index: tcp.m
===================================================================
RCS file: /home/mercury1/repository/mercury/extras/net/tcp.m,v
retrieving revision 1.1
diff -u -r1.1 tcp.m
--- tcp.m 16 Nov 2006 04:01:49 -0000 1.1
+++ tcp.m 28 Mar 2007 05:41:41 -0000
@@ -57,11 +57,23 @@
:- instance input(tcp, io.state, tcp.error).
:- instance reader(tcp, character, io.state, tcp.error).
+:- instance reader(tcp, string, io.state, tcp.error).
:- instance output(tcp, io.state).
:- instance writer(tcp, character, io.state).
:- instance writer(tcp, string, io.state).
+ % Sending data to a broken pipe will cause the SIGPIPE signal to be
+ % sent to the process. If SIGPIPE is ignored or blocked then send()
+ % fails with EPIPE. This predicate causes SIGPIPE signals to be
+ % ignored.
+ %
+:- pred tcp__ignore_sigpipe(io::di, io::uo) is det.
+
+ % Restores the SIGPIPE signal handler before the last
+ % tcp__ignore_sigpipe() call.
+ %
+:- pred tcp__unignore_sigpipe(io::di, io::uo) is det.
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
@@ -70,6 +82,7 @@
:- import_module bool.
:- import_module char.
+:- import_module list.
:- import_module require.
:- import_module string.
@@ -125,7 +138,7 @@
:- pragma foreign_proc(c,
handle_shutdown(TCP::in, IO0::di, IO::uo),
- [will_not_call_mercury, thread_safe, promise_pure, tabled_for_io],
+ [may_call_mercury, thread_safe, promise_pure, tabled_for_io],
"{
struct linger sockets_linger = { MR_TRUE, 2 };
@@ -175,10 +188,14 @@
#define BACKLOG 16
#define FULL 2
+#define TCP_BUFSIZE 1024
+
typedef struct {
int socket;
int error;
- MR_bool eof;
+ size_t buf_len;
+ off_t buf_pos;
+ char buf[TCP_BUFSIZE];
} ML_tcp;
void ML_tcp_init(void);
@@ -234,11 +251,12 @@
ML_tcp_init();
- sock = MR_NEW(ML_tcp);
+ sock = MR_GC_NEW(ML_tcp);
sock->socket = socket(PF_INET, SOCK_STREAM, 0);
sock->error = 0;
- sock->eof = MR_FALSE;
+ sock->buf_len = 0;
+ sock->buf_pos = 0;
if (sock->socket == INVALID_SOCKET) {
sock->error = ML_error();
@@ -247,7 +265,7 @@
if (host == NULL) {
sock->error = ML_error();
} else {
- addr = MR_NEW(struct sockaddr_in);
+ addr = MR_GC_NEW(struct sockaddr_in);
memset(addr,0,sizeof(struct sockaddr_in));
memcpy(&(addr->sin_addr), host->h_addr, host->h_length);
addr->sin_family = host->h_addrtype;
@@ -305,7 +323,7 @@
if (host == NULL) {
Errno = ML_error();
} else {
- addr = MR_NEW(struct sockaddr_in);
+ addr = MR_GC_NEW(struct sockaddr_in);
memset(addr,0,sizeof(struct sockaddr_in));
memcpy(&(addr->sin_addr), host->h_addr, host->h_length);
addr->sin_family = host->h_addrtype;
@@ -342,12 +360,13 @@
struct sockaddr *addr;
int size = sizeof(struct sockaddr_in);
- sock = MR_NEW(ML_tcp);
+ sock = MR_GC_NEW(ML_tcp);
addr = (struct sockaddr *) Addr;
sock->socket = accept(Socket, addr, &size);
sock->error = 0;
- sock->eof = MR_FALSE;
+ sock->buf_len = 0;
+ sock->buf_pos = 0;
if (sock->socket == INVALID_SOCKET) {
sock->error = ML_error();
@@ -376,18 +395,27 @@
:- instance input(tcp, io.state, tcp.error) where [].
:- instance reader(tcp, character, io.state, tcp.error) where [
(get(T, Result, !IO) :-
- tcp.read_char(T ^ handle, C, B, !IO),
- ( B = yes,
- Result = ok(C)
- ; B = no,
- is_eof(T ^ handle, IsEof, !IO),
- ( IsEof = yes ->
- Result = eof
- ;
- get_errno(T ^ handle, Errno, !IO),
- Result = error(Errno)
- )
-
+ tcp__read_char(T ^ handle, Char, !IO),
+ ( Char = -1 ->
+ Result = eof
+ ; Char = -2 ->
+ get_errno(T ^ handle, Errno, !IO),
+ Result = error(Errno)
+ ;
+ Result = ok(char.det_from_int(Char))
+ )
+ )
+].
+:- instance reader(tcp, string, io.state, tcp.error) where [
+ (get(T, Result, !IO) :-
+ tcp__read_line_as_string(T ^ handle, ErrCode, String, !IO),
+ ( ErrCode = -1 ->
+ Result = eof
+ ; ErrCode = -2 ->
+ get_errno(T ^ handle, Errno, !IO),
+ Result = error(Errno)
+ ;
+ Result = ok(String)
)
)
].
@@ -425,31 +453,94 @@
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
-:- pred read_char(tcp_handle::in, char::out, bool::out, io::di, io::uo)
- is det.
+:- pragma foreign_decl("C", "
+ /* Note: some Mercury code uses the -1 and -2 constants directly. */
+ #define TCP_EOF -1
+ #define TCP_ERROR -2
+
+ int TCP_get_char(ML_tcp *sock);
+").
+
+:- pragma foreign_code("C", "
+ int TCP_get_char(ML_tcp *sock)
+ {
+ if (sock->buf_pos >= sock->buf_len) {
+ /* Refill buffer. */
+ int nchars = recv(sock->socket,
+ sock->buf, sizeof(sock->buf), 0);
+ if (nchars == SOCKET_ERROR) {
+ sock->error = ML_error();
+ return TCP_ERROR;
+ } else if (nchars == 0) {
+ return TCP_EOF;
+ } else {
+ sock->buf_pos = 1;
+ sock->buf_len = nchars;
+ return sock->buf[0];
+ }
+ } else {
+ return sock->buf[sock->buf_pos++];
+ }
+ }
+").
+
+:- pred tcp__read_char(tcp_handle::in, int::out, io::di, io::uo) is det.
:- pragma foreign_proc(c,
- read_char(Socket::in, Chr::out, Success::out, _IO0::di, _IO::uo),
+ tcp__read_char(Socket::in, Chr::out, _IO0::di, _IO::uo),
[will_not_call_mercury, thread_safe, promise_pure, tabled_for_io],
"{
-
ML_tcp *sock = (ML_tcp *) Socket;
- int nchars;
- nchars = recv(sock->socket, &Chr, 1, 0);
- if (nchars == SOCKET_ERROR) {
- sock->error = ML_error();
- Success = MR_FALSE;
- Chr = 0;
- } else if (nchars == 0) {
- sock->eof = MR_TRUE;
- Success = MR_FALSE;
- Chr = 0;
- } else {
- Success = MR_TRUE;
- }
+ Chr = TCP_get_char(sock);
}").
+:- pred tcp__read_line_as_string(tcp_handle::in, int::out, string::out,
+ io::di, io::uo) is det.
+:- pragma foreign_proc("C",
+ tcp__read_line_as_string(TCP::in, ErrCode::out, Str::out,
+ IO0::di, IO::uo),
+ [will_not_call_mercury, promise_pure, thread_safe, tabled_for_io],
+"
+ ML_tcp *sock = (ML_tcp *) TCP;
+ size_t BufLen = 1024;
+ off_t BufPos = 0;
+ char *Buf;
+ int Chr;
+
+ Buf = MR_malloc(BufLen);
+
+ while (1) {
+ Chr = TCP_get_char(sock);
+ if (Chr < 0) {
+ ErrCode = Chr;
+ break;
+ }
+ if (BufPos >= BufLen) {
+ BufLen += 1024;
+ Buf = MR_realloc(Buf, BufLen);
+ }
+ Buf[BufPos++] = Chr;
+ if (Chr == '\\n') {
+ ErrCode = 0;
+ break;
+ }
+ }
+
+ if (ErrCode == 0) {
+ if (BufPos >= BufLen) {
+ BufLen += 1;
+ Buf = MR_realloc(Buf, BufLen);
+ }
+ Buf[BufPos] = '\\0';
+ MR_make_aligned_string_copy(Str, Buf);
+ } else {
+ Str = NULL;
+ }
+
+ MR_free(Buf);
+ IO = IO0;
+").
:- pred tcp__write_char(tcp_handle::in, char::in, bool::out,
io::di, io::uo) is det.
@@ -506,17 +597,6 @@
MR_restore_transient_hp();
}").
-
-:- pred tcp__is_eof(tcp_handle::in, bool::out, io::di, io::uo) is det.
-:- pragma foreign_proc(c,
- tcp__is_eof(Socket::in, Success::out, _IO0::di, _IO::uo),
- [will_not_call_mercury, thread_safe, promise_pure, tabled_for_io],
-"{
- ML_tcp *sock = (ML_tcp *) Socket;
-
- Success = sock->eof;
-}").
-
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
@@ -576,6 +656,35 @@
throw_tcp_excption(S) :-
error(S).
+
+%-----------------------------------------------------------------------------%
+%-----------------------------------------------------------------------------%
+
+:- pragma foreign_decl("C", "
+ #include <signal.h>
+
+ extern void *TCP__prev_sigpipe_handler;
+").
+
+:- pragma foreign_code("C", "
+ void *TCP__prev_sigpipe_handler = SIG_DFL;
+").
+
+:- pragma foreign_proc("C",
+ tcp__ignore_sigpipe(IO0::di, IO::uo),
+ [will_not_call_mercury, promise_pure, thread_safe],
+"
+ TCP__prev_sigpipe_handler = signal(SIGPIPE, SIG_IGN);
+ IO = IO0;
+").
+
+:- pragma foreign_proc("C",
+ tcp__unignore_sigpipe(IO0::di, IO::uo),
+ [will_not_call_mercury, promise_pure, thread_safe],
+"
+ signal(SIGPIPE, TCP__prev_sigpipe_handler);
+ IO = IO0;
+").
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
--------------------------------------------------------------------------
mercury-reviews mailing list
Post messages to: mercury-reviews at csse.unimelb.edu.au
Administrative Queries: owner-mercury-reviews at csse.unimelb.edu.au
Subscriptions: mercury-reviews-request at csse.unimelb.edu.au
--------------------------------------------------------------------------
More information about the reviews
mailing list