[m-rev.] diff: 2/2 Add a test case for barriers and fix a bug.

Paul Bone paul at bone.id.au
Fri May 16 20:26:56 AEST 2014


Branches: master

Add a test case for barriers and fix a bug.

This test case is based off Sebastian Godelet's work, which found a bug that
can occur when release/3 is called on a barrier and then other calls to
wait/3 are made.  The final call to wait/3 tries to execute the normal
'unblock the barrier' code and deadlocks there because the barrier is
already unlocked, specifically a value is already present in the "Go" mvar.
Thanks Sebastian for your test case.

library/thread.barrier.m:
    Handle release/3 properly: release/3 now sets the number of remaining
    threads to reach the barrier to zero, and wait/3 will determine why this
    iz zero (it's normally at least one), and if it is because release was
    called then no error is raised.  The other reasion why the number of
    remaining threads may be zero is because wait has been called too many
    times, in this case wait/3 will throw an exception.

tests/hard_coded/Mmakefile:
tests/hard_coded/thread_barrier_test.exp:
tests/hard_coded/thread_barrier_test.m:
    Add new test case.

tests/hard_coded/thread_test_utils.m
    This module contains code that may be useful for other concurrency
    tests.
---
 library/thread.barrier.m                 |  59 +++++++---
 tests/hard_coded/Mmakefile               |   1 +
 tests/hard_coded/thread_barrier_test.exp |  65 +++++++++++
 tests/hard_coded/thread_barrier_test.m   | 158 +++++++++++++++++++++++++++
 tests/hard_coded/thread_test_utils.m     | 178 +++++++++++++++++++++++++++++++
 5 files changed, 449 insertions(+), 12 deletions(-)
 create mode 100644 tests/hard_coded/thread_barrier_test.exp
 create mode 100644 tests/hard_coded/thread_barrier_test.m
 create mode 100644 tests/hard_coded/thread_test_utils.m

diff --git a/library/thread.barrier.m b/library/thread.barrier.m
index 508fd44..a132d8f 100644
--- a/library/thread.barrier.m
+++ b/library/thread.barrier.m
@@ -48,8 +48,10 @@
 
     % release_barrier(Barrier, !IO)
     %
-    % Release all the threads waiting at the barrier regardless
-    % of whether or not N threads have arrived at the barrier.
+    % Release all the threads waiting at the barrier regardless of whether
+    % or not N threads have arrived at the barrier.  This can be called by
+    % any thread, it does not have to be a thread that would normally call
+    % wait/3.
     %
 :- pred release(barrier::in, io::di, io::uo) is det.
 
@@ -79,9 +81,17 @@
                 b_waiting_for   :: mvar(int),
 
                 % Can we go yet?
-                b_go            :: mvar(unit)
+                b_go            :: mvar(why_can_we_go)
             ).
 
+    % We use this type to say why execution may proceed after reaching tha
+    % barrier.  If it is because the counter reached zero or because
+    % release/3 was called.
+    %
+:- type why_can_we_go
+    --->    can_go_normal
+    ;       can_go_release_called.
+
 %------------------------------------------------------------------------------%
 
 init(N, barrier(WaitingOn, Go), !IO) :-
@@ -97,27 +107,52 @@ wait(barrier(WaitingOn, Go), !IO) :-
 
     ( StillWaitingFor > 0 ->
         % There are still outstanding threads.
-        
+
         % Unlock the counter
         put(WaitingOn, StillWaitingFor, !IO),
 
         % Wait on the barrier then unlock another thread.
-        take(Go, _, !IO),
-        put(Go, unit, !IO)
+        take(Go, WhyGo, !IO),
+        put(Go, WhyGo, !IO)
     ; StillWaitingFor = 0 ->
         % The last thread at the barrier, so signal that we can go.
-        put(Go, unit, !IO),
+        put(Go, can_go_normal, !IO),
         put(WaitingOn, StillWaitingFor, !IO)
     ;
-        unexpected($file, $pred,
-            "Too many threads called barrier/3 on this barrier.")
+        put(WaitingOn, 0, !IO),
+
+        % Go is always updated before WaitingOn, so if this branch is being
+        % executed (either because release was called or because the barrier
+        % was called excessively) then we know that this call to take will
+        % not block, in either of those cases there will always be a value
+        % in Go.
+        take(Go, WhyGo, !IO),
+        put(Go, WhyGo, !IO),
+        (
+            WhyGo = can_go_normal,
+            unexpected($file, $pred,
+                "Too many threads called barrier/3 on this barrier.")
+        ;
+            WhyGo = can_go_release_called
+        )
     ).
 
 release(barrier(WaitingOn, Go), !IO) :-
     % Allow all the threads at the barrier to go.
