[m-rev.] for review: threadsafe version_arrays in Java

Peter Wang novalazy at gmail.com
Fri Apr 16 16:11:02 AEST 2010


On 14 April 2010 16:22, Peter Ross <pro at missioncriticalit.com> wrote:
> Hi,
>
> Peter Wang is probably the best person to review this.
>
> Ralph might want to make version_arrays thread safe on the C backend,
> however at least now the user will get an error when they try and use
> the non-safe version_array in a context where it might have to support
> concurrent access.

I have made the analogous change for the C backends.
Committed this.


Estimated hours taken: 12 + some
Branches: main, 10.04

Make the version_array and version_hash_table types thread safe in the C and
Java grades.

Supply unsafe initialization functions so that users can use the non-thread
safe types, with the proviso that they don't concurrently access the array or
hash table.

The synchronized version dramatically slows down version_array.lookup (100x
slower on Java).

However, in the test case MC uses, which replaces a map being used as a memo
table with a version_hash_table, the version_hash_table is still approximately
40% faster than the map even with the synchronization overhead.

The Java side of this patch was by Peter Ross.


library/version_array.m:
        For Java: Make ML_va be an interface.  Provide two implementations of
        the ML_va interface: ML_uva (formerly ML_va) which provides an
        unsynchronized version_array, and ML_sva which provides a synchronized
        version_array.  ML_sva just wraps a ML_uva but locks itself so that
        only one thread can access the version array at a time, making the
        ML_uva safe.

        Add to ML_uva the method isClone() so that the ML_sva knows when the
        underlying array has been cloned, so then we can use a different lock
        object as we are protecting access to a different underlying array.

        For C: in threaded grades, add a lock field to struct ML_va which may
        point to a mutex.  Use that to synchronise concurrent accesses to the
        array.

library/version_hash_table.m:
        Make new, new_default create hash tables which are backed by thread
        safe version arrays.

        Add unsafe_new, unsafe_new_default to create hash tables using
        non-thread safe version arrays.
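
A rough illustration of the Java-side design described under
library/version_array.m above: an interface with an unsynchronized
implementation and a synchronized wrapper that picks a fresh lock when the
underlying array has been copied. This is only a sketch, not the code from the
patch. The names VA, UnsyncVA and SyncVA are hypothetical stand-ins for ML_va,
ML_uva and ML_sva, the behaviour given to isClone() here is a guess based on
the log message, and sharing one lock object between versions of the same
array is an assumption.

```java
import java.util.Arrays;

public class VersionArraySketch {

    /** Hypothetical stand-in for the ML_va interface. */
    interface VA {
        Object get(int i);
        VA set(int i, Object x);
    }

    /** Stand-in for ML_uva: a version array with no locking. */
    static final class UnsyncVA implements VA {
        private int index = -1;   // -1: this is the latest version
        private Object value;     // old item at `index` when not the latest
        private Object rest;      // Object[] if latest, else the next UnsyncVA
        private final boolean clone;

        UnsyncVA(int n, Object x) {
            Object[] a = new Object[n];
            Arrays.fill(a, x);
            rest = a;
            clone = false;
        }

        private UnsyncVA(Object[] a, boolean clone) {
            rest = a;
            this.clone = clone;
        }

        /** True if this version was built on a fresh copy of the array. */
        boolean isClone() { return clone; }

        private Object[] latestArray() {
            UnsyncVA va = this;
            while (va.index != -1) va = (UnsyncVA) va.rest;
            return (Object[]) va.rest;
        }

        public Object get(int i) {
            UnsyncVA va = this;
            while (va.index != -1) {
                if (va.index == i) return va.value;
                va = (UnsyncVA) va.rest;
            }
            return ((Object[]) va.rest)[i];
        }

        public UnsyncVA set(int i, Object x) {
            if (index == -1) {
                // Latest version: update in place, turn `this` into a diff node.
                Object[] a = (Object[]) rest;
                Object old = a[i];  // throws before any mutation if i is bad
                UnsyncVA latest = new UnsyncVA(a, false);
                index = i;
                value = old;
                rest = latest;
                a[i] = x;
                return latest;
            }
            // Old version: copy the array as seen from this version.
            Object[] copy = new Object[latestArray().length];
            for (int k = 0; k < copy.length; k++) copy[k] = get(k);
            copy[i] = x;
            return new UnsyncVA(copy, true);
        }
    }

    /** Stand-in for ML_sva: wraps an UnsyncVA and serialises access to it. */
    static final class SyncVA implements VA {
        private final UnsyncVA inner;
        private final Object lock;   // assumed: one lock per underlying array

        SyncVA(int n, Object x) { this(new UnsyncVA(n, x), new Object()); }

        private SyncVA(UnsyncVA inner, Object lock) {
            this.inner = inner;
            this.lock = lock;
        }

        public Object get(int i) {
            synchronized (lock) { return inner.get(i); }
        }

        public SyncVA set(int i, Object x) {
            synchronized (lock) {
                UnsyncVA next = inner.set(i, x);
                // A copied array is a different resource, so give it a new lock.
                return new SyncVA(next, next.isClone() ? new Object() : lock);
            }
        }
    }

    public static void main(String[] args) {
        VA v0 = new SyncVA(3, "a");
        VA v1 = v0.set(1, "b");   // v0 was the latest: shares array and lock
        VA v2 = v0.set(0, "c");   // v0 is now old: fresh array, fresh lock
        System.out.println(v0.get(1) + " " + v1.get(1) + " " + v2.get(0));
    }
}
```

In this sketch v0 and v1 share one array, so they must also share one lock;
v2 lives on its own copy and can be accessed without contending with them.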

diff --git a/library/version_array.m b/library/version_array.m
index ef9407d..e4f644e 100644
--- a/library/version_array.m
+++ b/library/version_array.m
@@ -54,16 +54,20 @@
     % Same as empty/0 except the resulting version_array is not thread safe.
     %
     % That is your program can segfault if you attempt to concurrently access
-    % the version_array, however this version is much quicker if you guarantee
-    % that you never concurrently access the version_array.
+    % or update the array from different threads, or any two arrays produced
+    % from operations on the same original array.  However this version is much
+    % quicker if you guarantee that you never concurrently access the version
+    % array.
     %
 :- func unsafe_empty = version_array(T).

     % Same as new(N, X) except the resulting version_array is not thread safe.
     %
     % That is your program can segfault if you attempt to concurrently access
-    % the version_array, however this version is much quicker if you guarantee
-    % that you never concurrently access the version_array.
+    % or update the array from different threads, or any two arrays produced
+    % from operations on the same original array.  However this version is much
+    % quicker if you guarantee that you never concurrently access the version
+    % array.
     %
 :- func unsafe_new(int, T) = version_array(T).

@@ -348,19 +352,16 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
     [will_not_call_mercury, promise_pure, will_not_modify_trail,
         does_not_affect_liveness],
 "
-#ifdef MR_THREAD_SAFE
-    MR_fatal_error(""thread safe version_array not yet implemented"");
-#else
-    /*
-    ** If we are not in an environment which uses threads
-    ** then it is safe to use the unsynchronized version_array.
-    */
     VA = MR_GC_NEW(struct ML_va);

     VA->index            = -1;
     VA->value            = (MR_Word) NULL;
     VA->rest.array       = (MR_ArrayPtr) MR_GC_NEW_ARRAY(MR_Word, 1);
     VA->rest.array->size = 0;
+
+#ifdef MR_THREAD_SAFE
+    VA->lock             = MR_GC_NEW(MercuryLock);
+    pthread_mutex_init(VA->lock, MR_MUTEX_ATTR);
 #endif
 ").

@@ -383,6 +384,10 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
     VA->value            = (MR_Word) NULL;
     VA->rest.array       = (MR_ArrayPtr) MR_GC_NEW_ARRAY(MR_Word, 1);
     VA->rest.array->size = 0;
+
+#ifdef MR_THREAD_SAFE
+    VA->lock             = NULL;
+#endif
 ").

 :- pragma foreign_proc("Java",
@@ -398,9 +403,6 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
     [will_not_call_mercury, promise_pure, will_not_modify_trail,
         does_not_affect_liveness, may_not_duplicate],
 "
-#ifdef MR_THREAD_SAFE
-    MR_fatal_error(""thread safe version_array not yet implemented"");
-#else
     MR_Integer  i;

     VA = MR_GC_NEW(struct ML_va);
@@ -412,6 +414,10 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
     for (i = 0; i < N; i++) {
         VA->rest.array->elements[i] = X;
     }
+
+#ifdef MR_THREAD_SAFE
+    VA->lock             = MR_GC_NEW(MercuryLock);
+    pthread_mutex_init(VA->lock, MR_MUTEX_ATTR);
 #endif
 ").

@@ -439,6 +445,10 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
     for (i = 0; i < N; i++) {
         VA->rest.array->elements[i] = X;
     }
+
+#ifdef MR_THREAD_SAFE
+    VA->lock             = NULL;
+#endif
 ").

 :- pragma foreign_proc("Java",
@@ -452,33 +462,9 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
 :- pragma foreign_proc("C",
     resize(VA0::in, N::in, X::in) = (VA::out),
     [will_not_call_mercury, promise_pure, will_not_modify_trail,
-        does_not_affect_liveness, may_not_duplicate],
+        does_not_affect_liveness],
 "
-    ML_va_ptr   latest;
-    MR_Integer  i;
-    MR_Integer  size_VA0;
-    MR_Integer  min;
-
-    latest = ML_va_get_latest(VA0);
-
-    size_VA0 = ML_va_size(latest);
-    min      = (N <= size_VA0 ? N : size_VA0);
-    VA       = MR_GC_NEW(struct ML_va);
-
-    VA->index            = -1;
-    VA->value            = (MR_Word) NULL;
-    VA->rest.array       = (MR_ArrayPtr) MR_GC_NEW_ARRAY(MR_Word, N + 1);
-    VA->rest.array->size = N;
-
-    for (i = 0; i < min; i++) {
-        VA->rest.array->elements[i] = latest->rest.array->elements[i];
-    }
-
-    ML_va_rewind_into(VA, VA0);
-
-    for (i = min; i < N; i++) {
-        VA->rest.array->elements[i] = X;
-    }
+    VA = ML_va_resize_dolock(VA0, N, X);
 ").

 :- pragma foreign_proc("Java",
@@ -496,7 +482,7 @@ resize(N, X, VA, resize(VA, N, X)).
     [will_not_call_mercury, promise_pure, will_not_modify_trail,
         does_not_affect_liveness],
 "
-    N = ML_va_size(VA);
+    N = ML_va_size_dolock(VA);
 ").

 :- pragma foreign_proc("Java",
@@ -514,7 +500,7 @@ resize(N, X, VA, resize(VA, N, X)).
     [will_not_call_mercury, promise_pure, will_not_modify_trail,
         does_not_affect_liveness],
 "
-    SUCCESS_INDICATOR = ML_va_get(VA, I, &X);
+    SUCCESS_INDICATOR = ML_va_get_dolock(VA, I, &X);
 ").

 :- pragma foreign_proc("Java",
@@ -539,7 +525,7 @@ resize(N, X, VA, resize(VA, N, X)).
     [will_not_call_mercury, promise_pure, will_not_modify_trail,
         does_not_affect_liveness],
 "
-    SUCCESS_INDICATOR = ML_va_set(VA0, I, X, &VA);
+    SUCCESS_INDICATOR = ML_va_set_dolock(VA0, I, X, &VA);
 ").

 :- pragma foreign_proc("Java",
@@ -561,7 +547,7 @@ resize(N, X, VA, resize(VA, N, X)).
     [will_not_call_mercury, promise_pure, will_not_modify_trail,
         does_not_affect_liveness],
 "
-    VA = ML_va_rewind(VA0);
+    VA = ML_va_rewind_dolock(VA0);
 ").

 :- pragma foreign_proc("Java",
@@ -590,6 +576,9 @@ struct ML_va {
         MR_ArrayPtr     array;  /* Valid if index == -1          */
         ML_va_ptr       next;   /* Valid if index >= 0           */
     } rest;
+#ifdef MR_THREAD_SAFE
+    MercuryLock         *lock;  /* NULL or lock                  */
+#endif
 };

     /*
@@ -600,21 +589,25 @@ extern ML_va_ptr    ML_va_get_latest(ML_va_ptr VA);
     /*
     ** Returns the number of items in a version array.
     */
-extern MR_Integer   ML_va_size(ML_va_ptr);
+extern MR_Integer   ML_va_size_dolock(ML_va_ptr);
+static MR_Integer   ML_va_size(ML_va_ptr);

     /*
     ** If I is in range then ML_va_get(VA, I, &X) sets X to the Ith item
     ** in VA (counting from zero) and returns MR_TRUE.  Otherwise it
     ** returns MR_FALSE.
     */
-extern int          ML_va_get(ML_va_ptr, MR_Integer, MR_Word *);
+extern int          ML_va_get_dolock(ML_va_ptr, MR_Integer, MR_Word *);
+static int          ML_va_get(ML_va_ptr VA, MR_Integer I, MR_Word *Xptr);

     /*
     ** If I is in range then ML_va_set(VA0, I, X, VA) sets VA to be VA0
     ** updated with the Ith item as X (counting from zero) and
     ** returns MR_TRUE.  Otherwise it returns MR_FALSE.
     */
-extern int          ML_va_set(ML_va_ptr, MR_Integer, MR_Word, ML_va_ptr *);
+extern int          ML_va_set_dolock(ML_va_ptr, MR_Integer, MR_Word,
+                        ML_va_ptr *);
+static int          ML_va_set(ML_va_ptr, MR_Integer, MR_Word, ML_va_ptr *);

     /*
     ** Create a copy of VA0 as a new array.
@@ -631,7 +624,14 @@ static void         ML_va_rewind_into(ML_va_ptr VA, const ML_va_ptr VA0);
     ** `Rewinds' a version array, invalidating all extant successors
     ** including the argument.
     */
-extern ML_va_ptr    ML_va_rewind(ML_va_ptr);
+extern ML_va_ptr    ML_va_rewind_dolock(ML_va_ptr);
+static ML_va_ptr    ML_va_rewind(ML_va_ptr VA);
+
+    /*
+    ** Resize a version array.
+    */
+extern ML_va_ptr    ML_va_resize_dolock(ML_va_ptr, MR_Integer, MR_Word);
+static ML_va_ptr    ML_va_resize(ML_va_ptr, MR_Integer, MR_Word);

 ").

@@ -639,6 +639,25 @@ extern ML_va_ptr    ML_va_rewind(ML_va_ptr);

 #define ML_va_latest_version(VA)   ((VA)->index == -1)

+#ifdef MR_THREAD_SAFE
+    #define ML_maybe_lock(lock)                         \
+        do {                                            \
+            if (lock) {                                 \
+                MR_LOCK(lock, ""ML_maybe_lock"");       \
+            }                                           \
+        } while (0)
+
+    #define ML_maybe_unlock(lock)                       \
+        do {                                            \
+            if (lock) {                                 \
+                MR_UNLOCK(lock, ""ML_maybe_unlock"");   \
+            }                                           \
+        } while (0)
+#else
+    #define ML_maybe_lock(lock)     ((void) 0)
+    #define ML_maybe_unlock(lock)   ((void) 0)
+#endif
+
 ML_va_ptr
 ML_va_get_latest(ML_va_ptr VA)
 {
@@ -650,6 +669,23 @@ ML_va_get_latest(ML_va_ptr VA)
 }

 MR_Integer
+ML_va_size_dolock(ML_va_ptr VA)
+{
+#ifdef MR_THREAD_SAFE
+    MercuryLock *lock = VA->lock;
+#endif
+    MR_Integer  size;
+
+    ML_maybe_lock(lock);
+
+    size = ML_va_size(VA);
+
+    ML_maybe_unlock(lock);
+
+    return size;
+}
+
+static MR_Integer
 ML_va_size(ML_va_ptr VA)
 {
     VA = ML_va_get_latest(VA);
@@ -658,6 +694,23 @@ ML_va_size(ML_va_ptr VA)
 }

 int
+ML_va_get_dolock(ML_va_ptr VA, MR_Integer I, MR_Word *Xptr)
+{
+#ifdef MR_THREAD_SAFE
+    MercuryLock *lock = VA->lock;
+#endif
+    int         ret;
+
+    ML_maybe_lock(lock);
+
+    ret = ML_va_get(VA, I, Xptr);
+
+    ML_maybe_unlock(lock);
+
+    return ret;
+}
+
+static int
 ML_va_get(ML_va_ptr VA, MR_Integer I, MR_Word *Xptr)
 {
     while (!ML_va_latest_version(VA)) {
@@ -678,6 +731,23 @@ ML_va_get(ML_va_ptr VA, MR_Integer I, MR_Word *Xptr)
 }

 int
+ML_va_set_dolock(ML_va_ptr VA0, MR_Integer I, MR_Word X, ML_va_ptr *VAptr)
+{
+#ifdef MR_THREAD_SAFE
+    MercuryLock *lock = VA0->lock;
+#endif
+    int         ret;
+
+    ML_maybe_lock(lock);
+
+    ret = ML_va_set(VA0, I, X, VAptr);
+
+    ML_maybe_unlock(lock);
+
+    return ret;
+}
+
+static int
 ML_va_set(ML_va_ptr VA0, MR_Integer I, MR_Word X, ML_va_ptr *VAptr)
 {
     ML_va_ptr VA1;
@@ -691,6 +761,9 @@ ML_va_set(ML_va_ptr VA0, MR_Integer I, MR_Word X, ML_va_ptr *VAptr)
         VA1->index      = -1;
         VA1->value      = (MR_Word) NULL;
         VA1->rest.array = VA0->rest.array;
+#ifdef MR_THREAD_SAFE
+        VA1->lock       = VA0->lock;
+#endif

         VA0->index     = I;
         VA0->value     = VA0->rest.array->elements[I];
@@ -732,6 +805,15 @@ ML_va_flat_copy(const ML_va_ptr VA0)
         VA->rest.array->elements[i] = latest->rest.array->elements[i];
     }

+#ifdef MR_THREAD_SAFE
+    if (VA0->lock != NULL) {
+        VA->lock = MR_GC_NEW(MercuryLock);
+        pthread_mutex_init(VA->lock, MR_MUTEX_ATTR);
+    } else {
+        VA->lock = NULL;
+    }
+#endif
+
     ML_va_rewind_into(VA, VA0);

     return VA;
@@ -757,6 +839,21 @@ ML_va_rewind_into(ML_va_ptr VA, const ML_va_ptr VA0)
 }

 ML_va_ptr
+ML_va_rewind_dolock(ML_va_ptr VA)
+{
+#ifdef MR_THREAD_SAFE
+    MercuryLock *lock = VA->lock;
+#endif
+    ML_maybe_lock(lock);
+
+    VA = ML_va_rewind(VA);
+
+    ML_maybe_unlock(lock);
+
+    return VA;
+}
+
+static ML_va_ptr
 ML_va_rewind(ML_va_ptr VA)
 {
     MR_Integer I;
@@ -774,6 +871,65 @@ ML_va_rewind(ML_va_ptr VA)
     return VA;
 }

+ML_va_ptr
+ML_va_resize_dolock(ML_va_ptr VA0, MR_Integer N, MR_Word X)
+{
+#ifdef MR_THREAD_SAFE
+    MercuryLock *lock = VA0->lock;
+#endif
+    ML_va_ptr   VA;
+
+    ML_maybe_lock(lock);
+
+    VA = ML_va_resize(VA0, N, X);
+
+    ML_maybe_unlock(lock);
+
+    return VA;
+}
+
+static ML_va_ptr
+ML_va_resize(ML_va_ptr VA0, MR_Integer N, MR_Word X)
+{
+    ML_va_ptr   latest;
+    ML_va_ptr   VA;
+    MR_Integer  i;
+    MR_Integer  size_VA0;
+    MR_Integer  min;
+
+    latest = ML_va_get_latest(VA0);
+
+    size_VA0 = ML_va_size(latest);
+    min      = (N <= size_VA0 ? N : size_VA0);
+    VA       = MR_GC_NEW(struct ML_va);
+
+    VA->index            = -1;
+    VA->value            = (MR_Word) NULL;
+    VA->rest.array       = (MR_ArrayPtr) MR_GC_NEW_ARRAY(MR_Word, N + 1);
+    VA->rest.array->size = N;
+
+    for (i = 0; i < min; i++) {
+        VA->rest.array->elements[i] = latest->rest.array->elements[i];
+    }
+
+#ifdef MR_THREAD_SAFE
+    if (VA0->lock != NULL) {
+        VA->lock = MR_GC_NEW(MercuryLock);
+        pthread_mutex_init(VA->lock, MR_MUTEX_ATTR);
+    } else {
+        VA->lock = NULL;
+    }
+#endif
+
+    ML_va_rewind_into(VA, VA0);
+
+    for (i = min; i < N; i++) {
+        VA->rest.array->elements[i] = X;
+    }
+
+    return VA;
+}
+
 ").

 :- pragma foreign_code("Java", "
@@ -870,7 +1026,6 @@ public static class ML_uva implements ML_va {
         va.value = null;
         va.rest  = new Object[0];
         return va;
-
     }

     public static ML_uva init(int N, Object X) {
diff --git a/library/version_hash_table.m b/library/version_hash_table.m
index 3fb1450..c965282 100644
--- a/library/version_hash_table.m
+++ b/library/version_hash_table.m
@@ -49,12 +49,33 @@
 :- func new(hash_pred(K)::in(hash_pred), int::in, float::in) =
             (version_hash_table(K, V)::out) is det.

+    % unsafe_new(HashPred, N, MaxOccupancy)
+    %
+    % Like new/3, but the constructed hash table is backed by a non-thread safe
+    % version array. It is unsafe to concurrently access or update the hash
+    % table from different threads, or any two hash tables which were produced
+    % from operations on the same original hash table.
+    % However, if the hash table or its descendants will not be used in such a
+    % manner, a non-thread safe hash table can be much faster than a thread
+    % safe one.
+    %
+:- func unsafe_new(hash_pred(K)::in(hash_pred), int::in, float::in) =
+            (version_hash_table(K, V)::out) is det.
+
     % new_default(HashFn) constructs a hash table with default size and
     % occupancy arguments.
     %
 :- func new_default(hash_pred(K)::in(hash_pred)) =
             (version_hash_table(K, V)::out) is det.

+    % unsafe_new_default(HashFn)
+    %
+    % Like new_default/1 but the constructed hash table is backed by a
+    % non-thread safe version array. See the description of unsafe_new/3 above.
+    %
+:- func unsafe_new_default(hash_pred(K)::in(hash_pred)) =
+            (version_hash_table(K, V)::out) is det.
+
     % Retrieve the hash_pred associated with a hash table.
     %
 % :- func hash_pred(version_hash_table(K, V)) = hash_pred(K).
@@ -180,7 +201,14 @@

 %-----------------------------------------------------------------------------%

-new(HashPred, N, MaxOccupancy) = HT :-
+new(HashPred, N, MaxOccupancy) = new_2(HashPred, N, MaxOccupancy, yes).
+
+unsafe_new(HashPred, N, MaxOccupancy) = new_2(HashPred, N, MaxOccupancy, no).
+
+:- func new_2(hash_pred(K)::in(hash_pred), int::in, float::in, bool::in) =
+            (version_hash_table(K, V)::out) is det.
+
+new_2(HashPred, N, MaxOccupancy, NeedSafety) = HT :-
     (      if N =< 0 then
             throw(software_error("version_hash_table.new_hash_table: N =< 0"))
       else if N >= int.bits_per_int then
@@ -190,10 +218,16 @@ new(HashPred, N, MaxOccupancy) = HT :-
             throw(software_error(
                 "version_hash_table.new: MaxOccupancy =< 0.0"))
       else
-            NumBuckets   = 1 << N,
+            NumBuckets = 1 << N,
             MaxOccupants = ceiling_to_int(float(NumBuckets) * MaxOccupancy),
-            Buckets      = init(NumBuckets, []),
-            HT           = ht(0, MaxOccupants, HashPred, Buckets)
+            (
+                NeedSafety = yes,
+                Buckets = version_array.new(NumBuckets, [])
+            ;
+                NeedSafety = no,
+                Buckets = version_array.unsafe_new(NumBuckets, [])
+            ),
+            HT = ht(0, MaxOccupants, HashPred, Buckets)
     ).

 %-----------------------------------------------------------------------------%
@@ -202,6 +236,8 @@ new(HashPred, N, MaxOccupancy) = HT :-
     %
 new_default(HashPred) = new(HashPred, 7, 0.9).

+unsafe_new_default(HashPred) = unsafe_new(HashPred, 7, 0.9).
+
 %-----------------------------------------------------------------------------%

 num_buckets(HT) = size(HT ^ buckets).