[m-rev.] for review: Make MercuryThreadPool notice when a thread blocks on a semaphore.

Paul Bone paul at bone.id.au
Fri Oct 3 19:25:33 AEST 2014


On Fri, Aug 01, 2014 at 04:14:02PM +1000, Peter Wang wrote:
> Hi Paul,
> 
> On Thu, 17 Jul 2014 13:52:56 +1000, Paul Bone <paul at bone.id.au> wrote:
> > diff --git a/java/runtime/Semaphore.java b/java/runtime/Semaphore.java
> > new file mode 100644
> > index 0000000..823a919
> > --- /dev/null
> > +++ b/java/runtime/Semaphore.java
> > +
> > +    public void aquire()
> > +        throws InterruptedException
> > +    {
> > +        aquire(1);
> > +    }
> 
> "acquire" (multiple instances in this file)

I've fixed this and another misspelling and re-tested this code.  I've had
to make a number of changes so the diff of the code I'm committing follows.

> > +
> > +    public void aquireUninterruptably(int permits)
> > +    {
> > +        boolean blocked = false;
> > +        boolean interrupted_once = false;
> > +        boolean not_interrupted_once = false;
> > +        boolean success;
> 
> Why both interrupted_once and not_interrupted_once?
> 

I've taken a closer look and found a logic-inversion bug in the algorithm.
I've renamed not_interrupted_once to keep_trying, because that's what I'm
using it for.

-- 
Paul Bone

>From a2879c6837a8f00307ccece69b1b4a2f00c755f7 Mon Sep 17 00:00:00 2001
From: Paul Bone <paul at bone.id.au>
Date: Fri, 3 Oct 2014 14:19:03 +1000
Subject: diff: Make MercuryThreadPool (java) notice when a thread blocks
 on a semaphore.

If threads are blocked while there is work in the queue extra threads may be
spawned to keep the processors busy.

Beginning now, tasks created with thread.spawn use the thread pool.
(thread.spawn_native does not use the thread pool.)

java/runtime/Semaphore.java:
    Wrap Java's Semaphore class so that it calls the current thread's
    blocked() and running() methods when a thread blocks and then runs
    again after being blocked.

library/thread.semaphore.m:
    Use our own Semaphore class.

java/runtime/MercuryThread.java:
java/runtime/MercuryWorkerThread.java:
    Define blocked() and running() on our threads.

java/runtime/NativeThread.java:
    This class is used by spawn_native/4 and is required to define blocked()
    and running(), however it implements them as no-ops as it isn't included
    in the thread pool.

java/runtime/ThreadStatus.java:
    Define the BLOCKED status.

java/runtime/MercuryThreadPool.java:
    Count blocked threads separately and allow the creation of new threads
    when existing threads become blocked.

    Add some tracing code to help debug the thread management code.  This is
    disabled by default.

library/thread.m:
    Implement spawn for Java using the thread pool.  This was not enabled
    earlier because without using java/runtime/Semaphore.java it was
    possible to deadlock the system.

java/runtime/Task.java:
    Add some tracing code to debug thread state changes, this is disabled by
    default.
---
 java/runtime/MercuryThread.java       |  28 ++++++-
 java/runtime/MercuryThreadPool.java   | 140 ++++++++++++++++++++-----------
 java/runtime/MercuryWorkerThread.java |  10 +++
 java/runtime/NativeThread.java        |  42 ++++++++++
 java/runtime/Semaphore.java           | 150 ++++++++++++++++++++++++++++++++++
 java/runtime/Task.java                |   8 ++
 java/runtime/ThreadStatus.java        |   1 +
 library/thread.m                      |   2 +-
 library/thread.semaphore.m            |   4 +-
 9 files changed, 333 insertions(+), 52 deletions(-)
 create mode 100644 java/runtime/NativeThread.java
 create mode 100644 java/runtime/Semaphore.java

diff --git a/java/runtime/MercuryThread.java b/java/runtime/MercuryThread.java
index 0e3b37f..49a4b57 100644
--- a/java/runtime/MercuryThread.java
+++ b/java/runtime/MercuryThread.java
@@ -14,7 +14,7 @@ public abstract class MercuryThread extends Thread
     private int                 id;
 
     /**
-     * Construct a new MercuryThread with the given ID and runnable.
+     * Construct a new MercuryThread with the given ID.
      * @param name A string that identifies the type of thread.
      * @param id A numeric identifier (should be unique).
      */
@@ -24,5 +24,31 @@ public abstract class MercuryThread extends Thread
         this.id = id;
     }
 