-    put(Go, unit, !IO),
-    take(WaitingOn, N, !IO),
-    put(WaitingOn, N - 1, !IO).
+    put(Go, can_go_release_called, !IO),
+
+    % We must set WaitingOn to zero so that the StillWaitingFor = 0 branch
+    % above is not executed more than once, if it is it will block when it
+    % tries to write a value to Go as Go already has a value.  Instead we
+    % set it to zero, which means that StillWaitingOn will be -1, we use a
+    % special value of can_go_release_called for Go so that this branch does
+    % not raise an error.
+    %
+    % This algorithm has the nice benefit that if release/3 is not
+    % considered an alternative to calling barrier, so that a barrier can be
+    % canceled by a thread that would not normally call wait/3 itself.
+    take(WaitingOn, _N, !IO),
+    put(WaitingOn, 0, !IO).
 
 %------------------------------------------------------------------------------%
 %------------------------------------------------------------------------------%
diff --git a/tests/hard_coded/Mmakefile b/tests/hard_coded/Mmakefile
index 76f6996..ec70eac 100644
--- a/tests/hard_coded/Mmakefile
+++ b/tests/hard_coded/Mmakefile
@@ -314,6 +314,7 @@ ORDINARY_PROGS=	\
 	test_semaphore \
 	test_tree_bitset \
 	test_yield \
+	thread_barrier_test \
 	tim_qual1 \
 	time_test \
 	trace_goal_1 \
