[m-rev.] for review: Make MercuryThreadPool notice when a thread blocks on a semaphore.

Paul Bone paul at bone.id.au
Thu Jul 17 13:52:56 AEST 2014


For review by anyone.

Branches: master

---

Make MercuryThreadPool notice when a thread blocks on a semaphore.

Regarding the thread pooling in the Java backend.

If threads are blocked while there is work in the queue, extra threads may be
spawned to keep the processors busy.

java/runtime/Semaphore.java:
    Wrap Java's Semaphore class so that it calls the current thread's
    blocked() and running() methods when a thread blocks and when it runs
    again after being blocked.

library/thread.semaphore.m:
    Use our own Semaphore class.

java/runtime/MercuryThread.java:
java/runtime/MercuryWorkerThread.java:
    Define blocked() and running() on our threads.

java/runtime/MercuryThreadPool.java:
java/runtime/ThreadStatus.java:
    Define and use the BLOCKED status.
---
 java/runtime/MercuryThread.java       |  26 ++++++
 java/runtime/MercuryThreadPool.java   | 126 +++++++++++++++++-----------
 java/runtime/MercuryWorkerThread.java |  10 +++
 java/runtime/Semaphore.java           | 150 ++++++++++++++++++++++++++++++++++
 java/runtime/ThreadStatus.java        |   3 +-
 library/thread.semaphore.m            |   4 +-
 6 files changed, 270 insertions(+), 49 deletions(-)
 create mode 100644 java/runtime/Semaphore.java

diff --git a/java/runtime/MercuryThread.java b/java/runtime/MercuryThread.java
index 0e3b37f..80e8dc5 100644
--- a/java/runtime/MercuryThread.java
+++ b/java/runtime/MercuryThread.java
@@ -24,5 +24,31 @@ public abstract class MercuryThread extends Thread
         this.id = id;
     }
 
+    /**
+     * The thread has become blocked.
+     */
+    public abstract void blocked();
+
+    /**
+     * The thread is unblocked and is now running again.
+     */
+    public abstract void running();
+
+    /**
+     * If the current thread is a MercuryThread then return a reference to
+     * it.
+     */
+    public static MercuryThread currentThread()
+    {
+        Thread          thread;
+
+        thread = Thread.currentThread();
+        if (thread instanceof MercuryThread) {
+            return (MercuryThread)thread;
+        } else {
+            return null;
+        }
+    }
+
 }
 
diff --git a/java/runtime/MercuryThreadPool.java b/java/runtime/MercuryThreadPool.java
index c72e34c..67877c4 100644
--- a/java/runtime/MercuryThreadPool.java
+++ b/java/runtime/MercuryThreadPool.java
@@ -24,9 +24,6 @@ import java.util.*;
  * new threads may be created to avoid deadlocks and attempt to keep all
  * processors busy.
  *
- * TODO: Currently the thread pool does not know when a thread is blocked by
- * a barrier, channel, mvar or semaphore.
- *
  * TODO: Currently the thread pool does not know when a thread is blocked in
  * foreign code or performing IO.
  *
@@ -79,6 +76,7 @@ public class MercuryThreadPool
      */
     private int                         num_threads_working;
     private int                         num_threads_waiting;
+    private int                         num_threads_blocked;
     private int                         num_threads_other;
     private LinkedList<MercuryThread>   threads;
     private Lock                        threads_lock;
@@ -123,6 +121,7 @@ public class MercuryThreadPool
         thread_pool_size = size;
         num_threads_working = 0;
         num_threads_waiting = 0;
+        num_threads_blocked = 0;
         num_threads_other = 0;
         threads = new LinkedList<MercuryThread>();
         threads_lock = new ReentrantLock();
@@ -238,6 +237,9 @@ public class MercuryThreadPool
                 case IDLE:
                     num_threads_waiting--;
                     break;
+                case BLOCKED:
+                    num_threads_blocked--;
+                    break;
                 case OTHER:
                     num_threads_other--;
                     break;
@@ -252,6 +254,9 @@ public class MercuryThreadPool
                 case IDLE:
                     num_threads_waiting++;
                     break;
+                case BLOCKED:
+                    num_threads_blocked++;
+                    break;
                 case OTHER:
                     num_threads_other++;
                     break;
@@ -262,10 +267,12 @@ public class MercuryThreadPool
             threads_lock.unlock();
         }
 
