[m-rev.] for review: Make MercuryThreadPool notice when a thread blocks on a semaphore.
Paul Bone
paul at bone.id.au
Thu Jul 17 13:52:56 AEST 2014
For review by anyone.
Branches: master
---
Make MercuryThreadPool notice when a thread blocks on a semaphore.
Regarding the thread pooling in the Java backend.
If threads are blocked while there is work in the queue, extra threads may be
spawned to keep the processors busy.
java/runtime/Semaphore.java:
Wrap Java's Semaphore class in a subclass that calls the current thread's
blocked() and running() methods when a thread blocks and when it runs
again after being blocked.
library/thread.semaphore.m:
Use our own Semaphore class.
java/runtime/MercuryThread.java:
java/runtime/MercuryWorkerThread.java:
Define blocked() and running() on our threads.
java/runtime/MercuryThreadPool.java:
java/runtime/ThreadStatus.java:
Define and use the BLOCKED status.
---
java/runtime/MercuryThread.java | 26 ++++++
java/runtime/MercuryThreadPool.java | 126 +++++++++++++++++-----------
java/runtime/MercuryWorkerThread.java | 10 +++
java/runtime/Semaphore.java | 150 ++++++++++++++++++++++++++++++++++
java/runtime/ThreadStatus.java | 3 +-
library/thread.semaphore.m | 4 +-
6 files changed, 270 insertions(+), 49 deletions(-)
create mode 100644 java/runtime/Semaphore.java
diff --git a/java/runtime/MercuryThread.java b/java/runtime/MercuryThread.java
index 0e3b37f..80e8dc5 100644
--- a/java/runtime/MercuryThread.java
+++ b/java/runtime/MercuryThread.java
@@ -24,5 +24,31 @@ public abstract class MercuryThread extends Thread
this.id = id;
}
+ /**
+ * Notify this thread object that the thread has become blocked (or is
+ * about to block).
+ * jmercury.runtime.Semaphore calls this on the current MercuryThread
+ * before it waits for permits that were not immediately available.
+ */
+ public abstract void blocked();
+
+ /**
+ * Notify this thread object that the thread is unblocked and is now
+ * running again.
+ * jmercury.runtime.Semaphore calls this on the current MercuryThread
+ * once a wait that was announced with blocked() has finished.
+ */
+ public abstract void running();
+
+    /**
+     * Return the calling thread as a MercuryThread, or null when the
+     * calling thread is not a MercuryThread.
+     */
+    public static MercuryThread currentThread()
+    {
+        final Thread self = Thread.currentThread();
+
+        return (self instanceof MercuryThread) ? (MercuryThread) self : null;
+    }
+
}
diff --git a/java/runtime/MercuryThreadPool.java b/java/runtime/MercuryThreadPool.java
index c72e34c..67877c4 100644
--- a/java/runtime/MercuryThreadPool.java
+++ b/java/runtime/MercuryThreadPool.java
@@ -24,9 +24,6 @@ import java.util.*;
* new threads may be created to avoid deadlocks and attempt to keep all
* processors busy.
*
- * TODO: Currently the thread pool does not know when a thread is blocked by
- * a barrier, channel, mvar or semaphore.
- *
* TODO: Currently the thread pool does not know when a thread is blocked in
* foreign code or performing IO.
*
@@ -79,6 +76,7 @@ public class MercuryThreadPool
*/
private int num_threads_working;
private int num_threads_waiting;
+ private int num_threads_blocked;
private int num_threads_other;
private LinkedList<MercuryThread> threads;
private Lock threads_lock;
@@ -123,6 +121,7 @@ public class MercuryThreadPool
thread_pool_size = size;
num_threads_working = 0;
num_threads_waiting = 0;
+ num_threads_blocked = 0;
num_threads_other = 0;
threads = new LinkedList<MercuryThread>();
threads_lock = new ReentrantLock();
@@ -238,6 +237,9 @@ public class MercuryThreadPool
case IDLE:
num_threads_waiting--;
break;
+ case BLOCKED:
+ num_threads_blocked--;
+ break;
case OTHER:
num_threads_other--;
break;
@@ -252,6 +254,9 @@ public class MercuryThreadPool
case IDLE:
num_threads_waiting++;
break;
+ case BLOCKED:
+ num_threads_blocked++;
+ break;
case OTHER:
num_threads_other++;
break;
@@ -262,10 +267,12 @@ public class MercuryThreadPool
threads_lock.unlock();
}
- if ((new_ == ThreadStatus.IDLE) ||
- (new_ == ThreadStatus.OTHER))
- {
- signalMainLoop();
+ switch (new_) {
+ case BLOCKED:
+ signalMainLoop();
+ break;
+ default:
+ break;
}
}
@@ -281,6 +288,9 @@ public class MercuryThreadPool
case IDLE:
num_threads_waiting--;
break;
+ case BLOCKED:
+ num_threads_blocked--;
+ break;
case OTHER:
num_threads_other--;
break;
@@ -333,15 +343,33 @@ public class MercuryThreadPool
}
/**
+ * Warm up the thread pool by starting some initial threads (currently
+ * one).
+ */
+ protected void startupInitialThreads()
+ {
+ MercuryWorkerThread t = thread_factory.newWorkerThread();
+
+ threads_lock.lock();
+ try {
+ threads.add(t);
+ num_threads_other++;
+ } finally {
+ threads_lock.unlock();
+ }
+
+ t.start();
+ }
+
+ /**
* Check threads.
* Chacks the numbers and status of the worker threads and starts more
* threads if required.
- * @return The number of currently working/blocked threads.
*/
- protected int checkThreads()
+ protected void checkThreads()
{
int num_new_threads;
- int num_working_blocked_threads;
+ int num_tasks_waiting;
List<MercuryWorkerThread> new_threads =
new LinkedList<MercuryWorkerThread>();
@@ -354,43 +382,54 @@ public class MercuryThreadPool
thread_pool_size = user_specified_size > 0 ? user_specified_size :
Runtime.getRuntime().availableProcessors();
- /*
- * If we have fewer than the default number of threads then start
- * some new threads.
- */
- threads_lock.lock();
+ tasks_lock.lock();
try {
- int num_threads = num_threads_working + num_threads_waiting +
- num_threads_other;
- num_working_blocked_threads = num_threads_working +
- num_threads_other;
- num_new_threads = thread_pool_size - num_threads;
- if (num_new_threads > 0) {
+ num_tasks_waiting = tasks.size();
+ } finally {
+ tasks_lock.unlock();
+ }
+
+ if (num_tasks_waiting > 0) {
+ /*
+ * If we have fewer than the default number of threads then
+ * start some new threads.
+ */
+ threads_lock.lock();
+ try {
+ int num_threads = num_threads_working + num_threads_waiting +
+ num_threads_blocked + num_threads_other;
+ int num_threads_limit = thread_pool_size +
+ num_threads_blocked;
+ // Determine the number of new threads that we want.
+ num_new_threads = num_tasks_waiting - num_threads_other -
+ num_threads_waiting;
+ if (num_new_threads + num_threads > num_threads_limit) {
+ /*
+ * The number of threads that we want, plus the number
+ * we already have, exceeds the number that we're
+ * allowed to have.
+ */
+ num_new_threads = num_threads_limit - num_threads;
+ }
+
for (int i = 0; i < num_new_threads; i++) {
MercuryWorkerThread t = thread_factory.newWorkerThread();
new_threads.add(t);
threads.add(t);
+ num_threads_other++;
}
- num_threads = thread_pool_size;
+ } finally {
+ threads_lock.unlock();
}
- num_threads_other += num_new_threads;
- } finally {
- threads_lock.unlock();
- }
- /*
- * Start the threads while we're not holding the lock, this makes
- * the above critical section smaller.
- */
- for (MercuryWorkerThread t : new_threads) {
- t.start();
+ /*
+ * Start the threads while we're not holding the lock, this makes
+ * the above critical section smaller.
+ */
+ for (MercuryWorkerThread t : new_threads) {
+ t.start();
+ }
}
-
- /*
- * If there are too many threads then superflous threads will
- * shutdown when they try to get a new task.
- */
- return num_working_blocked_threads;
}
/**
@@ -422,7 +461,6 @@ public class MercuryThreadPool
public void run()
{
boolean done = false;
- int num_working_blocked_threads;
long num_tasks_submitted;
long num_tasks_completed;
boolean tasks_locked = false;
@@ -444,7 +482,7 @@ public class MercuryThreadPool
* Start new threads if we have fewer than the
* thread_pool_size
*/
- num_working_blocked_threads = checkThreads();
+ checkThreads();
/*
* Acquire the main loop lock while we're still
@@ -519,6 +557,7 @@ public class MercuryThreadPool
public void runMain(Runnable run_main)
{
Task main_task = new Task(run_main);
+ startupInitialThreads();
submit(main_task);
try {
/*
@@ -555,12 +594,7 @@ public class MercuryThreadPool
* @param runnable The task the new thread should execute.
*/
public MercuryThread newThread(final Runnable runnable) {
- return new MercuryThread("Mercury Thread", allocateThreadId())
- {
- public void run() {
- runnable.run();
- }
- };
+ return new NativeThread(allocateThreadId(), runnable);
}
public MercuryWorkerThread newWorkerThread() {
diff --git a/java/runtime/MercuryWorkerThread.java b/java/runtime/MercuryWorkerThread.java
index b0aa0a2..40f0182 100644
--- a/java/runtime/MercuryWorkerThread.java
+++ b/java/runtime/MercuryWorkerThread.java
@@ -88,5 +88,15 @@ public class MercuryWorkerThread extends MercuryThread
pool.updateThreadCounts(status, ThreadStatus.OTHER);
status = ThreadStatus.OTHER;
}
+
+ /**
+ * Record that this worker has become blocked: ask the pool to move this
+ * thread from its current status to BLOCKED in its counts, then remember
+ * BLOCKED as the current status.
+ */
+ public void blocked() {
+ pool.updateThreadCounts(status, ThreadStatus.BLOCKED);
+ status = ThreadStatus.BLOCKED;
+ }
+
+ /**
+ * Record that this worker is running again: ask the pool to move this
+ * thread from its current status to WORKING in its counts, then remember
+ * WORKING as the current status.
+ */
+ public void running() {
+ pool.updateThreadCounts(status, ThreadStatus.WORKING);
+ status = ThreadStatus.WORKING;
+ }
}
diff --git a/java/runtime/Semaphore.java b/java/runtime/Semaphore.java
new file mode 100644
index 0000000..823a919
--- /dev/null
+++ b/java/runtime/Semaphore.java
@@ -0,0 +1,150 @@
+//
+// Copyright (C) 2014 The Mercury Team
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A semaphore that tells the current MercuryThread when it blocks.
+ *
+ * This extends java.util.concurrent.Semaphore.  Whenever one of the
+ * acquire operations below cannot be satisfied immediately, the current
+ * MercuryThread (if the caller is one) has its blocked() method called
+ * before the wait and its running() method called once the wait is over.
+ * Calls made from threads that are not MercuryThreads skip the
+ * notifications and otherwise behave the same way.
+ */
+public class Semaphore
+    extends java.util.concurrent.Semaphore
+{
+    public Semaphore(int permits, boolean fair)
+    {
+        super(permits, fair);
+    }
+
+    public Semaphore(int permits)
+    {
+        super(permits);
+    }
+
+    /**
+     * Acquire one permit, reporting to the current thread if we block.
+     */
+    @Override
+    public void acquire()
+        throws InterruptedException
+    {
+        acquire(1);
+    }
+
+    /**
+     * Acquire the given number of permits, reporting to the current
+     * thread if we have to block for them.
+     *
+     * @param permits The number of permits to acquire.
+     * @throws InterruptedException if the thread is interrupted.
+     */
+    @Override
+    public void acquire(int permits)
+        throws InterruptedException
+    {
+        // Fast path: a zero-timeout attempt never waits.
+        if (super.tryAcquire(permits, 0, TimeUnit.SECONDS)) {
+            return;
+        }
+
+        // This thread will (probably) block.
+        blocked();
+        try {
+            super.acquire(permits);
+        } finally {
+            // The thread isn't blocked anymore, even if it was interrupted.
+            running();
+        }
+    }
+
+    /**
+     * Acquire one permit without responding to interruption, reporting to
+     * the current thread if we block.
+     */
+    @Override
+    public void acquireUninterruptibly()
+    {
+        acquireUninterruptibly(1);
+    }
+
+    /**
+     * Acquire the given number of permits without responding to
+     * interruption, reporting to the current thread if we have to block.
+     * If the thread was interrupted in the meantime its interrupt status
+     * is set again before returning.
+     *
+     * @param permits The number of permits to acquire.
+     */
+    @Override
+    public void acquireUninterruptibly(int permits)
+    {
+        boolean interrupted = false;
+        boolean success = false;
+        boolean attempted = false;
+
+        /*
+         * The timed tryAcquire is interruptible, so keep trying until one
+         * attempt has completed without being interrupted.  We also track
+         * whether we were interrupted so that we can raise the condition
+         * again below.
+         */
+        while (!attempted) {
+            try {
+                success = super.tryAcquire(permits, 0, TimeUnit.SECONDS);
+                attempted = true;
+            } catch (InterruptedException e) {
+                interrupted = true;
+            }
+        }
+
+        if (!success) {
+            // This thread will (probably) block because tryAcquire failed.
+            blocked();
+            try {
+                super.acquireUninterruptibly(permits);
+            } finally {
+                // The thread isn't blocked anymore.
+                running();
+            }
+        }
+
+        if (interrupted) {
+            // Raise the interrupted condition again.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Try to acquire one permit, waiting up to the given timeout.
+     */
+    @Override
+    public boolean tryAcquire(long timeout, TimeUnit unit)
+        throws InterruptedException
+    {
+        return tryAcquire(1, timeout, unit);
+    }
+
+    /**
+     * Try to acquire the given number of permits, waiting up to the given
+     * timeout and reporting to the current thread if we have to wait.
+     *
+     * @param permits The number of permits to acquire.
+     * @param timeout The maximum time to wait; values less than one do
+     *                not wait and so never report blocking.
+     * @param unit    The unit of timeout.
+     * @return true if the permits were acquired.
+     * @throws InterruptedException if the thread is interrupted.
+     */
+    @Override
+    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
+        throws InterruptedException
+    {
+        if (timeout < 1) {
+            return super.tryAcquire(permits, timeout, unit);
+        }
+
+        if (super.tryAcquire(permits, 0, unit)) {
+            // Blocking wasn't necessary.
+            return true;
+        }
+
+        // Block.  Call the superclass directly: calling this method again
+        // with the same timeout would recurse without end.
+        blocked();
+        try {
+            return super.tryAcquire(permits, timeout, unit);
+        } finally {
+            running();
+        }
+    }
+
+    /**
+     * @deprecated Misspelled alias kept for compatibility; use acquire().
+     */
+    @Deprecated
+    public void aquire()
+        throws InterruptedException
+    {
+        acquire(1);
+    }
+
+    /**
+     * @deprecated Misspelled alias kept for compatibility; use
+     * acquire(int).
+     */
+    @Deprecated
+    public void aquire(int permits)
+        throws InterruptedException
+    {
+        acquire(permits);
+    }
+
+    /**
+     * @deprecated Misspelled alias kept for compatibility; use
+     * acquireUninterruptibly().
+     */
+    @Deprecated
+    public void aquireUninterruptably()
+    {
+        acquireUninterruptibly(1);
+    }
+
+    /**
+     * @deprecated Misspelled alias kept for compatibility; use
+     * acquireUninterruptibly(int).
+     */
+    @Deprecated
+    public void aquireUninterruptably(int permits)
+    {
+        acquireUninterruptibly(permits);
+    }
+
+    /**
+     * Tell the current thread, if it is a MercuryThread, that it is about
+     * to block.
+     */
+    protected void blocked()
+    {
+        MercuryThread mer_thread;
+
+        mer_thread = MercuryThread.currentThread();
+        if (mer_thread != null) {
+            mer_thread.blocked();
+        }
+    }
+
+    /**
+     * Tell the current thread, if it is a MercuryThread, that it is
+     * running again.
+     */
+    protected void running()
+    {
+        MercuryThread mer_thread;
+
+        mer_thread = MercuryThread.currentThread();
+        if (mer_thread != null) {
+            mer_thread.running();
+        }
+    }
+}
+
diff --git a/java/runtime/ThreadStatus.java b/java/runtime/ThreadStatus.java
index 4ba0294..78512fc 100644
--- a/java/runtime/ThreadStatus.java
+++ b/java/runtime/ThreadStatus.java
@@ -9,6 +9,7 @@ package jmercury.runtime;
public enum ThreadStatus {
WORKING,
IDLE,
- OTHER
+ OTHER,
+ BLOCKED
}
diff --git a/library/thread.semaphore.m b/library/thread.semaphore.m
index 7058105..46c53e2 100644
--- a/library/thread.semaphore.m
+++ b/library/thread.semaphore.m
@@ -97,7 +97,7 @@ public class ML_Semaphore {
"class [mercury]mercury.thread.semaphore__csharp_code.mercury_code.ML_Semaphore").
:- pragma foreign_type("C#", semaphore, "thread__semaphore.ML_Semaphore").
:- pragma foreign_type("Erlang", semaphore, "").
-:- pragma foreign_type("Java", semaphore, "java.util.concurrent.Semaphore").
+:- pragma foreign_type("Java", semaphore, "jmercury.runtime.Semaphore").
:- pragma foreign_decl("C", "
extern void
@@ -153,7 +153,7 @@ init(Semaphore, !IO) :-
init(Count::in) = (Semaphore::uo),
[will_not_call_mercury, thread_safe],
"
- Semaphore = new java.util.concurrent.Semaphore(Count);
+ Semaphore = new jmercury.runtime.Semaphore(Count);
").
:- pragma foreign_code("C", "
--
2.0.0
More information about the reviews
mailing list