diff --git a/tests/hard_coded/thread_barrier_test.exp b/tests/hard_coded/thread_barrier_test.exp
new file mode 100644
index 0000000..c729337
--- /dev/null
+++ b/tests/hard_coded/thread_barrier_test.exp
@@ -0,0 +1,65 @@
+Test spawn and wait
+Messages from thread 0:
+	-- testing spawning with 5 threads
+	spawning thread #1
+	spawning thread #2
+	spawning thread #3
+	spawning thread #4
+	spawning thread #5
+	-- test finished
+Messages from thread 1:
+	thread 1 starting
+	fib(10) = 89
+	thread 1 exiting
+Messages from thread 2:
+	thread 2 starting
+	fib(15) = 987
+	thread 2 exiting
+Messages from thread 3:
+	thread 3 starting
+	fib(20) = 10946
+	thread 3 exiting
+Messages from thread 4:
+	thread 4 starting
+	fib(25) = 121393
+	thread 4 exiting
+Messages from thread 5:
+	thread 5 starting
+	fib(30) = 1346269
+	thread 5 exiting
+
+Test release
+Messages from thread 0:
+	-- testing barrier release at 3 of 5
+	spawning thread #1
+	spawning thread #2
+	spawning thread #3
+	spawning thread #4
+	spawning thread #5
+	waiting:	before release
+	done waiting, test finished:	after release
+Messages from thread 1:
+	thread starting
+	fib(10) = 89
+	waiting:	before release
+	done waiting:	after release
+Messages from thread 2:
+	thread starting
+	fib(15) = 987
+	waiting:	before release
+	done waiting:	after release
+Messages from thread 3:
+	thread starting
+	fib(20) = 10946
+	releasing barrier
+	released.
+Messages from thread 4:
+	thread starting
+	fib(25) = 121393
+	waiting:	after release
+	done waiting:	after release
+Messages from thread 5:
+	thread starting
+	fib(30) = 1346269
+	waiting:	after release
+	done waiting:	after release
diff --git a/tests/hard_coded/thread_barrier_test.m b/tests/hard_coded/thread_barrier_test.m
new file mode 100644
index 0000000..1390230
--- /dev/null
+++ b/tests/hard_coded/thread_barrier_test.m
@@ -0,0 +1,158 @@
+%------------------------------------------------------------------------------%
+% File: thread_barrier_test.m
+% Main author: Sebastian Godelet <sebastian.godelet+github at gmail.com>
+% Created on: Tue Apr  8 15:54:57 CEST 2014
+% vim: ft=mercury ff=unix ts=4 sw=4 et
+%
+%------------------------------------------------------------------------------%
+
+:- module thread_barrier_test.
+
+:- interface.
+
+:- import_module io.
+
+:- pred main(io::di, io::uo) is cc_multi.
+
+%------------------------------------------------------------------------------%
+%------------------------------------------------------------------------------%
+
+:- implementation.
+
+:- import_module int.
+:- import_module integer.
+:- import_module list.
+:- import_module require.
+:- import_module string.
+:- import_module thread.
+:- import_module thread.barrier.
+:- import_module thread.mvar.
+
+:- import_module thread_test_utils.
+
+%------------------------------------------------------------------------------%
+
+:- func fib(integer) = integer.
+
+fib(N) = Fib :-
+    ( N < integer(2) -> Fib = integer(1)
+    ; Fib = fib(N-integer(1)) + fib(N-integer(2))
+    ).
+
+:- pred test_spawn_and_wait(int::in, io::di, io::uo) is cc_multi.
+
+test_spawn_and_wait(ThreadCount, !IO) :-
+    init_all_thread_output(AllThreadOutput, !IO),
+    init_thread_output(AllThreadOutput, 0, Output, !IO),
+    t_write_string(Output, format("-- testing spawning with %d threads", 
+        [i(ThreadCount)]), !IO),
+    barrier.init(ThreadCount + 1, Barrier, !IO),
+    list.foldl((pred(Thread::in, !.IO::di, !:IO::uo) is cc_multi :-
+        t_write_string(Output, format("spawning thread #%d", [i(Thread)]),
+            !IO),
+        spawn(test_spawn_and_wait_thread(Thread, AllThreadOutput, Barrier),
+            !IO)
+    ), 1 `..` ThreadCount, !IO),
+    barrier.wait(Barrier, !IO),
+    t_write_string(Output, "-- test finished", !IO),
+    close_thread_output(Output, !IO),
+    write_all_thread_output(AllThreadOutput, !IO).
+
+:- pragma no_determinism_warning(test_spawn_and_wait_thread/5).
+:- pred test_spawn_and_wait_thread(int::in, all_threads_output::in,
+    barrier::in, io::di, io::uo) is cc_multi.
+
+test_spawn_and_wait_thread(Thread, AllThreadOutput, Barrier, !IO) :-
+    init_thread_output(AllThreadOutput, Thread, Output, !IO),
+    t_write_string(Output, format("thread %d starting", [i(Thread)]), !IO),
+    N = 5 + Thread * 5,
+    t_write_string(Output, format("fib(%d) = %s",
+        [i(N), s(integer.to_string(fib(integer(N))))]), !IO),
+    barrier.wait(Barrier, !IO),
+    t_write_string(Output, format("thread %d exiting", [i(Thread)]), !IO),
+    close_thread_output(Output, !IO).
+
+    % This state allows us to determine if certain actions have already
+    % taken place.  This lets us prove that some things happen before/after
+    % release is called on the barrier.
+    %
+:- type state
+    --->    state_before_release
+    ;       state_after_release.
+
+:- pred test_release(int::in, int::in, io::di, io::uo) is cc_multi.
+
+test_release(AbortAt, ThreadCount, !IO) :-
+    init_all_thread_output(AllThreadOutput, !IO),
+    init_thread_output(AllThreadOutput, 0, Output, !IO),
+    t_write_string(Output, format("-- testing barrier release at %d of %d",
+        [i(AbortAt), i(ThreadCount)]), !IO),
+    barrier.init(ThreadCount + 1, Barrier, !IO),
+    mvar.init(StateMvar, !IO),
+    mvar.put(StateMvar, state_before_release, !IO),
+    list.foldl((pred(Thread::in, !.IO::di, !:IO::uo) is cc_multi :-
+        t_write_string(Output, format("spawning thread #%d", [i(Thread)]),
+            !IO),
+        spawn(
+            release_thread(AllThreadOutput, Thread, AbortAt, Barrier,
+                StateMvar),
+            !IO)
+    ), 1 `..` ThreadCount, !IO),
+    log_with_state(Output, StateMvar, "waiting", !IO),
+    barrier.wait(Barrier, !IO),
+    log_with_state(Output, StateMvar, "done waiting, test finished", !IO),
+    close_thread_output(Output, !IO),
+    write_all_thread_output(AllThreadOutput, !IO).
+
+:- pragma no_determinism_warning(release_thread/7).
+:- pred release_thread(all_threads_output::in, int::in, int::in, barrier::in,
+    mvar(thread_barrier_test.state)::in, io::di, io::uo) is cc_multi.
+
+release_thread(AllOutput, Thread, AbortAt, Barrier, StateMvar, !IO) :-
+    init_thread_output(AllOutput, Thread, Output, !IO),
+    t_write_string(Output, "thread starting", !IO),
+    N = 5 + Thread * 5,
+    t_write_string(Output, format("fib(%d) = %s",
+        [i(N), s(integer.to_string(fib(integer(N))))]), !IO),
+    ( Thread = AbortAt ->
+        t_write_string(Output, "releasing barrier", !IO),
+        mvar.take(StateMvar, _, !IO),
+        barrier.release(Barrier, !IO),
+        mvar.put(StateMvar, state_after_release, !IO),
+        t_write_string(Output, "released.", !IO)
+    ;
+        log_with_state(Output, StateMvar, "waiting", !IO),
+        barrier.wait(Barrier, !IO),
+        log_with_state(Output, StateMvar, "done waiting", !IO)
+    ),
+    close_thread_output(Output, !IO).
+
+:- pred log_with_state(thread_output::in,
+    mvar(thread_barrier_test.state)::in, string::in, io::di, io::uo) is det.
+
+log_with_state(Output, StateMvar, String, !IO) :-
+    mvar.take(StateMvar, State, !IO),
+    (
+        State = state_before_release,
+        StateStr = "before release"
+    ;
+        State = state_after_release,
+        StateStr = "after release"
+    ),
+    Message = format("%s:\t%s", [s(String), s(StateStr)]),
+    t_write_string(Output, Message, !IO),
+    mvar.put(StateMvar, State, !IO).
+
+main(!IO) :-
+    ( thread.can_spawn ->
+        io.write_string("Test spawn and wait\n", !IO),
+        test_spawn_and_wait(5, !IO),
+        io.write_string("\nTest release\n", !IO),
+        test_release(3, 5, !IO)
+    ;
+        unexpected($file, $pred, $grade ++ " does not support thread spawning")
+    ).
+
+%------------------------------------------------------------------------------%
+% -*- Mode: Mercury; column: 80; indent-tabs-mode: nil; tabs-width: 4 -*-
+%------------------------------------------------------------------------------%
diff --git a/tests/hard_coded/thread_test_utils.m b/tests/hard_coded/thread_test_utils.m
new file mode 100644
index 0000000..2f80e1f
--- /dev/null
+++ b/tests/hard_coded/thread_test_utils.m
@@ -0,0 +1,178 @@
+%-----------------------------------------------------------------------------%
+% vim: ft=mercury ts=4 sw=4 et
+%-----------------------------------------------------------------------------%
+% Copyright (C) 2014 Mission Critical IT.
+% Copyright (C) 2014 The Mercury team.
+% This file may only be copied under the terms of the GNU Library General
+% Public License - see the file COPYING.LIB in the Mercury distribution.
+%-----------------------------------------------------------------------------%
+%
+% File: thread_test_utils.m
+% Author: Paul Bone
+%
+% These utilities make it easier to test concurrent code.  In particular a
+% concurrent program's IO actions may occur in a nondeterminsic order.  This
+% module provides alternative IO operations that provide some order so that
+% program output matches test output (when the test passes).
+%
+%-----------------------------------------------------------------------------%
+
+:- module thread_test_utils.
+
+:- interface.
+
+:- import_module io.
+:- import_module string.
+
+%-----------------------------------------------------------------------------%
+
+    % This type represents all the output of all the threads in the program.
+    %
+:- type all_threads_output.
+
+    % This type represents the output of an individual thread.
+    %
+:- type thread_output.
+
+:- pred init_all_thread_output(all_threads_output::out, io::di, io::uo)
+    is det.
+
+    % new_thread_output(N) = Output
+    %
+    % Create a new thread output object for thread number N.
+    %
+:- pred init_thread_output(all_threads_output::in, int::in,
+    thread_output::out, io::di, io::uo) is det.
+
+    % Save some output into the buffer.
+    %
+:- pred t_write_string(thread_output::in, string::in, io::di, io::uo) is det.
+
+    % Close this threads output stream.  All streams must be closed as
+    % write_all_thread_output/3 will use this to make sure it's recieved all
+    % the messages it should have.
+    %
+:- pred close_thread_output(thread_output::in, io::di, io::uo) is det.
+
+    % Write out this set of buffers.
+    %
+:- pred write_all_thread_output(all_threads_output::in, io::di, io::uo) is det.
+
+%-----------------------------------------------------------------------------%
+%-----------------------------------------------------------------------------%
+
+:- implementation.
+
+:- import_module int.
+:- import_module list.
+:- import_module map.
+:- import_module maybe.
+:- import_module thread.
+:- import_module thread.channel.
+:- import_module set.
+
+%-----------------------------------------------------------------------------%
+
+:- type all_threads_output
+    --->    all_threads_output(
+                channel(message)
+            ).
+
+:- type thread_output
+    --->    thread_output(
+                to_thread           :: int,
+                to_chan             :: channel(message)
+            ).
+
+:- type message 
+    --->    message_output(
+                om_thread       :: int,
+                om_string       :: string
+            )
+    ;       message_open(
+                mopen_thread    :: int
+            )
+    ;       message_close(
+                mc_thread       :: int
+            ).
+
+%-----------------------------------------------------------------------------%
+
+init_all_thread_output(all_threads_output(Chan), !IO) :-
+    channel.init(Chan, !IO).
+
+init_thread_output(AllOutput, Thread, Output, !IO) :-
+    AllOutput = all_threads_output(Chan),
+    Output = thread_output(Thread, Chan),
+
+    % Pust a message that will be used later to show that this Output has
+    % been opened.
+    put(Chan, message_open(Thread), !IO). 
+
+t_write_string(Output, String, !IO) :-
+    put(Output ^ to_chan, message_output(Output ^ to_thread, String), !IO).
+
+write_all_thread_output(AllOutput, !IO) :-
+    get_all_messages(AllOutput, set.init, map.init, Messages, !IO),
+    foldl(write_out_thread_messages, Messages, !IO).
+
+    % Messages indexed by thread.  Each list is stored in reverse order.
+    %
+:- type messages
+    == map(int, list(string)).
+
+:- pred get_all_messages(all_threads_output::in, set(int)::in,
+    messages::in, messages::out, io::di, io::uo) is det.
+
+get_all_messages(AllOutput, OpenThreads, !Messages, !IO) :-
+    AllOutput = all_threads_output(Chan),
+    ( empty(OpenThreads) ->
+        % If this might be the end of the messages then we only try and
+        % take, so we know if we should exit.
+        try_take(Chan, MaybeMessage, !IO)
+    ;
+        % OTOH, if there may be threads that havn't finished sending our
+        % messages, then we use a blocking take to ensure that we don't miss
+        % their messages.
+        take(Chan, Message0, !IO),
+        MaybeMessage = yes(Message0)
+    ),
+    (
+        MaybeMessage = yes(Message),
+        (
+            Message = message_output(Thread, String),
+            ( map.search(!.Messages, Thread, TMessages0) ->
+                TMessages = [String | TMessages0]
+            ;
+                TMessages = [String]
+            ),
+            map.set(Thread, TMessages, !Messages),
+            get_all_messages(AllOutput, OpenThreads, !Messages, !IO)
+        ;
+            Message = message_open(Thread),
+            get_all_messages(AllOutput, insert(OpenThreads, Thread),
+                !Messages, !IO)
+        ;
+            Message = message_close(Thread),
+            get_all_messages(AllOutput, delete(OpenThreads, Thread),
+                !Messages, !IO)
+        )
+    ;
+        MaybeMessage = no
+    ).
+
+:- pred write_out_thread_messages(int::in, list(string)::in, io::di, io::uo)
+    is det.
+
+write_out_thread_messages(Thread, Messages, !IO) :-
+    io.format("Messages from thread %d:\n", [i(Thread)], !IO),
+    foldr(write_out_message, Messages, !IO).
+
+close_thread_output(Output, !IO) :-
+    put(Output ^ to_chan, message_close(Output ^ to_thread), !IO).
+
+:- pred write_out_message(string::in, io::di, io::uo) is det.
+
+write_out_message(String, !IO) :-
+    io.format("\t%s\n", [s(String)], !IO).
+
-- 
2.0.0.rc0




More information about the reviews mailing list