[m-rev.] for review: threadsafe version_arrays in Java
Peter Wang
novalazy at gmail.com
Fri Apr 16 16:11:02 AEST 2010
On 14 April 2010 16:22, Peter Ross <pro at missioncriticalit.com> wrote:
> Hi,
>
> Peter Wang is probably the best person to review this.
>
> Ralph might want to make version_arrays thread safe on the C backend,
> however at least now the user will get an error when they try and use
> the non-safe version_array in a context where it might have to support
> concurrent access.
I have made the analogous change for the C backends.
Committed this.
Estimated hours taken: 12 + some
Branches: main, 10.04
Make the version_array and version_hash_table types thread safe in the C and
Java grades.
Supply unsafe initialization functions so that users can still use the
non-thread safe types, with the proviso that they never access the array or
hash table concurrently.
The synchronized version dramatically slows down version_array.lookup (100x
slower on Java).
However, in the test case MC uses, which replaces a map used as a memo table
with a version_hash_table, the version_hash_table is still approximately
40% faster than the map, even with the synchronization overhead.
The Java side of this patch was by Peter Ross.
library/version_array.m:
For Java: Make ML_va be an interface. Provide two implementations of
the ML_va interface: ML_uva (formerly ML_va) which provides an
unsynchronized version_array, and ML_sva which provides a synchronized
version_array. ML_sva just wraps an ML_uva but locks itself so that
only one thread can access the version array at a time, making the
ML_uva safe.
Add to ML_uva the method isClone() so that the ML_sva knows when the
underlying array has been cloned. It can then use a different lock
object, since it is protecting access to a different underlying array.
For C: in threaded grades, add a lock field to struct ML_va which may
point to a mutex. Use that to synchronise concurrent accesses to the
array.
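
To illustrate the C-side approach described above, here is a minimal
standalone sketch; it is not the library code. It assumes plain pthreads and
malloc in place of MercuryLock, MR_LOCK and MR_GC_NEW, models only a flat
array rather than the real version chain, and the names struct va, va_new,
va_get_dolock and va_copy_dolock are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical stand-in for struct ML_va: a flat int array plus a lock. */
struct va {
    int             *elems;
    int              size;
    pthread_mutex_t *lock;      /* NULL for the "unsafe" variant */
};

/* Lock only if the array was created with a mutex. */
static void maybe_lock(pthread_mutex_t *lock)
{
    if (lock != NULL) {
        pthread_mutex_lock(lock);
    }
}

static void maybe_unlock(pthread_mutex_t *lock)
{
    if (lock != NULL) {
        pthread_mutex_unlock(lock);
    }
}

/* Create an array of `size` copies of `init`; `safe` selects locking. */
struct va *va_new(int size, int init, int safe)
{
    struct va *va = malloc(sizeof *va);
    int i;

    if (va == NULL) abort();
    va->elems = malloc((size_t) (size > 0 ? size : 1) * sizeof *va->elems);
    if (va->elems == NULL) abort();
    va->size = size;
    for (i = 0; i < size; i++) {
        va->elems[i] = init;
    }
    va->lock = NULL;
    if (safe) {
        va->lock = malloc(sizeof *va->lock);
        if (va->lock == NULL || pthread_mutex_init(va->lock, NULL) != 0) {
            abort();
        }
    }
    return va;
}

/* Unlocked worker: the caller already holds va->lock, if there is one. */
static int va_get(const struct va *va, int i, int *out)
{
    if (i < 0 || i >= va->size) {
        return 0;
    }
    *out = va->elems[i];
    return 1;
}

/* Locking wrapper, in the style of the patch's *_dolock functions. */
int va_get_dolock(struct va *va, int i, int *out)
{
    pthread_mutex_t *lock = va->lock;
    int ok;

    maybe_lock(lock);
    ok = va_get(va, i, out);
    maybe_unlock(lock);
    return ok;
}

/* A copy owns separate storage, so it gets a fresh lock iff the source had one. */
struct va *va_copy_dolock(struct va *src)
{
    pthread_mutex_t *lock = src->lock;
    struct va *dst;
    int i;

    maybe_lock(lock);
    dst = va_new(src->size, 0, src->lock != NULL);
    for (i = 0; i < src->size; i++) {
        dst->elems[i] = src->elems[i];
    }
    maybe_unlock(lock);
    return dst;
}
```

In this sketch, va_new(N, X, 1) plays the role of new and va_new(N, X, 0) the
role of unsafe_new. A caller only ever uses the _dolock entry points; the
static workers assume the lock, if any, is already held, so they can call one
another without trying to take the same lock twice.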
library/version_hash_table.m:
Make new, new_default create hash tables which are backed by thread
safe version arrays.
Add unsafe_new, unsafe_new_default to create hash tables using
non-thread safe version arrays.
diff --git a/library/version_array.m b/library/version_array.m
index ef9407d..e4f644e 100644
--- a/library/version_array.m
+++ b/library/version_array.m
@@ -54,16 +54,20 @@
% Same as empty/0 except the resulting version_array is not thread safe.
%
% That is your program can segfault if you attempt to concurrently access
- % the version_array, however this version is much quicker if you guarantee
- % that you never concurrently access the version_array.
+ % or update the array from different threads, or any two arrays produced
+ % from operations on the same original array. However this version is much
+ % quicker if you guarantee that you never concurrently access the version
+ % array.
%
:- func unsafe_empty = version_array(T).
% Same as new(N, X) except the resulting version_array is not thread safe.
%
% That is your program can segfault if you attempt to concurrently access
- % the version_array, however this version is much quicker if you guarantee
- % that you never concurrently access the version_array.
+ % or update the array from different threads, or any two arrays produced
+ % from operations on the same original array. However this version is much
+ % quicker if you guarantee that you never concurrently access the version
+ % array.
%
:- func unsafe_new(int, T) = version_array(T).
@@ -348,19 +352,16 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
[will_not_call_mercury, promise_pure, will_not_modify_trail,
does_not_affect_liveness],
"
-#ifdef MR_THREAD_SAFE
- MR_fatal_error(""thread safe version_array not yet implemented"");
-#else
- /*
- ** If we are not in an environment which uses threads
- ** then it is safe to use the unsynchronized version_array.
- */
VA = MR_GC_NEW(struct ML_va);
VA->index = -1;
VA->value = (MR_Word) NULL;
VA->rest.array = (MR_ArrayPtr) MR_GC_NEW_ARRAY(MR_Word, 1);
VA->rest.array->size = 0;
+
+#ifdef MR_THREAD_SAFE
+ VA->lock = MR_GC_NEW(MercuryLock);
+ pthread_mutex_init(VA->lock, MR_MUTEX_ATTR);
#endif
").
@@ -383,6 +384,10 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
VA->value = (MR_Word) NULL;
VA->rest.array = (MR_ArrayPtr) MR_GC_NEW_ARRAY(MR_Word, 1);
VA->rest.array->size = 0;
+
+#ifdef MR_THREAD_SAFE
+ VA->lock = NULL;
+#endif
").
:- pragma foreign_proc("Java",
@@ -398,9 +403,6 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
[will_not_call_mercury, promise_pure, will_not_modify_trail,
does_not_affect_liveness, may_not_duplicate],
"
-#ifdef MR_THREAD_SAFE
- MR_fatal_error(""thread safe version_array not yet implemented"");
-#else
MR_Integer i;
VA = MR_GC_NEW(struct ML_va);
@@ -412,6 +414,10 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
for (i = 0; i < N; i++) {
VA->rest.array->elements[i] = X;
}
+
+#ifdef MR_THREAD_SAFE
+ VA->lock = MR_GC_NEW(MercuryLock);
+ pthread_mutex_init(VA->lock, MR_MUTEX_ATTR);
#endif
").
@@ -439,6 +445,10 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
for (i = 0; i < N; i++) {
VA->rest.array->elements[i] = X;
}
+
+#ifdef MR_THREAD_SAFE
+ VA->lock = NULL;
+#endif
").
:- pragma foreign_proc("Java",
@@ -452,33 +462,9 @@ cmp_version_array_2(I, Size, VAa, VAb, R) :-
:- pragma foreign_proc("C",
resize(VA0::in, N::in, X::in) = (VA::out),
[will_not_call_mercury, promise_pure, will_not_modify_trail,
- does_not_affect_liveness, may_not_duplicate],
+ does_not_affect_liveness],
"
- ML_va_ptr latest;
- MR_Integer i;
- MR_Integer size_VA0;
- MR_Integer min;
-
- latest = ML_va_get_latest(VA0);
-
- size_VA0 = ML_va_size(latest);
- min = (N <= size_VA0 ? N : size_VA0);
- VA = MR_GC_NEW(struct ML_va);
-
- VA->index = -1;
- VA->value = (MR_Word) NULL;
- VA->rest.array = (MR_ArrayPtr) MR_GC_NEW_ARRAY(MR_Word, N + 1);
- VA->rest.array->size = N;
-
- for (i = 0; i < min; i++) {
- VA->rest.array->elements[i] = latest->rest.array->elements[i];
- }
-
- ML_va_rewind_into(VA, VA0);
-
- for (i = min; i < N; i++) {
- VA->rest.array->elements[i] = X;
- }
+ VA = ML_va_resize_dolock(VA0, N, X);
").
:- pragma foreign_proc("Java",
@@ -496,7 +482,7 @@ resize(N, X, VA, resize(VA, N, X)).
[will_not_call_mercury, promise_pure, will_not_modify_trail,
does_not_affect_liveness],
"
- N = ML_va_size(VA);
+ N = ML_va_size_dolock(VA);
").
:- pragma foreign_proc("Java",
@@ -514,7 +500,7 @@ resize(N, X, VA, resize(VA, N, X)).
[will_not_call_mercury, promise_pure, will_not_modify_trail,
does_not_affect_liveness],
"
- SUCCESS_INDICATOR = ML_va_get(VA, I, &X);
+ SUCCESS_INDICATOR = ML_va_get_dolock(VA, I, &X);
").
:- pragma foreign_proc("Java",
@@ -539,7 +525,7 @@ resize(N, X, VA, resize(VA, N, X)).
[will_not_call_mercury, promise_pure, will_not_modify_trail,
does_not_affect_liveness],
"
- SUCCESS_INDICATOR = ML_va_set(VA0, I, X, &VA);
+ SUCCESS_INDICATOR = ML_va_set_dolock(VA0, I, X, &VA);
").
:- pragma foreign_proc("Java",
@@ -561,7 +547,7 @@ resize(N, X, VA, resize(VA, N, X)).
[will_not_call_mercury, promise_pure, will_not_modify_trail,
does_not_affect_liveness],
"
- VA = ML_va_rewind(VA0);
+ VA = ML_va_rewind_dolock(VA0);
").
:- pragma foreign_proc("Java",
@@ -590,6 +576,9 @@ struct ML_va {
MR_ArrayPtr array; /* Valid if index == -1 */
ML_va_ptr next; /* Valid if index >= 0 */
} rest;
+#ifdef MR_THREAD_SAFE
+ MercuryLock *lock; /* NULL or lock */
+#endif
};
/*
@@ -600,21 +589,25 @@ extern ML_va_ptr ML_va_get_latest(ML_va_ptr VA);
/*
** Returns the number of items in a version array.
*/
-extern MR_Integer ML_va_size(ML_va_ptr);
+extern MR_Integer ML_va_size_dolock(ML_va_ptr);
+static MR_Integer ML_va_size(ML_va_ptr);
/*
** If I is in range then ML_va_get(VA, I, &X) sets X to the Ith item
** in VA (counting from zero) and returns MR_TRUE. Otherwise it
** returns MR_FALSE.
*/
-extern int ML_va_get(ML_va_ptr, MR_Integer, MR_Word *);
+extern int ML_va_get_dolock(ML_va_ptr, MR_Integer, MR_Word *);
+static int ML_va_get(ML_va_ptr VA, MR_Integer I, MR_Word *Xptr);
/*
** If I is in range then ML_va_set(VA0, I, X, VA) sets VA to be VA0
** updated with the Ith item as X (counting from zero) and
** returns MR_TRUE. Otherwise it returns MR_FALSE.
*/
-extern int ML_va_set(ML_va_ptr, MR_Integer, MR_Word, ML_va_ptr *);
+extern int ML_va_set_dolock(ML_va_ptr, MR_Integer, MR_Word,
+ ML_va_ptr *);
+static int ML_va_set(ML_va_ptr, MR_Integer, MR_Word, ML_va_ptr *);
/*
** Create a copy of VA0 as a new array.
@@ -631,7 +624,14 @@ static void ML_va_rewind_into(ML_va_ptr VA, const ML_va_ptr VA0);
** `Rewinds' a version array, invalidating all extant successors
** including the argument.
*/
-extern ML_va_ptr ML_va_rewind(ML_va_ptr);
+extern ML_va_ptr ML_va_rewind_dolock(ML_va_ptr);
+static ML_va_ptr ML_va_rewind(ML_va_ptr VA);
+
+ /*
+ ** Resize a version array.
+ */
+extern ML_va_ptr ML_va_resize_dolock(ML_va_ptr, MR_Integer, MR_Word);
+static ML_va_ptr ML_va_resize(ML_va_ptr, MR_Integer, MR_Word);
").
@@ -639,6 +639,25 @@ extern ML_va_ptr ML_va_rewind(ML_va_ptr);
#define ML_va_latest_version(VA) ((VA)->index == -1)
+#ifdef MR_THREAD_SAFE
+ #define ML_maybe_lock(lock) \
+ do { \
+ if (lock) { \
+ MR_LOCK(lock, ""ML_maybe_lock""); \
+ } \
+ } while (0)
+
+ #define ML_maybe_unlock(lock) \
+ do { \
+ if (lock) { \
+ MR_UNLOCK(lock, ""ML_maybe_unlock""); \
+ } \
+ } while (0)
+#else
+ #define ML_maybe_lock(lock) ((void) 0)
+ #define ML_maybe_unlock(lock) ((void) 0)
+#endif
+
ML_va_ptr
ML_va_get_latest(ML_va_ptr VA)
{
@@ -650,6 +669,23 @@ ML_va_get_latest(ML_va_ptr VA)
}
MR_Integer
+ML_va_size_dolock(ML_va_ptr VA)
+{
+#ifdef MR_THREAD_SAFE
+ MercuryLock *lock = VA->lock;
+#endif
+ MR_Integer size;
+
+ ML_maybe_lock(lock);
+
+ size = ML_va_size(VA);
+
+ ML_maybe_unlock(lock);
+
+ return size;
+}
+
+static MR_Integer
ML_va_size(ML_va_ptr VA)
{
VA = ML_va_get_latest(VA);
@@ -658,6 +694,23 @@ ML_va_size(ML_va_ptr VA)
}
int
+ML_va_get_dolock(ML_va_ptr VA, MR_Integer I, MR_Word *Xptr)
+{
+#ifdef MR_THREAD_SAFE
+ MercuryLock *lock = VA->lock;
+#endif
+ int ret;
+
+ ML_maybe_lock(lock);
+
+ ret = ML_va_get(VA, I, Xptr);
+
+ ML_maybe_unlock(lock);
+
+ return ret;
+}
+
+static int
ML_va_get(ML_va_ptr VA, MR_Integer I, MR_Word *Xptr)
{
while (!ML_va_latest_version(VA)) {
@@ -678,6 +731,23 @@ ML_va_get(ML_va_ptr VA, MR_Integer I, MR_Word *Xptr)
}
int
+ML_va_set_dolock(ML_va_ptr VA0, MR_Integer I, MR_Word X, ML_va_ptr *VAptr)
+{
+#ifdef MR_THREAD_SAFE
+ MercuryLock *lock = VA0->lock;
+#endif
+ int ret;
+
+ ML_maybe_lock(lock);
+
+ ret = ML_va_set(VA0, I, X, VAptr);
+
+ ML_maybe_unlock(lock);
+
+ return ret;
+}
+
+static int
ML_va_set(ML_va_ptr VA0, MR_Integer I, MR_Word X, ML_va_ptr *VAptr)
{
ML_va_ptr VA1;
@@ -691,6 +761,9 @@ ML_va_set(ML_va_ptr VA0, MR_Integer I, MR_Word X, ML_va_ptr *VAptr)
VA1->index = -1;
VA1->value = (MR_Word) NULL;
VA1->rest.array = VA0->rest.array;
+#ifdef MR_THREAD_SAFE
+ VA1->lock = VA0->lock;
+#endif
VA0->index = I;
VA0->value = VA0->rest.array->elements[I];
@@ -732,6 +805,15 @@ ML_va_flat_copy(const ML_va_ptr VA0)
VA->rest.array->elements[i] = latest->rest.array->elements[i];
}
+#ifdef MR_THREAD_SAFE
+ if (VA0->lock != NULL) {
+ VA->lock = MR_GC_NEW(MercuryLock);
+ pthread_mutex_init(VA->lock, MR_MUTEX_ATTR);
+ } else {
+ VA->lock = NULL;
+ }
+#endif
+
ML_va_rewind_into(VA, VA0);
return VA;
@@ -757,6 +839,21 @@ ML_va_rewind_into(ML_va_ptr VA, const ML_va_ptr VA0)
}
ML_va_ptr
+ML_va_rewind_dolock(ML_va_ptr VA)
+{
+#ifdef MR_THREAD_SAFE
+ MercuryLock *lock = VA->lock;
+#endif
+ ML_maybe_lock(lock);
+
+ VA = ML_va_rewind(VA);
+
+ ML_maybe_unlock(lock);
+
+ return VA;
+}
+
+static ML_va_ptr
ML_va_rewind(ML_va_ptr VA)
{
MR_Integer I;
@@ -774,6 +871,65 @@ ML_va_rewind(ML_va_ptr VA)
return VA;
}
+ML_va_ptr
+ML_va_resize_dolock(ML_va_ptr VA0, MR_Integer N, MR_Word X)
+{
+#ifdef MR_THREAD_SAFE
+ MercuryLock *lock = VA0->lock;
+#endif
+ ML_va_ptr VA;
+
+ ML_maybe_lock(lock);
+
+ VA = ML_va_resize(VA0, N, X);
+
+ ML_maybe_unlock(lock);
+
+ return VA;
+}
+
+static ML_va_ptr
+ML_va_resize(ML_va_ptr VA0, MR_Integer N, MR_Word X)
+{
+ ML_va_ptr latest;
+ ML_va_ptr VA;
+ MR_Integer i;
+ MR_Integer size_VA0;
+ MR_Integer min;
+
+ latest = ML_va_get_latest(VA0);
+
+ size_VA0 = ML_va_size(latest);
+ min = (N <= size_VA0 ? N : size_VA0);
+ VA = MR_GC_NEW(struct ML_va);
+
+ VA->index = -1;
+ VA->value = (MR_Word) NULL;
+ VA->rest.array = (MR_ArrayPtr) MR_GC_NEW_ARRAY(MR_Word, N + 1);
+ VA->rest.array->size = N;
+
+ for (i = 0; i < min; i++) {
+ VA->rest.array->elements[i] = latest->rest.array->elements[i];
+ }
+
+#ifdef MR_THREAD_SAFE
+ if (VA0->lock != NULL) {
+ VA->lock = MR_GC_NEW(MercuryLock);
+ pthread_mutex_init(VA->lock, MR_MUTEX_ATTR);
+ } else {
+ VA->lock = NULL;
+ }
+#endif
+
+ ML_va_rewind_into(VA, VA0);
+
+ for (i = min; i < N; i++) {
+ VA->rest.array->elements[i] = X;
+ }
+
+ return VA;
+}
+
").
:- pragma foreign_code("Java", "
@@ -870,7 +1026,6 @@ public static class ML_uva implements ML_va {
va.value = null;
va.rest = new Object[0];
return va;
-
}
public static ML_uva init(int N, Object X) {
diff --git a/library/version_hash_table.m b/library/version_hash_table.m
index 3fb1450..c965282 100644
--- a/library/version_hash_table.m
+++ b/library/version_hash_table.m
@@ -49,12 +49,33 @@
:- func new(hash_pred(K)::in(hash_pred), int::in, float::in) =
(version_hash_table(K, V)::out) is det.
+ % unsafe_new(HashPred, N, MaxOccupancy)
+ %
+ % Like new/3, but the constructed hash table is backed by a non-thread safe
+ % version array. It is unsafe to concurrently access or update the hash
+ % table from different threads, or any two hash tables which were produced
+ % from operations on the same original hash table.
+ % However, if the hash table or its descendents will not be used in such a
+ % manner, a non-thread safe hash table can be much faster than a thread
+ % safe one.
+ %
+:- func unsafe_new(hash_pred(K)::in(hash_pred), int::in, float::in) =
+ (version_hash_table(K, V)::out) is det.
+
% new_default(HashFn) constructs a hash table with default size and
% occupancy arguments.
%
:- func new_default(hash_pred(K)::in(hash_pred)) =
(version_hash_table(K, V)::out) is det.
+ % unsafe_new_default(HashFn)
+ %
+ % Like new_default/1 but the constructed hash table is backed by a
+ % non-thread safe version array. See the description of unsafe_new/3 above.
+ %
+:- func unsafe_new_default(hash_pred(K)::in(hash_pred)) =
+ (version_hash_table(K, V)::out) is det.
+
% Retrieve the hash_pred associated with a hash table.
%
% :- func hash_pred(version_hash_table(K, V)) = hash_pred(K).
@@ -180,7 +201,14 @@
%-----------------------------------------------------------------------------%
-new(HashPred, N, MaxOccupancy) = HT :-
+new(HashPred, N, MaxOccupancy) = new_2(HashPred, N, MaxOccupancy, yes).
+
+unsafe_new(HashPred, N, MaxOccupancy) = new_2(HashPred, N, MaxOccupancy, no).
+
+:- func new_2(hash_pred(K)::in(hash_pred), int::in, float::in, bool::in) =
+ (version_hash_table(K, V)::out) is det.
+
+new_2(HashPred, N, MaxOccupancy, NeedSafety) = HT :-
( if N =< 0 then
throw(software_error("version_hash_table.new_hash_table: N =< 0"))
else if N >= int.bits_per_int then
@@ -190,10 +218,16 @@ new(HashPred, N, MaxOccupancy) = HT :-
throw(software_error(
"version_hash_table.new: MaxOccupancy =< 0.0"))
else
- NumBuckets = 1 << N,
+ NumBuckets = 1 << N,
MaxOccupants = ceiling_to_int(float(NumBuckets) * MaxOccupancy),
- Buckets = init(NumBuckets, []),
- HT = ht(0, MaxOccupants, HashPred, Buckets)
+ (
+ NeedSafety = yes,
+ Buckets = version_array.new(NumBuckets, [])
+ ;
+ NeedSafety = no,
+ Buckets = version_array.unsafe_new(NumBuckets, [])
+ ),
+ HT = ht(0, MaxOccupants, HashPred, Buckets)
).
%-----------------------------------------------------------------------------%
@@ -202,6 +236,8 @@ new(HashPred, N, MaxOccupancy) = HT :-
%
new_default(HashPred) = new(HashPred, 7, 0.9).
+unsafe_new_default(HashPred) = unsafe_new(HashPred, 7, 0.9).
+
%-----------------------------------------------------------------------------%
num_buckets(HT) = size(HT ^ buckets).