+    /**
+     * The thread has become blocked.
+     */
+    public abstract void blocked();
+
+    /**
+     * The thread is unblocked and is now running again.
+     */
+    public abstract void running();
+
+    /**
+     * If the current thread is a MercuryThread then return a reference to
+     * it.
+     */
+    public static MercuryThread currentThread()
+    {
+        Thread          thread;
+
+        thread = Thread.currentThread();
+        if (thread instanceof MercuryThread) {
+            return (MercuryThread)thread;
+        } else {
+            return null;
+        }
+    }
+
 }
 
diff --git a/java/runtime/MercuryThreadPool.java b/java/runtime/MercuryThreadPool.java
index f1d6347..a08864b 100644
--- a/java/runtime/MercuryThreadPool.java
+++ b/java/runtime/MercuryThreadPool.java
@@ -24,9 +24,6 @@ import java.util.*;
  * new threads may be created to avoid deadlocks and attempt to keep all
  * processors busy.
  *
- * TODO: Currently the thread pool does not know when a thread is blocked by
- * a barrier, channel, mvar or semaphore.
- *
  * TODO: Currently the thread pool does not know when a thread is blocked in
  * foreign code or performing IO.
  *
@@ -50,6 +47,8 @@ import java.util.*;
 public class MercuryThreadPool
     implements Runnable
 {
+    public static final boolean         debug = false;
+
     private static MercuryThreadPool    instance;
 
     private MercuryThreadFactory        thread_factory;
@@ -80,6 +79,7 @@ public class MercuryThreadPool
      */
     private int                         num_threads_working;
     private volatile int                num_threads_waiting;
+    private int                         num_threads_blocked;
     private int                         num_threads_other;
     private LinkedList<MercuryThread>   threads;
     private Lock                        threads_lock;
@@ -111,6 +111,7 @@ public class MercuryThreadPool
         thread_pool_size = size;
         num_threads_working = 0;
         num_threads_waiting = 0;
+        num_threads_blocked = 0;
         num_threads_other = 0;
         threads = new LinkedList<MercuryThread>();
         threads_lock = new ReentrantLock();
@@ -236,6 +237,9 @@ public class MercuryThreadPool
                 case IDLE:
                     num_threads_waiting--;
                     break;
+                case BLOCKED:
+                    num_threads_blocked--;
+                    break;
                 case OTHER:
                     num_threads_other--;
                     break;
@@ -250,6 +254,9 @@ public class MercuryThreadPool
                 case IDLE:
                     num_threads_waiting++;
                     break;
+                case BLOCKED:
+                    num_threads_blocked++;
+                    break;
                 case OTHER:
                     num_threads_other++;
                     break;
@@ -260,10 +267,12 @@ public class MercuryThreadPool
             threads_lock.unlock();
         }
 
-        if ((new_ == ThreadStatus.IDLE) ||
-            (new_ == ThreadStatus.OTHER))
-        {
-            signalMainLoop();
+        switch (new_) {
+            case BLOCKED:
+                signalMainLoop();
+                break;
+            default:
+                break;
         }
     }
 
@@ -279,6 +288,9 @@ public class MercuryThreadPool
                 case IDLE:
                     num_threads_waiting--;
                     break;
+                case BLOCKED:
+                    num_threads_blocked--;
+                    break;
                 case OTHER:
                     num_threads_other--;
                     break;
@@ -331,15 +343,33 @@ public class MercuryThreadPool
     }
 
     /**
+     * Warm up the thread pool by starting some initial threads (currently
+     * one).
+     */
+    protected void startupInitialThreads()
+    {
+        MercuryWorkerThread t = thread_factory.newWorkerThread();
+
+        threads_lock.lock();
+        try {
+            threads.add(t);
+            num_threads_other++;
+        } finally {
+            threads_lock.unlock();
+        }
+
+        t.start();
+    }
+
+    /**
      * Check threads.
      * Checks the numbers and status of the worker threads and starts more
      * threads if required.
-     * @return The number of currently working/blocked threads.
      */