-        if ((new_ == ThreadStatus.IDLE) ||
-            (new_ == ThreadStatus.OTHER))
-        {
-            signalMainLoop();
+        switch (new_) {
+            case BLOCKED:
+                signalMainLoop();
+                break;
+            default:
+                break;
         }
     }
 
@@ -281,6 +288,9 @@ public class MercuryThreadPool
                 case IDLE:
                     num_threads_waiting--;
                     break;
+                case BLOCKED:
+                    num_threads_blocked--;
+                    break;
                 case OTHER:
                     num_threads_other--;
                     break;
@@ -333,15 +343,33 @@ public class MercuryThreadPool
     }
 
     /**
+     * Warm up the thread pool by starting some initial threads (currently
+     * one).
+     */
+    protected void startupInitialThreads()
+    {
+        MercuryWorkerThread t = thread_factory.newWorkerThread();
+
+        threads_lock.lock();
+        try {
+            threads.add(t);
+            num_threads_other++;
+        } finally {
+            threads_lock.unlock();
+        }
+
+        t.start();
+    }
+
+    /**
      * Check threads.
      * Chacks the numbers and status of the worker threads and starts more
      * threads if required.
-     * @return The number of currently working/blocked threads.
      */
-    protected int checkThreads()
+    protected void checkThreads()
     {
         int num_new_threads;
-        int num_working_blocked_threads;
+        int num_tasks_waiting;
         List<MercuryWorkerThread> new_threads =
             new LinkedList<MercuryWorkerThread>();
 
@@ -354,43 +382,54 @@ public class MercuryThreadPool
         thread_pool_size = user_specified_size > 0 ? user_specified_size :
             Runtime.getRuntime().availableProcessors();
 
-        /*
-         * If we have fewer than the default number of threads then start
-         * some new threads.
-         */
-        threads_lock.lock();
+        tasks_lock.lock();
         try {
-            int num_threads = num_threads_working + num_threads_waiting +
-                num_threads_other;
-            num_working_blocked_threads = num_threads_working +
-                num_threads_other;
-            num_new_threads = thread_pool_size - num_threads;
-            if (num_new_threads > 0) {
+            num_tasks_waiting = tasks.size();
+        } finally {
+            tasks_lock.unlock();
+        }
+
+        if (num_tasks_waiting > 0) {
+            /*
+             * If we have fewer than the default number of threads then
+             * start some new threads.
+             */
+            threads_lock.lock();
+            try {
+                int num_threads = num_threads_working + num_threads_waiting +
+                    num_threads_blocked + num_threads_other;
+                int num_threads_limit = thread_pool_size +
+                    num_threads_blocked;
+                // Determine the number of new threads that we want.
+                num_new_threads = num_tasks_waiting - num_threads_other -
+                    num_threads_waiting;
+                if (num_new_threads + num_threads > num_threads_limit) {
+                    /*
+                     * The number of threads that we want, plus the number
+                     * we already have, exceeds the number that we're
+                     * allowed to have.
+                     */
+                    num_new_threads = num_threads_limit - num_threads;
+                }
+
                 for (int i = 0; i < num_new_threads; i++) {
                     MercuryWorkerThread t = thread_factory.newWorkerThread();
                     new_threads.add(t);
                     threads.add(t);
+                    num_threads_other++;
                 }
-                num_threads = thread_pool_size;
+            } finally {
+                threads_lock.unlock();
             }
-            num_threads_other += num_new_threads;
-        } finally {
-            threads_lock.unlock();
-        }
 
-        /*
-         * Start the threads while we're not holding the lock, this makes
-         * the above critical section smaller.
-         */
-        for (MercuryWorkerThread t : new_threads) {
-            t.start();
+            /*
+             * Start the threads while we're not holding the lock, this makes
+             * the above critical section smaller.
+             */
+            for (MercuryWorkerThread t : new_threads) {
+                t.start();
+            }
         }
-
-        /*
-         * If there are too many threads then superflous threads will
-         * shutdown when they try to get a new task.
-         */
-        return num_working_blocked_threads;
     }
 
     /**
@@ -422,7 +461,6 @@ public class MercuryThreadPool
     public void run()
     {
         boolean done = false;
-        int     num_working_blocked_threads;
         long    num_tasks_submitted;
         long    num_tasks_completed;
         boolean tasks_locked = false;
@@ -444,7 +482,7 @@ public class MercuryThreadPool
                          * Start new threads if we have fewer than the
                          * thread_pool_size
                          */
-                        num_working_blocked_threads = checkThreads();
+                        checkThreads();
                     
                         /*
                          * Acquire the main loop lock while we're still
@@ -519,6 +557,7 @@ public class MercuryThreadPool
     public void runMain(Runnable run_main)
     {
         Task main_task = new Task(run_main);
+        startupInitialThreads();
         submit(main_task);
         try {
             /*
@@ -555,12 +594,7 @@ public class MercuryThreadPool
          * @param runnable The task the new thread should execute.
          */
         public MercuryThread newThread(final Runnable runnable) {
-            return new MercuryThread("Mercury Thread", allocateThreadId())
-                {
-                    public void run() {
-                        runnable.run();
-                    }
-                };
+            return new NativeThread(allocateThreadId(), runnable);
         }
 
         public MercuryWorkerThread newWorkerThread() {
diff --git a/java/runtime/MercuryWorkerThread.java b/java/runtime/MercuryWorkerThread.java
index b0aa0a2..40f0182 100644
--- a/java/runtime/MercuryWorkerThread.java
+++ b/java/runtime/MercuryWorkerThread.java
@@ -88,5 +88,15 @@ public class MercuryWorkerThread extends MercuryThread
         pool.updateThreadCounts(status, ThreadStatus.OTHER);
         status = ThreadStatus.OTHER;
     }
+
+    public void blocked() {
+        pool.updateThreadCounts(status, ThreadStatus.BLOCKED);
+        status = ThreadStatus.BLOCKED;
+    }
+
+    public void running() {
+        pool.updateThreadCounts(status, ThreadStatus.WORKING);
+        status = ThreadStatus.WORKING;
+    }
 }
 
