[m-rev.] for review: stm runtime and library
Leon Ilario MIKA
lmika at students.csse.unimelb.edu.au
Wed Sep 26 15:38:32 AEST 2007
Estimated hours taken: 8
Modifcation of Software Transactional Memory runtime and library to handle
blocking and nested STM transactions.
runtime/mercury_stm.c
runtime/mercury_stm.h
Defined condition variables for the high-level C grade. This
is used for the scheduling of threads with blocking transactions.
Such grades used POSIX condition variables for this purpose.
Added wait queues to transaction variables. These data structures
maintain a list of all threads waiting for the value of the
corresponding transaction variable to change.
Added a function to block a transaction until the value of a
transaction variable has changed.
Added a function which permits the merging of a transaction log
with it's parent.
runtime/stm_builtin.m
Added utility predicates which will assist the checking of atomic
goals durring the semantic analysis and error checking phase of the
compiler. These are automatically inserted durring the construction
of the hlds and automatically removed durring the expansion of
stm_goals.
Completed the implementation of the predicate "stm_block".
Index: runtime/mercury_stm.c
===================================================================
RCS file: /home/mercury/mercury1/repository/mercury/runtime/mercury_stm.c,v
retrieving revision 1.5
diff -u -u -r1.5 mercury_stm.c
--- runtime/mercury_stm.c 21 Sep 2007 06:13:12 -0000 1.5
+++ runtime/mercury_stm.c 26 Sep 2007 05:22:51 -0000
@@ -20,8 +20,8 @@
void
MR_STM_record_transaction(MR_STM_TransLog *tlog, MR_STM_Var *var,
- MR_Word old_value, MR_Word new_value)
-{
+ MR_Word old_value, MR_Word new_value) {
+
MR_STM_TransRecord *new_record;
new_record = MR_GC_NEW(MR_STM_TransRecord);
@@ -33,32 +33,70 @@
}
void
-MR_STM_attach_waiter(MR_STM_Var *var, MR_ThreadId tid)
-{
- MR_fatal_error("NYI MR_STM_attach_waiter");
+MR_STM_attach_waiter(MR_STM_Var *var, MR_ThreadId tid,
+ MR_STM_ConditionVar *cvar) {
+
+ MR_STM_Waiter *new_waiter;
+
+ new_waiter = MR_GC_NEW(MR_STM_Waiter);
+ new_waiter->MR_STM_cond_var = cvar;
+
+ if (var->MR_STM_var_waiters == NULL) {
+ var->MR_STM_var_waiters = new_waiter;
+ new_waiter->MR_STM_waiter_prev = NULL;
+ new_waiter->MR_STM_waiter_next = NULL;
+ } else {
+ new_waiter->MR_STM_waiter_prev = NULL;
+ new_waiter->MR_STM_waiter_next = var->MR_STM_var_waiters;
+ var->MR_STM_var_waiters->MR_STM_waiter_prev = new_waiter;
+ var->MR_STM_var_waiters = new_waiter;
+ }
}
void
-MR_STM_detach_waiter(MR_STM_Var *var, MR_ThreadId tid)
-{
- MR_fatal_error("NYI MR_STM_detach_waiter");
+MR_STM_detach_waiter(MR_STM_Var *var, MR_STM_ConditionVar *cvar) {
+
+ MR_STM_Waiter *curr_waiter;
+
+ MR_assert(var != NULL);
+ MR_assert(var->MR_STM_var_waiters != NULL);
+
+ curr_waiter = var->MR_STM_var_waiters;
+ while (curr_waiter != NULL) {
+ if (curr_waiter->MR_STM_cond_var == cvar) {
+ if (curr_waiter == var->MR_STM_var_waiters) {
+ var->MR_STM_var_waiters =
+ var->MR_STM_var_waiters->MR_STM_waiter_next;
+ }
+ if (curr_waiter->MR_STM_waiter_prev != NULL) {
+ curr_waiter->MR_STM_waiter_prev->MR_STM_waiter_next =
+ curr_waiter->MR_STM_waiter_next;
+ }
+ if (curr_waiter->MR_STM_waiter_next != NULL) {
+ curr_waiter->MR_STM_waiter_next->MR_STM_waiter_prev =
+ curr_waiter->MR_STM_waiter_prev;
+ }
+ curr_waiter = NULL;
+ return;
+ }
+ }
+
+ MR_fatal_error("MR_STM_detach_waiter: Thread ID not in wait queue");
}
MR_Integer
-MR_STM_validate(MR_STM_TransLog *tlog)
-{
+MR_STM_validate(MR_STM_TransLog *tlog) {
+
MR_STM_TransRecord *current;
MR_assert(tlog != NULL);
while (tlog != NULL) {
-
current = tlog->MR_STM_tl_records;
while (current != NULL) {
if (current->MR_STM_tr_var->MR_STM_var_value !=
- current->MR_STM_tr_old_value)
- {
+ current->MR_STM_tr_old_value) {
return MR_STM_TRANSACTION_INVALID;
}
current = current->MR_STM_tr_next;
@@ -71,6 +109,19 @@
}
void
+MR_STM_signal_vars(MR_STM_Var *tvar) {
+
+ MR_STM_Waiter *wait_queue;
+
+ wait_queue = tvar->MR_STM_var_waiters;
+
+ while (wait_queue != NULL) {
+ MR_STM_condvar_signal(wait_queue->MR_STM_cond_var);
+ wait_queue = wait_queue->MR_STM_waiter_next;
+ }
+}
+
+void
MR_STM_commit(MR_STM_TransLog *tlog) {
MR_STM_TransRecord *current;
@@ -79,13 +130,15 @@
while (current != NULL) {
current->MR_STM_tr_var->MR_STM_var_value
= current->MR_STM_tr_new_value;
+
+ MR_STM_signal_vars(current->MR_STM_tr_var);
current = current->MR_STM_tr_next;
}
}
void
-MR_STM_wait(MR_STM_TransLog *tlog)
-{
+MR_STM_wait(MR_STM_TransLog *tlog, MR_STM_ConditionVar *cvar) {
+
MR_STM_TransRecord *current;
MR_ThreadId this_thread_id;
@@ -93,14 +146,14 @@
current = tlog->MR_STM_tl_records;
while (current != NULL) {
- MR_STM_attach_waiter(current->MR_STM_tr_var, this_thread_id);
+ MR_STM_attach_waiter(current->MR_STM_tr_var, this_thread_id, cvar);
current = current->MR_STM_tr_next;
}
}
void
-MR_STM_unwait(MR_STM_TransLog *tlog)
-{
+MR_STM_unwait(MR_STM_TransLog *tlog, MR_STM_ConditionVar *cvar) {
+
MR_STM_TransRecord *current;
MR_ThreadId this_thread_id;
@@ -108,15 +161,16 @@
current = tlog->MR_STM_tl_records;
while (current != NULL) {
- MR_STM_detach_waiter(current->MR_STM_tr_var, this_thread_id);
+ MR_STM_detach_waiter(current->MR_STM_tr_var, cvar);
current = current->MR_STM_tr_next;
}
}
void
-MR_STM_write_var(MR_STM_Var *var, MR_Word value, MR_STM_TransLog *tlog)
-{
+MR_STM_write_var(MR_STM_Var *var, MR_Word value, MR_STM_TransLog *tlog) {
+
MR_STM_TransRecord *current;
+ MR_STM_TransRecord *local_log;
MR_bool has_existing_record = MR_FALSE;
/*
@@ -143,8 +197,8 @@
}
MR_Word
-MR_STM_read_var(MR_STM_Var *var, MR_STM_TransLog *tlog)
-{
+MR_STM_read_var(MR_STM_Var *var, MR_STM_TransLog *tlog) {
+
MR_STM_TransLog *current_tlog;
MR_STM_TransRecord *current;
@@ -176,3 +230,76 @@
return var->MR_STM_var_value;
}
+
+void
+MR_STM_merge_transactions(MR_STM_TransLog *tlog) {
+
+ MR_STM_TransLog *parent_log;
+ MR_STM_TransRecord *parent_current;
+ MR_STM_TransRecord *current;
+ MR_STM_TransRecord *records_to_append_to_parent;
+ MR_bool found_tvar_in_parent;
+
+ MR_assert(tlog != NULL);
+ MR_assert(tlog->MR_STM_tl_parent != NULL);
+
+ parent_log = tlog->MR_STM_tl_parent;
+
+ current = tlog->MR_STM_tl_records;
+ while (current != NULL) {
+
+ found_tvar_in_parent = MR_NO;
+ parent_current = parent_log->MR_STM_tl_records;
+
+ while (parent_current != NULL) {
+ if (current->MR_STM_tr_var == parent_current->MR_STM_tr_var) {
+ parent_current->MR_STM_tr_new_value =
+ current->MR_STM_tr_new_value;
+ found_tvar_in_parent = MR_YES;
+ break;
+ }
+
+ parent_current = parent_current->MR_STM_tr_next;
+ }
+
+ if (! found_tvar_in_parent) {
+ MR_STM_record_transaction(parent_log,
+ current->MR_STM_tr_var, current->MR_STM_tr_old_value,
+ current->MR_STM_tr_new_value);
+ }
+
+ current = current->MR_STM_tr_next;
+ }
+
+ /* Deallocate child log */
+#if !defined(MR_CONSERVATIVE_GC)
+ /* XXX -- Free tlog and log entries */
+#endif
+}
+
+void
+MR_STM_block_thread(MR_STM_TransLog *tlog) {
+
+#if defined(MR_THREAD_SAFE)
+ #if defined(MR_HIGHLEVEL_CODE)
+ MR_STM_ConditionVar *thread_condvar;
+
+ thread_condvar = MR_GC_NEW(MR_STM_ConditionVar);
+
+ MR_STM_condvar_init(thread_condvar)
+ MR_STM_wait(tlog);
+
+ MR_STM_condvar_wait(tlog, MR_STM_lock);
+
+ MR_STM_unwait(tlog, thread_condvar);
+ MR_STM_condvar_destroy(thread_condvar);
+
+ MR_GC_free(thread_condvar);
+ #else
+ MR_fatal_error("Low-Level backend: Not implemented");
+ #endif
+#else
+ MR_fatal_error("Blocking thread in non-parallel grade");
+#endif
+
+}
Index: runtime/mercury_stm.h
===================================================================
RCS file: /home/mercury/mercury1/repository/mercury/runtime/mercury_stm.h,v
retrieving revision 1.5
diff -u -u -r1.5 mercury_stm.h
--- runtime/mercury_stm.h 17 Sep 2007 13:28:56 -0000 1.5
+++ runtime/mercury_stm.h 25 Sep 2007 05:35:24 -0000
@@ -17,6 +17,7 @@
#include "mercury_types.h"
#include "mercury_thread.h"
#include "mercury_conf.h"
+#include "mercury_conf_param.h"
#include "mercury_context.h"
#include "mercury_engine.h"
@@ -44,10 +45,11 @@
#if defined(MR_HIGHLEVEL_CODE)
#if defined(MR_THREAD_SAFE)
- typedef pthread_t MR_ThreadId;
+ typedef pthread_t MR_ThreadId;
+
#define MR_THIS_THREAD_ID pthread_self()
#else
- typedef MR_Integer MR_ThreadId;
+ typedef MR_Integer MR_ThreadId;
/*
** Since these grades don't support concurrency there is only one
** thread which we always give the id 0.
@@ -57,18 +59,58 @@
#else /* !MR_HIGHLEVEL_CODE */
- typedef MR_Context *MR_ThreadId;
+ typedef MR_Context *MR_ThreadId;
#define MR_THIS_THREAD_ID (MR_ENGINE(MR_eng_this_context))
#endif /* !MR_HIGHLEVEL_CODE */
/*
+** The type MR_STM_ConditionVar provides an abstract method of blocking and
+** signalling threads based on conditions.
+*/
+#if defined(MR_HIGHLEVEL_CODE)
+
+ #if defined(MR_THREAD_SAFE)
+ typedef pthread_cond_t MR_STM_ConditionVar;
+
+ #define MR_STM_condvar_init(x) pthread_cond_init(x, NULL)
+ #define MR_STM_condvar_wait(x, y) pthread_cond_wait(x, y)
+ #define MR_STM_condvar_signal(x) pthread_cond_signal(x)
+ #define MR_STM_condvar_destroy(x) pthread_cond_destroy(x)
+ #else
+ typedef MR_Integer MR_STM_ConditionVar;
+ /*
+ ** Since these grades don't support concurrency, there is no
+ ** need to block the thread.
+ */
+ #define MR_STM_condvar_init(x)
+ #define MR_STM_condvar_wait(x, y)
+ #define MR_STM_condvar_signal(x)
+ #define MR_STM_condvar_destroy(x)
+ #endif
+
+#else /* !MR_HIGHLEVEL_CODE */
+
+ typedef MR_Context *MR_STM_ConditionVar;
+
+ /*
+ ** XXX Need to implement
+ */
+ #define MR_STM_condvar_init(x)
+ #define MR_STM_condvar_wait(x, y)
+ #define MR_STM_condvar_signal(x)
+ #define MR_STM_condvar_destroy(x)
+
+#endif /* !MR_HIGHLEVEL_CODE */
+
+/*
** A waiter is the identity of a thread that is blocking until the value
** of this transaction variable changes.
*/
struct MR_STM_Waiter_Struct {
- MR_ThreadId MR_STM_waiter_thread;
+ MR_STM_ConditionVar *MR_STM_cond_var;
MR_STM_Waiter *MR_STM_waiter_next;
+ MR_STM_Waiter *MR_STM_waiter_prev;
};
/*
@@ -139,20 +181,23 @@
** listed in the log.
*/
extern void
-MR_STM_wait(MR_STM_TransLog *tlog);
+MR_STM_wait(MR_STM_TransLog *tlog, MR_STM_ConditionVar *cvar);
/*
** Detach waiters for the current thread from all of the transaction variables
** referenced by the given transaction log.
*/
extern void
-MR_STM_unwait(MR_STM_TransLog *tlog);
+MR_STM_unwait(MR_STM_TransLog *tlog, MR_STM_ConditionVar *cvar);
/*
-** Attach a waiter for thread tid to the transaction variable.
+** Attach a waiter for thread tid to the transaction variable. The condition
+** variable should be a condition variable properly initialised and associated
+** with the thread.
*/
extern void
-MR_STM_attach_waiter(MR_STM_Var *var, MR_ThreadId tid);
+MR_STM_attach_waiter(MR_STM_Var *var, MR_ThreadId tid,
+ MR_STM_ConditionVar *cvar);
/*
** Detach any waiters for thread tid from the transaction variable.
@@ -161,7 +206,7 @@
** such a waiter exists.
*/
extern void
-MR_STM_detach_waiter(MR_STM_Var *var, MR_ThreadId tid);
+MR_STM_detach_waiter(MR_STM_Var *var, MR_STM_ConditionVar *cvar);
extern MR_Integer
MR_STM_validate(MR_STM_TransLog *tlog);
@@ -183,6 +228,30 @@
#endif
/*
+** Blocks a thread from execution. This method is called by the thread
+** which is to be blocked. The STM lock MUST be aquired by the
+** thread before this method is called and acquires the lock when the thread
+** is signalled.
+*/
+extern void
+MR_STM_block_thread(MR_STM_TransLog *tlog);
+
+/*
+** Merges a transaction with its parent. Do not merge it with any
+** other ancestors. Aborts if the given transaction log does not have a
+** parent.
+*/
+extern void
+MR_STM_merge_transactions(MR_STM_TransLog *tlog);
+
+/*
+** Reschedules all threads currently waiting on the given transaction
+** variables.
+*/
+extern void
+MR_STM_signal_vars(MR_STM_Var *tvar);
+
+/*
** These definitions need to be kept in sync with the definition of the type
** stm_validation_result/0 in library/stm_builtin.m. Changes here may need
** be reflected there.
Index: library/stm_builtin.m
===================================================================
RCS file: /home/mercury/mercury1/repository/mercury/library/stm_builtin.m,v
retrieving revision 1.12
diff -u -u -r1.12 stm_builtin.m
--- library/stm_builtin.m 17 Sep 2007 13:28:55 -0000 1.12
+++ library/stm_builtin.m 26 Sep 2007 05:07:44 -0000
@@ -171,6 +171,15 @@
:- type stm_dummy_output
---> stm_dummy_output.
+ % Used to enforce the uniqueness of outer and inner variables.
+ % Will be removed before stm_expansion.
+ %
+:- pred stm_from_outer_to_inner_io(io::di, stm::uo) is det.
+:- pred stm_from_outer_to_inner_stm(stm::di, stm::uo) is det.
+
+:- pred stm_from_inner_to_outer_io(stm::di, io::uo) is det.
+:- pred stm_from_inner_to_outer_stm(stm::di, stm::uo) is det.
+
%-----------------------------------------------------------------------------%
%-----------------------------------------------------------------------------%
@@ -276,12 +285,43 @@
MR_STM_commit(STM);
").
+:- pragma foreign_proc("C",
+ stm_from_outer_to_inner_io(IO::di, STM::uo),
+ [promise_pure, will_not_call_mercury, thread_safe],
+"
+ STM = NULL;
+ MR_final_io_state(IO);
+").
+
+:- pragma foreign_proc("C",
+ stm_from_outer_to_inner_stm(STM0::di, STM::uo),
+ [promise_pure, will_not_call_mercury, thread_safe],
+"
+ STM = STM0;
+").
+
+:- pragma foreign_proc("C",
+ stm_from_inner_to_outer_io(STM0::di, IO::uo),
+ [promise_pure, will_not_call_mercury, thread_safe],
+"
+ STM0 = NULL;
+ IO = MR_initial_io_state();
+").
+
+:- pragma foreign_proc("C",
+ stm_from_inner_to_outer_stm(STM0::di, STM::uo),
+ [promise_pure, will_not_call_mercury, thread_safe],
+"
+ STM0 = NULL;
+ STM = NULL;
+").
%-----------------------------------------------------------------------------%
:- pragma foreign_proc("C",
- stm_block(_STM::ui),
+ stm_block(STM::ui),
[will_not_call_mercury, thread_safe],
"
+ MR_stm_block_thread(STM);
").
%-----------------------------------------------------------------------------%
--------------------------------------------------------------------------
mercury-reviews mailing list
Post messages to: mercury-reviews at csse.unimelb.edu.au
Administrative Queries: owner-mercury-reviews at csse.unimelb.edu.au
Subscriptions: mercury-reviews-request at csse.unimelb.edu.au
--------------------------------------------------------------------------
More information about the reviews
mailing list