-    protected int checkThreads()
+    protected void checkThreads()
     {
         int num_new_threads;
-        int num_working_blocked_threads;
+        int num_tasks_waiting;
         List<MercuryWorkerThread> new_threads =
             new LinkedList<MercuryWorkerThread>();
 
@@ -352,43 +382,62 @@ public class MercuryThreadPool
         thread_pool_size = (user_specified_size > 0) ? user_specified_size :
             Runtime.getRuntime().availableProcessors();
 
-        /*
-         * If we have fewer than the default number of threads then start
-         * some new threads.
-         */
-        threads_lock.lock();
+        tasks_lock.lock();
         try {
-            int num_threads = num_threads_working + num_threads_waiting +
-                num_threads_other;
-            num_working_blocked_threads = num_threads_working +
-                num_threads_other;
-            num_new_threads = thread_pool_size - num_threads;
-            if (num_new_threads > 0) {
+            num_tasks_waiting = tasks.size();
+        } finally {
+            tasks_lock.unlock();
+        }
+
+        if (num_tasks_waiting > 0) {
+            /*
+             * If we have fewer than the default number of threads then
+             * start some new threads.
+             */
+            threads_lock.lock();
+            try {
+                int num_threads = num_threads_working + num_threads_waiting +
+                    num_threads_blocked + num_threads_other;
+                int num_threads_limit = thread_pool_size +
+                    num_threads_blocked;
+                // Determine the number of new threads that we want.
+                num_new_threads = num_tasks_waiting - num_threads_other -
+                    num_threads_waiting;
+                if (num_new_threads + num_threads > num_threads_limit) {
+                    /*
+                     * The number of threads that we want, plus the number
+                     * we already have, exceeds the number that we're
+                     * allowed to have.
+                     */
+                    num_new_threads = num_threads_limit - num_threads;
+                }
+                if (debug) {
+                    System.err.println("Pool has " +
+                        num_threads_working + " working threads, " +
+                        num_threads_blocked + " blocked threads, " +
+                        num_threads_waiting + " idle threads, " +
+                        num_threads_other + " other (starting up) threads. " +
+                        "will create " + num_new_threads + " new threads.");
+                }
+
                 for (int i = 0; i < num_new_threads; i++) {
                     MercuryWorkerThread t = thread_factory.newWorkerThread();
                     new_threads.add(t);
                     threads.add(t);
+                    num_threads_other++;
                 }
-                num_threads = thread_pool_size;
+            } finally {
+                threads_lock.unlock();
             }
-            num_threads_other += num_new_threads;
-        } finally {
-            threads_lock.unlock();
-        }
 
-        /*
-         * Start the threads while we're not holding the lock, this makes
-         * the above critical section smaller.
-         */
-        for (MercuryWorkerThread t : new_threads) {
-            t.start();
+            /*
+             * Start the threads while we're not holding the lock, this makes
+             * the above critical section smaller.
+             */
+            for (MercuryWorkerThread t : new_threads) {
+                t.start();
+            }
         }
-
-        /*
-         * If there are too many threads then superfluous threads will
-         * shut down when they try to get a new task.
-         */
-        return num_working_blocked_threads;
     }
 
     /**
@@ -420,7 +469,6 @@ public class MercuryThreadPool
     public void run()
     {
         boolean done = false;
-        int     num_working_blocked_threads;
         long    num_tasks_submitted;
         long    num_tasks_completed;
         boolean tasks_locked = false;
@@ -436,14 +484,14 @@ public class MercuryThreadPool
                     num_tasks_submitted = this.num_tasks_submitted;
                     num_tasks_completed = this.num_tasks_completed;
                     done = (num_tasks_submitted == num_tasks_completed);
-                    
+
                     if (!done) {
                         /*
                          * Start new threads if we have fewer than the
                          * thread_pool_size
                          */
-                        num_working_blocked_threads = checkThreads();
-                    
+                        checkThreads();
+
                         /*
                          * Acquire the main loop lock while we're still
                          * holding tasks_lock.  This prevents a race whereby
@@ -517,6 +565,7 @@ public class MercuryThreadPool
     public void runMain(Runnable run_main)
     {
         Task main_task = new Task(run_main);
+        startupInitialThreads();
         submit(main_task);
         try {
             /*
@@ -553,12 +602,7 @@ public class MercuryThreadPool
          * @param runnable The task the new thread should execute.
          */
         public MercuryThread newThread(final Runnable runnable) {
-            return new MercuryThread("Mercury Thread", allocateThreadId())
-                {
-                    public void run() {
-                        runnable.run();
-                    }
-                };
+            return new NativeThread(allocateThreadId(), runnable);
         }
 
         public MercuryWorkerThread newWorkerThread() {
diff --git a/java/runtime/MercuryWorkerThread.java b/java/runtime/MercuryWorkerThread.java
index cd1d2c1..a3d0d4f 100644
--- a/java/runtime/MercuryWorkerThread.java
+++ b/java/runtime/MercuryWorkerThread.java
@@ -78,5 +78,15 @@ public class MercuryWorkerThread extends MercuryThread
         pool.updateThreadCounts(status, new_status);
         status = new_status;
     }
+
+    public void blocked() {
+        pool.updateThreadCounts(status, ThreadStatus.BLOCKED);
+        status = ThreadStatus.BLOCKED;
+    }
+
+    public void running() {
+        pool.updateThreadCounts(status, ThreadStatus.WORKING);
+        status = ThreadStatus.WORKING;
+    }
 }
 
