[m-rev.] for review: stm runtime and library

Leon Ilario MIKA lmika at students.csse.unimelb.edu.au
Wed Sep 26 15:38:32 AEST 2007


Estimated hours taken: 8

Modifcation of Software Transactional Memory runtime and library to handle
blocking and nested STM transactions.

runtime/mercury_stm.c
runtime/mercury_stm.h
 	Defined condition variables for the high-level C grade.  This
 	is used for the scheduling of threads with blocking transactions.
 	Such grades used POSIX condition variables for this purpose.

 	Added wait queues to transaction variables.  These data structures
 	maintain a list of all threads waiting for the value of the
 	corresponding transaction variable to change.

 	Added a function to block a transaction until the value of a
 	transaction variable has changed.

 	Added a function which permits the merging of a transaction log
 	with it's parent.

runtime/stm_builtin.m
 	Added utility predicates which will assist the checking of atomic
 	goals durring the semantic analysis and error checking phase of the
 	compiler.  These are automatically inserted durring the construction
 	of the hlds and automatically removed durring the expansion of
 	stm_goals.

 	Completed the implementation of the predicate "stm_block".

Index: runtime/mercury_stm.c
===================================================================
RCS file: /home/mercury/mercury1/repository/mercury/runtime/mercury_stm.c,v
retrieving revision 1.5
diff -u -u -r1.5 mercury_stm.c
--- runtime/mercury_stm.c	21 Sep 2007 06:13:12 -0000	1.5
+++ runtime/mercury_stm.c	26 Sep 2007 05:22:51 -0000
@@ -20,8 +20,8 @@

  void
  MR_STM_record_transaction(MR_STM_TransLog *tlog, MR_STM_Var *var,
-    MR_Word old_value, MR_Word new_value)
-{
+        MR_Word old_value, MR_Word new_value) {
+
      MR_STM_TransRecord  *new_record;

      new_record = MR_GC_NEW(MR_STM_TransRecord);
@@ -33,32 +33,70 @@
  }

  void