diff --git a/java/runtime/Semaphore.java b/java/runtime/Semaphore.java
new file mode 100644
index 0000000..823a919
--- /dev/null
+++ b/java/runtime/Semaphore.java
@@ -0,0 +1,227 @@
+//
+// Copyright (C) 2014 The Mercury Team
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A semaphore that tells the current MercuryThread when it blocks.
+ *
+ * Wraps Java's Semaphore so that an acquire which cannot be satisfied
+ * immediately calls the current thread's blocked() method before waiting
+ * and its running() method afterwards.
+ */
+public class Semaphore
+    extends java.util.concurrent.Semaphore
+{
+    public Semaphore(int permits, boolean fair)
+    {
+        super(permits, fair);
+    }
+
+    public Semaphore(int permits)
+    {
+        super(permits);
+    }
+
+    /**
+     * Acquire one permit; see acquire(int).
+     */
+    @Override
+    public void acquire()
+        throws InterruptedException
+    {
+        acquire(1);
+    }
+
+    /**
+     * Acquire the given number of permits, reporting the thread as blocked
+     * while it waits for them.
+     * @param permits The number of permits to acquire.
+     * @throws InterruptedException if the thread is interrupted.
+     */
+    @Override
+    public void acquire(int permits)
+        throws InterruptedException
+    {
+        if (super.tryAcquire(permits, 0, TimeUnit.SECONDS)) {
+            return;
+        }
+
+        // This thread will (probably) block.
+        blocked();
+        try {
+            super.acquire(permits);
+        } finally {
+            // The thread isn't blocked anymore.
+            running();
+        }
+    }
+
+    /**
+     * Acquire one permit; see acquireUninterruptibly(int).
+     */
+    @Override
+    public void acquireUninterruptibly()
+    {
+        acquireUninterruptibly(1);
+    }
+
+    /**
+     * Acquire the given number of permits without giving up when
+     * interrupted, reporting the thread as blocked while it waits.  If an
+     * interrupt is seen during the initial attempt, the interrupted
+     * condition is raised again before returning.
+     * @param permits The number of permits to acquire.
+     */
+    @Override
+    public void acquireUninterruptibly(int permits)
+    {
+        boolean interrupted = false;
+        boolean attempted = false;
+        boolean success = false;
+
+        /*
+         * tryAcquire is interruptable so we keep trying until we've
+         * executed it once without being interrupted.  We also track if
+         * we were interrupted so we can raise this condition again.
+         */
+        while (!attempted) {
+            try {
+                success = super.tryAcquire(permits, 0, TimeUnit.SECONDS);
+                attempted = true;
+            } catch (InterruptedException e) {
+                interrupted = true;
+            }
+        }
+
+        if (!success) {
+            // This thread will (probably) block because tryAcquire failed.
+            blocked();
+            try {
+                super.acquireUninterruptibly(permits);
+            } finally {
+                // The thread isn't blocked anymore.
+                running();
+            }
+        }
+
+        if (interrupted) {
+            // Raise the interrupted condition.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Misspelt alias of acquire(), kept for compatibility.
+     */
+    public void aquire()
+        throws InterruptedException
+    {
+        acquire(1);
+    }
+
+    /**
+     * Misspelt alias of acquire(int), kept for compatibility.
+     */
+    public void aquire(int permits)
+        throws InterruptedException
+    {
+        acquire(permits);
+    }
+
+    /**
+     * Misspelt alias of acquireUninterruptibly(), kept for compatibility.
+     */
+    public void aquireUninterruptably()
+    {
+        acquireUninterruptibly(1);
+    }
+
+    /**
+     * Misspelt alias of acquireUninterruptibly(int), kept for
+     * compatibility.
+     */
+    public void aquireUninterruptably(int permits)
+    {
+        acquireUninterruptibly(permits);
+    }
+
+    /**
+     * Try to acquire one permit; see tryAcquire(int, long, TimeUnit).
+     */
+    @Override
+    public boolean tryAcquire(long timeout, TimeUnit unit)
+        throws InterruptedException
+    {
+        return tryAcquire(1, timeout, unit);
+    }
+
+    /**
+     * Try to acquire the given number of permits within the timeout,
+     * reporting the thread as blocked while it waits.
+     * @param permits The number of permits to acquire.
+     * @param timeout The maximum time to wait.
+     * @param unit The unit of the timeout argument.
+     * @return true if the permits were acquired.
+     * @throws InterruptedException if the thread is interrupted.
+     */
+    @Override
+    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
+        throws InterruptedException
+    {
+        if (timeout < 1) {
+            // No waiting was requested so the thread cannot block.
+            return super.tryAcquire(permits, timeout, unit);
+        }
+
+        if (super.tryAcquire(permits, 0, unit)) {
+            // Blocking wasn't necessary
+            return true;
+        }
+
+        /*
+         * Block.  This must call the superclass method, calling this
+         * method again with the same timeout would never terminate.
+         */
+        blocked();
+        try {
+            return super.tryAcquire(permits, timeout, unit);
+        } finally {
+            running();
+        }
+    }
+
+    /**
+     * Tell the current thread, if it is a MercuryThread, that it is
+     * blocked.
+     */
+    protected void blocked()
+    {
+        MercuryThread   mer_thread;
+
+        mer_thread = MercuryThread.currentThread();
+        if (mer_thread != null) {
+            mer_thread.blocked();
+        }
+    }
+
+    /**
+     * Tell the current thread, if it is a MercuryThread, that it is
+     * running again.
+     */
+    protected void running()
+    {
+        MercuryThread   mer_thread;
+
+        mer_thread = MercuryThread.currentThread();
+        if (mer_thread != null) {
+            mer_thread.running();
+        }
+    }
+}
+
diff --git a/java/runtime/ThreadStatus.java b/java/runtime/ThreadStatus.java
index 4ba0294..78512fc 100644
--- a/java/runtime/ThreadStatus.java
+++ b/java/runtime/ThreadStatus.java
@@ -9,6 +9,7 @@ package jmercury.runtime;
 public enum ThreadStatus {
     WORKING,
     IDLE,
-    OTHER
+    OTHER,
+    BLOCKED
 }
 
diff --git a/library/thread.semaphore.m b/library/thread.semaphore.m
index 7058105..46c53e2 100644
--- a/library/thread.semaphore.m
+++ b/library/thread.semaphore.m
@@ -97,7 +97,7 @@ public class ML_Semaphore {
     "class [mercury]mercury.thread.semaphore__csharp_code.mercury_code.ML_Semaphore").
 :- pragma foreign_type("C#", semaphore, "thread__semaphore.ML_Semaphore").
 :- pragma foreign_type("Erlang", semaphore, "").
-:- pragma foreign_type("Java", semaphore, "java.util.concurrent.Semaphore").
+:- pragma foreign_type("Java", semaphore, "jmercury.runtime.Semaphore").
 
 :- pragma foreign_decl("C", "
 extern void
@@ -153,7 +153,7 @@ init(Semaphore, !IO) :-
     init(Count::in) = (Semaphore::uo),
     [will_not_call_mercury, thread_safe],
 "
-    Semaphore = new java.util.concurrent.Semaphore(Count);
+    Semaphore = new jmercury.runtime.Semaphore(Count);
 ").
 
 :- pragma foreign_code("C", "
-- 
2.0.0




More information about the reviews mailing list