diff --git a/java/runtime/NativeThread.java b/java/runtime/NativeThread.java
new file mode 100644
index 0000000..a334389
--- /dev/null
+++ b/java/runtime/NativeThread.java
@@ -0,0 +1,42 @@
+
+package jmercury.runtime;
+
+/**
+ * Native thread.
+ *
+ * A native thread is created by Mercury to run tasks created with
+ * spawn_native/4.
+ */
+public class NativeThread extends MercuryThread
+{
+    private Runnable task;
+
+    /**
+     * Create a new native thread.
+     * @param id The id for the new thread.
+     * @param task The task to execute.
+     */
+    public NativeThread(int id, Runnable task) {
+        super("Mercury Native Thread", id);
+        this.task = task;
+    }
+
+    public void run() {
+        task.run();
+    }
+
+    /**
+     * no-op
+     */
+    public void blocked() {
+        ;
+    }
+
+    /**
+     * no-op
+     */
+    public void running() {
+        ;
+    }
+}
+
diff --git a/java/runtime/Semaphore.java b/java/runtime/Semaphore.java
new file mode 100644
index 0000000..f09ebb6
--- /dev/null
+++ b/java/runtime/Semaphore.java
@@ -0,0 +1,150 @@
+//
+// Copyright (C) 2014 The Mercury Team
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+import java.util.concurrent.TimeUnit;
+
+
+public class Semaphore
+    extends java.util.concurrent.Semaphore
+{
+    public Semaphore(int permits, boolean fair)
+    {
+        super(permits, fair);
+    }
+
+    public Semaphore(int permits)
+    {
+        super(permits);
+    }
+
+    public void acquire()
+        throws InterruptedException
+    {
+        acquire(1);
+    }
+
+    public void acquire(int permits)
+        throws InterruptedException
+    {
+        boolean blocked = false;
+
+        try {
+            if (tryAcquire(permits, 0, TimeUnit.SECONDS)) {
+                return;
+            } else {
+                // This thread will (probably) block.
+                blocked();
+                blocked = true;
+                super.acquire(permits);
+            }
+        } finally {
+            if (blocked) {
+                running();
+                // The thread isn't blocked anymore.
+            }
+        }
+    }
+
+    public void acquireUninterruptibly()
+    {
+        acquireUninterruptibly(1);
+    }
+
+    public void acquireUninterruptibly(int permits)
+    {
+        boolean interrupted_once = false;
+        boolean keep_trying;
+        boolean success;
+
+        // Avoid a warning with the loop below
+        success = false;
+
+        /*
+         * tryAcquire is interruptable so we keep trying until we've
+         * executed it at least once and it was not interrupted.  We also
+         * track if we were interrupted so we can raise this condition
+         * again.
+         */
+        do {
+            keep_trying = true;
+            try {
+                success = tryAcquire(permits, 0, TimeUnit.SECONDS);
+                keep_trying = false;
+            } catch (InterruptedException e) {
+                interrupted_once = true;
+            }
+        } while (keep_trying);
+
+        if (!success) {
+            // This thread will (probably) block because tryAcquire failed.
+            blocked();
+            super.acquireUninterruptibly(permits);
+            running();
+        }
+
+        if (interrupted_once) {
+            /*
+            ** Because this was supposed to be uninterruptable we need to
+            ** raise the interrupt that we received earlier.
+            */
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public boolean tryAcquire(long timeout, TimeUnit unit)
+        throws InterruptedException
+    {
+        return tryAcquire(1, timeout, unit);
+    }
+
+    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
+        throws InterruptedException
+    {
+        if (timeout < 1) {
+            return super.tryAcquire(permits, timeout, unit);
+        } else {
+            boolean result;
+
+            result = tryAcquire(permits, 0, unit);
+            if (result) {
+                // Blocking wasn't necessary
+                return true;
+            } else {
+                // Blocking required, notify thread.
+                blocked();
+                try {
+                    // Block in super: calling this override recurses.
+                    return super.tryAcquire(permits, timeout, unit);
+                } finally {
+                    running();
+                }
+            }
+        }
+    }
+
+    protected void blocked()
+    {
+        MercuryThread   mer_thread;
+
+        mer_thread = MercuryThread.currentThread();
+        if (mer_thread != null) {
+            mer_thread.blocked();
+        }
+    }
+
+    protected void running()
+    {
+        MercuryThread   mer_thread;
+
+        mer_thread = MercuryThread.currentThread();
+        if (mer_thread != null) {
+            mer_thread.running();
+        }
+    }
+}
+
diff --git a/java/runtime/Task.java b/java/runtime/Task.java
index c0e3ff4..1a2b947 100644
--- a/java/runtime/Task.java
+++ b/java/runtime/Task.java
@@ -57,6 +57,10 @@ public class Task implements Runnable
      * status change.
      */
     protected synchronized void updateStatus(Status status) {
+        if (MercuryThreadPool.debug) {
+            System.err.println("Thread: " + Thread.currentThread() +
+                "task: " + this + " status " + status);
+        }
         this.status = status;
         notifyAll();
     }
@@ -73,6 +77,10 @@ public class Task implements Runnable
             wait();
         }
     }