-MR_STM_attach_waiter(MR_STM_Var *var, MR_ThreadId tid)
-{
-    MR_fatal_error("NYI MR_STM_attach_waiter");
+MR_STM_attach_waiter(MR_STM_Var *var, MR_ThreadId tid, 
+        MR_STM_ConditionVar *cvar) {
+
+    MR_STM_Waiter   *new_waiter;
+
+    new_waiter = MR_GC_NEW(MR_STM_Waiter);
+    new_waiter->MR_STM_cond_var = cvar;
+
+    if (var->MR_STM_var_waiters == NULL) {
+        var->MR_STM_var_waiters = new_waiter;
+        new_waiter->MR_STM_waiter_prev = NULL;
+        new_waiter->MR_STM_waiter_next = NULL;
+    } else {
+        new_waiter->MR_STM_waiter_prev = NULL;
+        new_waiter->MR_STM_waiter_next = var->MR_STM_var_waiters;
+        var->MR_STM_var_waiters->MR_STM_waiter_prev = new_waiter;
+        var->MR_STM_var_waiters = new_waiter;
+    }
  }

  void
-MR_STM_detach_waiter(MR_STM_Var *var, MR_ThreadId tid)
-{
-    MR_fatal_error("NYI MR_STM_detach_waiter");
+MR_STM_detach_waiter(MR_STM_Var *var, MR_STM_ConditionVar *cvar) {
+ 
+    MR_STM_Waiter   *curr_waiter;
+
+    MR_assert(var != NULL);
+    MR_assert(var->MR_STM_var_waiters != NULL);
+
+    curr_waiter = var->MR_STM_var_waiters;
+    while (curr_waiter != NULL) {
+        if (curr_waiter->MR_STM_cond_var == cvar) {
+            if (curr_waiter == var->MR_STM_var_waiters) {
+                var->MR_STM_var_waiters = 
+                    var->MR_STM_var_waiters->MR_STM_waiter_next;
+            }
+            if (curr_waiter->MR_STM_waiter_prev != NULL) {
+                curr_waiter->MR_STM_waiter_prev->MR_STM_waiter_next = 
+                    curr_waiter->MR_STM_waiter_next;
+            }
+            if (curr_waiter->MR_STM_waiter_next != NULL) {
+                curr_waiter->MR_STM_waiter_next->MR_STM_waiter_prev =
+                    curr_waiter->MR_STM_waiter_prev;
+            }
+            curr_waiter = NULL;
+            return;
+        }
+    }
+
+    MR_fatal_error("MR_STM_detach_waiter: Thread ID not in wait queue");
  }

  MR_Integer
-MR_STM_validate(MR_STM_TransLog *tlog)
-{
+MR_STM_validate(MR_STM_TransLog *tlog) {
+
      MR_STM_TransRecord  *current;

      MR_assert(tlog != NULL);

      while (tlog != NULL) {
-
          current = tlog->MR_STM_tl_records;

          while (current != NULL) {
              if (current->MR_STM_tr_var->MR_STM_var_value !=
-                current->MR_STM_tr_old_value)
-            {
+                current->MR_STM_tr_old_value) {
                  return MR_STM_TRANSACTION_INVALID;
              }
              current = current->MR_STM_tr_next;
@@ -71,6 +109,19 @@
  }

  void
+MR_STM_signal_vars(MR_STM_Var *tvar) {
+
+    MR_STM_Waiter   *wait_queue;
+
+    wait_queue = tvar->MR_STM_var_waiters;
+
+    while (wait_queue != NULL) {
+        MR_STM_condvar_signal(wait_queue->MR_STM_cond_var);
+        wait_queue = wait_queue->MR_STM_waiter_next;
+    }
+}
+
+void
  MR_STM_commit(MR_STM_TransLog *tlog) {

      MR_STM_TransRecord  *current;
@@ -79,13 +130,15 @@
      while (current != NULL) {
          current->MR_STM_tr_var->MR_STM_var_value
              = current->MR_STM_tr_new_value;
+
+        MR_STM_signal_vars(current->MR_STM_tr_var);
          current = current->MR_STM_tr_next;
      }
  }

  void
-MR_STM_wait(MR_STM_TransLog *tlog)
-{
+MR_STM_wait(MR_STM_TransLog *tlog, MR_STM_ConditionVar *cvar) {
+
      MR_STM_TransRecord  *current;
      MR_ThreadId         this_thread_id;

@@ -93,14 +146,14 @@

      current = tlog->MR_STM_tl_records;
      while (current != NULL) {
-        MR_STM_attach_waiter(current->MR_STM_tr_var, this_thread_id);
+        MR_STM_attach_waiter(current->MR_STM_tr_var, this_thread_id, cvar);
          current = current->MR_STM_tr_next;
      }
  }

  void
-MR_STM_unwait(MR_STM_TransLog *tlog)
-{
+MR_STM_unwait(MR_STM_TransLog *tlog, MR_STM_ConditionVar *cvar) {
+
      MR_STM_TransRecord  *current;
      MR_ThreadId         this_thread_id;

@@ -108,15 +161,16 @@
      current = tlog->MR_STM_tl_records;

      while (current != NULL) {
-        MR_STM_detach_waiter(current->MR_STM_tr_var, this_thread_id);
+        MR_STM_detach_waiter(current->MR_STM_tr_var, cvar);
          current = current->MR_STM_tr_next;
      }
  }

  void
-MR_STM_write_var(MR_STM_Var *var, MR_Word value, MR_STM_TransLog *tlog)
-{
+MR_STM_write_var(MR_STM_Var *var, MR_Word value, MR_STM_TransLog *tlog) {
+
      MR_STM_TransRecord  *current;
+    MR_STM_TransRecord  *local_log;
      MR_bool             has_existing_record = MR_FALSE;

      /*
@@ -143,8 +197,8 @@
  }

  MR_Word
-MR_STM_read_var(MR_STM_Var *var, MR_STM_TransLog *tlog)
-{
+MR_STM_read_var(MR_STM_Var *var, MR_STM_TransLog *tlog) {
+
      MR_STM_TransLog     *current_tlog;
      MR_STM_TransRecord  *current;

@@ -176,3 +230,76 @@

      return var->MR_STM_var_value;
  }
+
+void
+MR_STM_merge_transactions(MR_STM_TransLog *tlog) {
+
+    MR_STM_TransLog     *parent_log;
+    MR_STM_TransRecord  *parent_current;
+    MR_STM_TransRecord  *current;
+    MR_STM_TransRecord  *records_to_append_to_parent;
+    MR_bool             found_tvar_in_parent;
+
+    MR_assert(tlog != NULL);
+    MR_assert(tlog->MR_STM_tl_parent != NULL);
+
+    parent_log = tlog->MR_STM_tl_parent;
+
+    current = tlog->MR_STM_tl_records;
+    while (current != NULL) {
+
+        found_tvar_in_parent = MR_NO;
+        parent_current = parent_log->MR_STM_tl_records;
+
+        while (parent_current != NULL) {
+            if (current->MR_STM_tr_var == parent_current->MR_STM_tr_var) {
+                parent_current->MR_STM_tr_new_value =
+                    current->MR_STM_tr_new_value;
+                found_tvar_in_parent = MR_YES;
+                break;
+            }
+
+            parent_current = parent_current->MR_STM_tr_next;
+        }
+
+        if (! found_tvar_in_parent) {
+            MR_STM_record_transaction(parent_log,
+                    current->MR_STM_tr_var, current->MR_STM_tr_old_value,
+                    current->MR_STM_tr_new_value);
+        }
+
+        current = current->MR_STM_tr_next;
+    }
+
+    /* Deallocate child log */
+#if !defined(MR_CONSERVATIVE_GC)
+    /* XXX -- Free tlog and log entries */
+#endif
+}
+
+void
+MR_STM_block_thread(MR_STM_TransLog *tlog) {
+
+#if defined(MR_THREAD_SAFE)
+    #if defined(MR_HIGHLEVEL_CODE)
+        MR_STM_ConditionVar     *thread_condvar;
+
+        thread_condvar = MR_GC_NEW(MR_STM_ConditionVar);
+ 
+        MR_STM_condvar_init(thread_condvar)
+        MR_STM_wait(tlog);
+
+        MR_STM_condvar_wait(tlog, MR_STM_lock);
+
+        MR_STM_unwait(tlog, thread_condvar);
+        MR_STM_condvar_destroy(thread_condvar);
+
+        MR_GC_free(thread_condvar);
+    #else
+        MR_fatal_error("Low-Level backend: Not implemented");
+    #endif
+#else
+    MR_fatal_error("Blocking thread in non-parallel grade");
+#endif
+
+}
Index: runtime/mercury_stm.h
===================================================================
RCS file: /home/mercury/mercury1/repository/mercury/runtime/mercury_stm.h,v
retrieving revision 1.5
diff -u -u -r1.5 mercury_stm.h
--- runtime/mercury_stm.h	17 Sep 2007 13:28:56 -0000	1.5
+++ runtime/mercury_stm.h	25 Sep 2007 05:35:24 -0000
@@ -17,6 +17,7 @@
  #include "mercury_types.h"
  #include "mercury_thread.h"
  #include "mercury_conf.h"
+#include "mercury_conf_param.h"
  #include "mercury_context.h"
  #include "mercury_engine.h"

@@ -44,10 +45,11 @@
  #if defined(MR_HIGHLEVEL_CODE)

      #if defined(MR_THREAD_SAFE)
-        typedef pthread_t   MR_ThreadId;
+        typedef pthread_t       MR_ThreadId;
+
          #define MR_THIS_THREAD_ID pthread_self()
      #else
-        typedef MR_Integer  MR_ThreadId;
+        typedef MR_Integer      MR_ThreadId;
          /*
          ** Since these grades don't support concurrency there is only one
          ** thread which we always give the id 0.
@@ -57,18 +59,58 @@

  #else /* !MR_HIGHLEVEL_CODE */

-    typedef MR_Context  *MR_ThreadId;
+    typedef MR_Context      *MR_ThreadId;
      #define MR_THIS_THREAD_ID (MR_ENGINE(MR_eng_this_context))

  #endif /* !MR_HIGHLEVEL_CODE */

  /*
+** The type MR_STM_ConditionVar provides an abstract method of blocking and
+** signalling threads based on conditions.
+*/
+#if defined(MR_HIGHLEVEL_CODE)
+
+    #if defined(MR_THREAD_SAFE)
+        typedef pthread_cond_t  MR_STM_ConditionVar;
+
+        #define MR_STM_condvar_init(x)        pthread_cond_init(x, NULL)
+        #define MR_STM_condvar_wait(x, y)     pthread_cond_wait(x, y)
+        #define MR_STM_condvar_signal(x)      pthread_cond_signal(x)
+        #define MR_STM_condvar_destroy(x)     pthread_cond_destroy(x)
+    #else
+        typedef MR_Integer      MR_STM_ConditionVar;
+        /*
+        ** Since these grades don't support concurrency, there is no
+        ** need to block the thread. 
+        */
+        #define MR_STM_condvar_init(x)
+        #define MR_STM_condvar_wait(x, y)
+        #define MR_STM_condvar_signal(x)
+        #define MR_STM_condvar_destroy(x)
+    #endif
+
+#else /* !MR_HIGHLEVEL_CODE */
+
+    typedef MR_Context  *MR_STM_ConditionVar;
+
+    /*
+    ** XXX Need to implement
+    */
+    #define MR_STM_condvar_init(x)
+    #define MR_STM_condvar_wait(x, y)
+    #define MR_STM_condvar_signal(x)
+    #define MR_STM_condvar_destroy(x)
+
+#endif /* !MR_HIGHLEVEL_CODE */
+
+/*
  ** A waiter is the identity of a thread that is blocking until the value
  ** of this transaction variable changes.
  */
  struct MR_STM_Waiter_Struct {
-    MR_ThreadId     MR_STM_waiter_thread;
+    MR_STM_ConditionVar *MR_STM_cond_var;
      MR_STM_Waiter   *MR_STM_waiter_next;
+    MR_STM_Waiter   *MR_STM_waiter_prev;
  };

  /*
@@ -139,20 +181,23 @@
  ** listed in the log.
  */
  extern void
-MR_STM_wait(MR_STM_TransLog *tlog);
+MR_STM_wait(MR_STM_TransLog *tlog, MR_STM_ConditionVar *cvar);

  /*
  ** Detach waiters for the current thread from all of the transaction variables
  ** referenced by the given transaction log.
  */
  extern void
-MR_STM_unwait(MR_STM_TransLog *tlog);
+MR_STM_unwait(MR_STM_TransLog *tlog, MR_STM_ConditionVar *cvar);

  /*
-** Attach a waiter for thread tid to the transaction variable.
+** Attach a waiter for thread tid to the transaction variable.  The condition
+** variable should be a condition variable properly initialised and associated
+** with the thread.
  */
  extern void
-MR_STM_attach_waiter(MR_STM_Var *var, MR_ThreadId tid);
+MR_STM_attach_waiter(MR_STM_Var *var, MR_ThreadId tid, 
+        MR_STM_ConditionVar *cvar);

  /*
  ** Detach any waiters for thread tid from the transaction variable.
@@ -161,7 +206,7 @@
  ** such a waiter exists.
  */
  extern void
-MR_STM_detach_waiter(MR_STM_Var *var, MR_ThreadId tid);
+MR_STM_detach_waiter(MR_STM_Var *var, MR_STM_ConditionVar *cvar);

  extern MR_Integer
  MR_STM_validate(MR_STM_TransLog *tlog);
@@ -183,6 +228,30 @@
  #endif

  /*
+** Blocks a thread from execution.  This method is called by the thread
+** which is to be blocked.  The STM lock MUST be aquired by the 
+** thread before this method is called and acquires the lock when the thread
+** is signalled.
+*/
+extern void
+MR_STM_block_thread(MR_STM_TransLog *tlog);
+
+/*
+** Merges a transaction with its parent.  Do not merge it with any
+** other ancestors.  Aborts if the given transaction log does not have a
+** parent.
+*/
+extern void
+MR_STM_merge_transactions(MR_STM_TransLog *tlog);
+
+/*
+** Reschedules all threads currently waiting on the given transaction
+** variables.
+*/
+extern void
+MR_STM_signal_vars(MR_STM_Var *tvar);
+
+/*
  ** These definitions need to be kept in sync with the definition of the type
  ** stm_validation_result/0 in library/stm_builtin.m.  Changes here may need
  ** be reflected there.
Index: library/stm_builtin.m
===================================================================
RCS file: /home/mercury/mercury1/repository/mercury/library/stm_builtin.m,v
retrieving revision 1.12
diff -u -u -r1.12 stm_builtin.m
--- library/stm_builtin.m	17 Sep 2007 13:28:55 -0000	1.12
+++ library/stm_builtin.m	26 Sep 2007 05:07:44 -0000
@@ -171,6 +171,15 @@
  :- type stm_dummy_output
      --->    stm_dummy_output.

+    % Used to enforce the uniqueness of outer and inner variables.
+    % Will be removed before stm_expansion.
+    %
+:- pred stm_from_outer_to_inner_io(io::di, stm::uo) is det.
+:- pred stm_from_outer_to_inner_stm(stm::di, stm::uo) is det.
+
+:- pred stm_from_inner_to_outer_io(stm::di, io::uo) is det.
+:- pred stm_from_inner_to_outer_stm(stm::di, stm::uo) is det.
+
  %-----------------------------------------------------------------------------%
  %-----------------------------------------------------------------------------%

@@ -276,12 +285,43 @@
      MR_STM_commit(STM);
  ").

+:- pragma foreign_proc("C",
+    stm_from_outer_to_inner_io(IO::di, STM::uo),
+    [promise_pure, will_not_call_mercury, thread_safe],
+"
+    STM = NULL;
+    MR_final_io_state(IO);
+").
+
+:- pragma foreign_proc("C",
+    stm_from_outer_to_inner_stm(STM0::di, STM::uo),
+    [promise_pure, will_not_call_mercury, thread_safe],
+"
+    STM = STM0;
+").
+
+:- pragma foreign_proc("C",
+    stm_from_inner_to_outer_io(STM0::di, IO::uo),
+    [promise_pure, will_not_call_mercury, thread_safe],
+"
+    STM0 = NULL;
+    IO = MR_initial_io_state();
+").
+
+:- pragma foreign_proc("C",
+    stm_from_inner_to_outer_stm(STM0::di, STM::uo),
+    [promise_pure, will_not_call_mercury, thread_safe],
+"
+    STM0 = NULL;
+    STM = NULL;
+").
  %-----------------------------------------------------------------------------%

  :- pragma foreign_proc("C",
-    stm_block(_STM::ui),
+    stm_block(STM::ui),
      [will_not_call_mercury, thread_safe],
  "
+    MR_stm_block_thread(STM);
  ").

  %-----------------------------------------------------------------------------%
--------------------------------------------------------------------------
mercury-reviews mailing list
Post messages to:       mercury-reviews at csse.unimelb.edu.au
Administrative Queries: owner-mercury-reviews at csse.unimelb.edu.au
Subscriptions:          mercury-reviews-request at csse.unimelb.edu.au
--------------------------------------------------------------------------



More information about the reviews mailing list