+
+    public String toString() {
+        return "Task " + id;
+    }
 }
 
 
diff --git a/java/runtime/ThreadStatus.java b/java/runtime/ThreadStatus.java
index 6fb2de2..2f40d0f 100644
--- a/java/runtime/ThreadStatus.java
+++ b/java/runtime/ThreadStatus.java
@@ -9,6 +9,7 @@ package jmercury.runtime;
 public enum ThreadStatus {
     WORKING,
     IDLE,
+    BLOCKED,
 
     /*
      * A thread in any other state, currently the only possibility is after
diff --git a/library/thread.m b/library/thread.m
index a98f92b..154a268 100644
--- a/library/thread.m
+++ b/library/thread.m
@@ -287,7 +287,7 @@ spawn_context_2(_, Res, "", !IO) :-
     Task task = new Task(rg);
     ThreadId = String.valueOf(task.getId());
     rg.setId(ThreadId);
-    JavaInternal.getThreadPool().submitExclusiveThread(task);
+    JavaInternal.getThreadPool().submit(task);
     Success = bool.YES;
 ").
 
diff --git a/library/thread.semaphore.m b/library/thread.semaphore.m
index 7058105..46c53e2 100644
--- a/library/thread.semaphore.m
+++ b/library/thread.semaphore.m
@@ -97,7 +97,7 @@ public class ML_Semaphore {
     "class [mercury]mercury.thread.semaphore__csharp_code.mercury_code.ML_Semaphore").
 :- pragma foreign_type("C#", semaphore, "thread__semaphore.ML_Semaphore").
 :- pragma foreign_type("Erlang", semaphore, "").
-:- pragma foreign_type("Java", semaphore, "java.util.concurrent.Semaphore").
+:- pragma foreign_type("Java", semaphore, "jmercury.runtime.Semaphore").
 
 :- pragma foreign_decl("C", "
 extern void
@@ -153,7 +153,7 @@ init(Semaphore, !IO) :-
     init(Count::in) = (Semaphore::uo),
     [will_not_call_mercury, thread_safe],
 "
-    Semaphore = new java.util.concurrent.Semaphore(Count);
+    Semaphore = new jmercury.runtime.Semaphore(Count);
 ").
 
 :- pragma foreign_code("C", "
-- 
2.1.0




More information about the reviews mailing list